This notebook is both a guided lesson and a working pipeline for detecting human pose landmarks from local video files or entire folders of videos using MediaPipe Tasks.
Goal¶
Set up a clean Python 3.12 environment and verify required packages.
Understand each step and its terminology.
Download and select a Pose Landmarker model (lite / full / heavy) and understand accuracy–speed trade-offs.
Read videos with OpenCV and run inference in RunningMode.VIDEO with correct timestamps.
Export results as tidy CSVs for analysis: 2D image-normalized and 3D world landmarks.
Create an annotated MP4 showing the skeleton overlay.
Build intuition for visibility, image vs. world coordinates, and simple feature engineering (e.g., joint angles).
Built for learning: Along the way you’ll see short callouts explaining why each step exists (e.g., timestamps in VIDEO mode), how coordinate spaces differ, and how to tune speed vs. accuracy.
After completing this guide, you will be able to¶
Load one video, or loop through an entire folder, and extract the body-landmark coordinates frame by frame.
Save two analysis-ready CSVs per video: one for 2D normalized landmarks and one for 3D world coordinates.
Produce an annotated MP4 with landmarks and connections overlaid.
Explain and adjust RunningMode.VIDEO, per-frame timestamps, visibility filtering, image vs. world coordinates, and model variants (lite/full/heavy).
Prerequisites
Python 3.12 virtual environment selected as the active Jupyter kernel. If you need help, please refer to the “LS100_Guide 3_Introduction to Pose Estimation Using MediaPipe.pdf” guide.
Installed packages: mediapipe opencv-python pandas numpy tqdm matplotlib seaborn
One or more local video files (e.g., .mp4) to test.
Ethics & consent
If processing videos of people, obtain consent and store data securely. Avoid uploading sensitive content to third-party services.
References for learners¶
MediaPipe Pose Landmarker (Python guide): https://ai.google.dev/edge/mediapipe/solutions/vision/pose_landmarker/python
Pose Landmarker API: https://ai.google.dev/edge/api/mediapipe/python/mp/tasks/vision/PoseLandmarker
Model card (BlazePose GHUM 3D; lite/full/heavy): https://storage.googleapis.com/mediapipe-assets/Model%20Card%20BlazePose%20GHUM%203D.pdf
0. Environment Setup and Verification (LS100 Standard)¶
Before running any code, make sure you’re using the LS100_PoseEstimation_MP kernel that was created in your Python 3.12 virtual environment. This section verifies your environment and installs all required packages.
What you should already have¶
✅ Python 3.12 installed
✅ Virtual environment activated ((MediaPipeEnv) should appear in your terminal)
✅ Kernel registered as LS100_PoseEstimation_MP
If you haven’t completed those steps, revisit the LS100_Guide 3_Introduction to Pose Estimation Using MediaPipe.pdf document.
Required packages¶
This notebook uses the following libraries:
mediapipe – pose landmark model and API
opencv-python – video I/O (input/output) and frame conversion
pandas & numpy – data handling and analysis
tqdm – progress bars for video processing
matplotlib & seaborn – visualization and data inspection
Run the next cell to ensure these are installed and to confirm the environment details.
Learning focus¶
Why virtual environments prevent version conflicts
Why we require Python 3.12 (MediaPipe Tasks currently supports Python 3.9–3.12 only)
How each library fits into the MediaPipe Pose pipeline
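To make the first two points concrete, here is a minimal sketch of how a notebook cell could check that it is running inside a virtual environment and on a supported Python version. The helper names `in_virtual_env` and `python_supported` are illustrative and not part of the LS100 materials; the 3.9–3.12 range is the one stated above.

```python
import sys

def in_virtual_env() -> bool:
    """Return True when the interpreter runs inside a venv/virtualenv."""
    # Inside a venv, sys.prefix points at the environment, while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)

def python_supported(version=sys.version_info, low=(3, 9), high=(3, 12)) -> bool:
    """Check a (major, minor, ...) version against an inclusive range."""
    return low <= tuple(version[:2]) <= high

print("Interpreter   :", sys.executable)
print("Inside a venv :", in_virtual_env())
print("Version OK    :", python_supported())
```

If the interpreter path does not point into your environment folder, the notebook is most likely attached to a different kernel.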
0. Environment setup¶
If running locally (VS Code/Jupyter), run the following cell once; it might take about a minute to run.
# ============================================
# 0. Environment Setup and Package Verification
# ============================================
import sys
import importlib
import subprocess
# ---- 1. Check Python version ----
py_version = sys.version_info
print(f"🧠 Python version: {py_version.major}.{py_version.minor}.{py_version.micro}")
if py_version < (3, 9) or py_version >= (3, 13):
    print("⚠️ MediaPipe Tasks officially supports Python 3.9–3.12.")
    print("⚠️ Please switch to Python 3.12 for this notebook (as used in LS100).")
# ---- 2. Define required packages (pip name -> import name) ----
# The two names can differ: opencv-python is imported as cv2. Checking the
# pip name directly would fail and reinstall opencv-python on every run.
required_packages = {
    "mediapipe": "mediapipe",
    "opencv-python": "cv2",
    "pandas": "pandas",
    "numpy": "numpy",
    "tqdm": "tqdm",
    "matplotlib": "matplotlib",
    "seaborn": "seaborn",
}
# ---- 3. Function to check and install ----
def install_if_missing(pkg, module_name):
    """
    Try importing the module; if not found, install the pip package quietly.
    """
    try:
        importlib.import_module(module_name)
        print(f"✅ {pkg} already installed")
    except ImportError:
        print(f"⬇️ Installing {pkg} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])
# ---- 4. Verify each dependency ----
for package, module_name in required_packages.items():
    install_if_missing(package, module_name)
# ---- 5. Print package versions for reproducibility ----
import mediapipe as mp
import cv2, pandas as pd, numpy as np, tqdm, matplotlib, seaborn
print("\n📦 Package versions:")
print(f"mediapipe : {mp.__version__}")
print(f"opencv-python : {cv2.__version__}")
print(f"pandas : {pd.__version__}")
print(f"numpy : {np.__version__}")
print(f"matplotlib : {matplotlib.__version__}")
print(f"seaborn : {seaborn.__version__}")
print("\n✅ Environment is ready to proceed!")
🧠 Python version: 3.12.12
✅ mediapipe already installed
⬇️ Installing opencv-python ...
✅ pandas already installed
✅ numpy already installed
✅ tqdm already installed
✅ matplotlib already installed
✅ seaborn already installed
📦 Package versions:
mediapipe : 0.10.21
opencv-python : 4.11.0
pandas : 2.3.3
numpy : 1.26.4
matplotlib : 3.10.7
seaborn : 0.13.2
✅ Environment is ready to proceed!
1. Imports and Version Verification¶
Now that your environment is ready, let’s import the main libraries used throughout this notebook.
This step helps confirm that:
The correct packages are installed inside your LS100 virtual environment
MediaPipe loads successfully (and we can access its Tasks API)
OpenCV, NumPy, and Pandas are working properly
If an import fails, it usually means you’re running the notebook in a different kernel (not the one you registered). You can fix that by selecting Kernel → Change Kernel → LS100_PoseEstimation_MP (or the name you chose).
# ======================================
# 1. Import Libraries and Verify Versions (fixed for MediaPipe >=0.10)
# ======================================
import os, cv2, numpy as np, pandas as pd, matplotlib, seaborn as sns
from tqdm import tqdm
import mediapipe as mp
from mediapipe.tasks import python as mp_python
from mediapipe.tasks.python import vision as mp_vision
print("✅ MediaPipe Tasks API imported successfully!\n")
print(f"mediapipe version : {mp.__version__}")
print(f"opencv version : {cv2.__version__}")
print(f"pandas version : {pd.__version__}")
print(f"numpy version : {np.__version__}")
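# Optional: check whether this OpenCV build can see a CUDA device.
# Note: this reports on OpenCV only; it does not determine which backend MediaPipe uses.
backend = "GPU" if cv2.cuda.getCudaEnabledDeviceCount() > 0 else "CPU"
print(f"⚙️ Running on {backend}")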
# ---- Smoke test: confirm Tasks API symbols exist ----
BaseOptions = mp_python.BaseOptions
PoseLandmarker = mp_vision.PoseLandmarker
PoseLandmarkerOptions = mp_vision.PoseLandmarkerOptions
RunningMode = mp_vision.RunningMode
print("\n MediaPipe Tasks API is available:")
print(f"- BaseOptions : {BaseOptions is not None}")
print(f"- PoseLandmarker : {PoseLandmarker is not None}")
print(f"- PoseLandmarkerOptions : {PoseLandmarkerOptions is not None}")
print(f"- RunningMode : {RunningMode is not None}")
✅ MediaPipe Tasks API imported successfully!
mediapipe version : 0.10.21
opencv version : 4.11.0
pandas version : 2.3.3
numpy version : 1.26.4
⚙️ Running on CPU
MediaPipe Tasks API is available:
- BaseOptions : True
- PoseLandmarker : True
- PoseLandmarkerOptions : True
- RunningMode : True
Notes¶
Why this matters: ensures that the environment is truly isolated and reproducible.
Discussion prompt: Can you tell why we check MediaPipe imports before running the pipeline? (to confirm the Tasks API is available and working).
TASK: Print mp.__file__ to confirm MediaPipe’s path. This helps you understand where packages live inside the venv.
2. How Pose Landmarker works¶
Running modes: IMAGE, VIDEO, LIVE_STREAM. For offline videos we use VIDEO and must pass a timestamp (ms) for each frame; the task uses tracking to avoid re-running the full model on every frame (reduces latency at the same accuracy settings).
Outputs:
2D normalized landmarks in image coordinates (x, y in [0,1] relative to width/height; z is a depth-like value; visibility in [0,1]).
3D world landmarks (meters, origin near hip center; handy for biomechanical features).
Variants: lite / full / heavy. Heavier models are more accurate but slower (see model card).
Accuracy vs. speed knobs: num_poses (usually 1 for single-person), min_pose_detection_confidence, min_pose_presence_confidence, min_tracking_confidence, and frame stride (e.g., analyze every 2nd/3rd frame).
We’ll expose all of these transparently in helper functions below.
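To make the normalized image coordinates concrete, here is a minimal sketch that maps them to pixel positions and flags points that fall outside the frame. The landmark values and the helper names `to_pixel` and `is_in_frame` are made up for illustration; real values come from the Pose Landmarker result.

```python
def to_pixel(x_norm: float, y_norm: float, width: int, height: int):
    """Map normalized image coordinates to integer pixel coordinates."""
    return int(x_norm * width), int(y_norm * height)

def is_in_frame(x_norm: float, y_norm: float) -> bool:
    """Normalized values outside [0, 1] mean the estimate lies off-screen."""
    return 0.0 <= x_norm <= 1.0 and 0.0 <= y_norm <= 1.0

# Hypothetical landmarks for a 1280x720 frame (not real model output)
frame_width, frame_height = 1280, 720
landmarks = [
    {"name": "left_wrist", "x": 0.25, "y": 0.5, "visibility": 0.93},
    {"name": "right_ankle", "x": 1.08, "y": 0.75, "visibility": 0.12},
]

for lm in landmarks:
    px = to_pixel(lm["x"], lm["y"], frame_width, frame_height)
    print(lm["name"], px, "in frame:", is_in_frame(lm["x"], lm["y"]),
          "visibility:", lm["visibility"])
```

World landmarks need no such conversion because they are already expressed in meters.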
3. Download a Pose Landmarker model (.task bundle)¶
Choose one of: "lite", "full", "heavy" (default).
URLs follow Google’s published pattern; we try latest/… first and then fall back to version 1/….
You only need to download once; it will be cached under models/.
# ================================
# 3. Model Selection & Download
# ================================
import os
import pathlib
import urllib.request
import urllib.error
import mediapipe as mp
from mediapipe.tasks import python as mp_python
from mediapipe.tasks.python import vision as mp_vision
# ---- Where to save models ----
MODELS_DIR = pathlib.Path("models")
MODELS_DIR.mkdir(parents=True, exist_ok=True)
# ---- Official model URLs (latest, with fallback to v1) ----
MODEL_URLS = {
    "lite": [
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_lite/float16/latest/pose_landmarker_lite.task",
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_lite/float16/1/pose_landmarker_lite.task",
    ],
    "full": [
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_full/float16/latest/pose_landmarker_full.task",
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_full/float16/1/pose_landmarker_full.task",
    ],
    "heavy": [
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/latest/pose_landmarker_heavy.task",
        # Fallback URL: same path pattern as the lite/full entries above
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task",
    ],
}
def download_pose_model(variant: str = "heavy") -> str:
    """
    Download the selected model variant (.task) to MODELS_DIR.
    Returns the local file path.
    """
    variant = variant.lower().strip()
    assert variant in MODEL_URLS, f"Unknown variant '{variant}'. Choose: lite, full, heavy."
    out_path = MODELS_DIR / f"pose_landmarker_{variant}.task"
    if out_path.exists() and out_path.stat().st_size > 50_000:
        print(f"✔ Model already present: {out_path}")
        return str(out_path)
    last_err = None
    for url in MODEL_URLS[variant]:
        try:
            print(f"Downloading {variant} model from:\n {url}")
            with urllib.request.urlopen(url, timeout=60) as r, open(out_path, "wb") as f:
                f.write(r.read())
            if out_path.stat().st_size <= 50_000:
                raise RuntimeError("Downloaded file seems too small; trying fallback...")
            print(f"✔ Saved to {out_path} ({out_path.stat().st_size/1e6:.2f} MB)")
            return str(out_path)
        except Exception as e:
            print(f"… failed: {e}")
            last_err = e
    raise RuntimeError(f"Could not download model for variant '{variant}'. Last error: {last_err}")
# ---- Choose your default model here ----
# If the previous cell set `selected_model`, use it; otherwise default to "heavy".
try:
    MODEL_VARIANT = selected_model.lower().strip()
except NameError:
    MODEL_VARIANT = "heavy" # default
MODEL_PATH = download_pose_model(MODEL_VARIANT)
# ---- Verify we can initialize the Pose Landmarker (VIDEO mode) ----
BaseOptions = mp_python.BaseOptions
PoseLandmarker = mp_vision.PoseLandmarker
PoseLandmarkerOptions = mp_vision.PoseLandmarkerOptions
RunningMode = mp_vision.RunningMode
options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=MODEL_PATH),
    running_mode=RunningMode.VIDEO,
    num_poses=1,
    min_pose_detection_confidence=0.5,
    min_pose_presence_confidence=0.5,
    min_tracking_confidence=0.5,
    output_segmentation_masks=False,
)
try:
    with PoseLandmarker.create_from_options(options) as landmarker:
        print("✅ PoseLandmarker initialized successfully (VIDEO mode).")
        print(f" Model: {MODEL_VARIANT} → {MODEL_PATH}")
except Exception as e:
    print("❌ Failed to initialize PoseLandmarker. Check the model file and MediaPipe version.")
    raise
✔ Model already present: models/pose_landmarker_heavy.task
✅ PoseLandmarker initialized successfully (VIDEO mode).
Model: heavy → models/pose_landmarker_heavy.task
I0000 00:00:1761584076.985338 53872720 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M2 Max
W0000 00:00:1761584077.082610 53919524 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761584077.170321 53919532 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
4. VIDEO mode: timestamps & inference loop¶
For offline videos, we must use RunningMode.VIDEO and pass a monotonic timestamp (ms) for each frame:
We read frames with OpenCV, compute timestamp_ms = int((frame_idx / fps) * 1000), and call landmarker.detect_for_video(mp_image, timestamp_ms).
The Task returns normalized 2D landmarks (x, y ∈ [0,1], z depth-like, plus visibility) and world 3D landmarks (x_m, y_m, z_m in meters).
We’ll save tidy CSV files for 2D and 3D landmarks.
We’ll also write an annotated MP4 by drawing a simple skeleton over each frame.
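The timestamp rule can be sketched without any video at all. The generator below uses the same formula as the text, `int((frame_idx / fps) * 1000)`, and also shows what happens when frames are skipped; the name `frame_timestamps_ms` and the example frame count and frame rate are illustrative assumptions.

```python
def frame_timestamps_ms(n_frames: int, fps: float, frame_stride: int = 1):
    """Yield (frame_idx, timestamp_ms) for every frame that would be processed."""
    for frame_idx in range(0, n_frames, max(1, frame_stride)):
        # Timestamps come from the ORIGINAL frame index, so skipped
        # frames still advance the clock.
        yield frame_idx, int((frame_idx / fps) * 1000)

# Hypothetical clip: 10 frames at 30 fps, processing every 3rd frame
pairs = list(frame_timestamps_ms(n_frames=10, fps=30.0, frame_stride=3))
frames = [f for f, _ in pairs]
timestamps = [t for _, t in pairs]
print(pairs)

# VIDEO mode expects timestamps that keep increasing from call to call
assert all(later > earlier for earlier, later in zip(timestamps, timestamps[1:]))
```

Deriving the timestamp from the original frame index, rather than from a counter of processed frames, keeps time_ms aligned with the source video when frame_stride is greater than 1.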
Parameters you can tune¶
MODEL_VARIANT (lite/full/heavy), num_poses (usually 1), frame_stride (skip frames for speed), min_pose_detection_confidence, min_pose_presence_confidence, min_tracking_confidence.
5. Choose Your Parameters¶
Before running extraction, set the tunable parameters in the next cell.
These control model accuracy, processing speed, output organization, and post-processing filters (anti-jitter smoothing).
Model Variant¶
MODEL_VARIANT (choose one):
lite → fastest but least accurate
full → balanced (medium accuracy & speed)
heavy → most accurate (default; recommended for LS100 on modern hardware)
Changing MODEL_VARIANT automatically downloads the correct .task file to your local models/ folder if needed.
Inference Settings¶
frame_stride: process every k-th frame
1 = every frame (maximum precision)
2 = every other frame (faster)
3+ = skip more frames (fastest, least temporal detail)
num_poses: number of people to detect per frame
Use 1 for single-person videos (default in LS100)
Confidence thresholds
min_pose_detection_confidence: confidence for detecting a pose
min_pose_presence_confidence: confidence that a person is visible
min_tracking_confidence: confidence for stable tracking across frames
Output Settings¶
make_annotated_video: if True, saves an annotated .mp4 showing the skeleton overlay.
outputs_subdir_name: name of the folder where outputs are saved. For a single video, all CSVs and the optional annotated MP4 are written to <video_dir>/<outputs_subdir_name>/ (next to the input video); for a folder of videos, they are written to <parent_of_folder>/<outputs_subdir_name>/.
Post-Processing Filters (Anti-Jitter)¶
After landmark extraction, you can smooth or clean the data:
visibility_thresh: discard landmarks with visibility below the threshold
hampel_window / hampel_nsigmas: outlier removal using a Hampel filter, which removes sudden jumps and replaces them with local medians
rolling_window: rolling average smoother (reduces frame-to-frame jitter)
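To build intuition for these two filters, here is a minimal pure-Python sketch on a made-up x-coordinate track with one obvious jump. It uses the textbook per-window Hampel rule (median and scaled median absolute deviation of each neighbourhood), which may differ in detail from a pandas rolling implementation; the data and function names are illustrative only.

```python
from statistics import median

def hampel(values, window=7, nsigmas=3.0):
    """Replace outliers with the local median (simplified Hampel filter)."""
    half = window // 2
    cleaned = list(values)
    for i, v in enumerate(values):
        neighbourhood = values[max(0, i - half): i + half + 1]
        med = median(neighbourhood)
        # Median absolute deviation; 1.4826 rescales it to approximate
        # a standard deviation for normally distributed data.
        mad = median(abs(x - med) for x in neighbourhood)
        if abs(v - med) > nsigmas * 1.4826 * mad:
            cleaned[i] = med
    return cleaned

def rolling_mean(values, window=3):
    """Centered moving average; windows shrink at the edges."""
    half = window // 2
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - half): i + half + 1]
        out.append(sum(chunk) / len(chunk))
    return out

# Hypothetical x track for one landmark; 0.90 is a one-frame glitch
x = [0.50, 0.51, 0.52, 0.90, 0.54, 0.55, 0.56]
x_clean = hampel(x, window=5, nsigmas=3.0)
x_smooth = rolling_mean(x_clean, window=3)
print("cleaned :", x_clean)
print("smoothed:", [round(v, 3) for v in x_smooth])
```

Running the outlier step before the average matters: a moving average alone would smear the glitch across its neighbours instead of removing it.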
💡 Tip:
If you have a slow computer, you can choose MODEL_VARIANT = "lite" or frame_stride = 2 to reduce load.
After extraction, apply filtering to clean up the 2D CSV before using it in analysis.
The Pose Landmarker returns:
2D normalized landmarks: (x, y ∈ [0,1]), z (depth-like, unitless), visibility (0–1 confidence).
3D world landmarks: (x_m, y_m, z_m) in meters.
Outputs:
CSV files for both 2D and 3D landmarks.
Optional annotated MP4 with the skeleton overlay.
# =========================================
# 5. Parameters — YOU CAN EDIT THIS BLOCK
# =========================================
# --- Model choice ---
MODEL_VARIANT = "heavy" # options: "lite", "full", "heavy"
# --- Inference behavior ---
frame_stride = 1 # 1=every frame; 2=every other; 3=every third, etc.
num_poses = 1 # typically 1 for single-person videos
# Confidence thresholds
min_pose_detection_confidence = 0.5
min_pose_presence_confidence = 0.5
min_tracking_confidence = 0.5
# --- Output location ---
# If you input a single video file → outputs will be saved to: <video_dir>/<outputs_subdir_name>/
# If you input a folder path → outputs will be saved to: <parent_of_folder>/<outputs_subdir_name>/
outputs_subdir_name = "outputs"
make_annotated_video = True # set False to skip saving annotated MP4s
# --- Post-processing filters (applied AFTER extraction to the 2D CSV) ---
# NOTE: Filtering improves smoothness but is slower. Turn off to speed up runs.
enable_filtering = True # ← students toggle this (True/False)
visibility_thresh = 0.5 # keep rows where visibility >= this
hampel_window = 7 # odd int (in frames); robust outlier window
hampel_nsigmas = 3.0 # sensitivity for Hampel (higher = fewer outliers)
rolling_window = 3 # odd int (in frames); centered rolling average for x,y
# ======================================================
# 5.a DO NOT EDIT BELOW — this ensures everything runs correctly
# ======================================================
from pathlib import Path
# --- Ensure filtering utility is available (only defines if missing) ---
try:
    apply_filters_to_pose2d
except NameError:
    import pandas as pd
    from pathlib import Path
    def _hampel(series: pd.Series, window: int, nsigmas: float) -> pd.Series:
        med = series.rolling(window, center=True, min_periods=1).median()
        diff = (series - med).abs()
        mad = diff.rolling(window, center=True, min_periods=1).median()
        thr = nsigmas * 1.4826 * mad.fillna(0)
        outlier = diff > thr
        return series.where(~outlier, med)
    def apply_filters_to_pose2d(csv2d_path: str,
                                visibility_thresh: float = 0.5,
                                rolling_window: int = 3,
                                hampel_window: int = 7,
                                hampel_nsigmas: float = 3.0) -> str:
        """
        Saves a filtered CSV next to the original as *_filtered.csv.
        Returns the filtered path.
        """
        df = pd.read_csv(csv2d_path)
        if "visibility" in df.columns:
            df = df[df["visibility"].fillna(0.0) >= visibility_thresh].copy()
        # Sort for stable rolling ops
        df = df.sort_values(["video", "landmark_index", "frame"])
        # Hampel (robust outlier removal), then rolling mean smooth.
        # Both steps run inside each (video, landmark) group so the rolling
        # mean never averages across two different landmarks or videos.
        for coord in ("x", "y"):
            if coord in df.columns:
                df[coord] = (
                    df.groupby(["video", "landmark_index"])[coord]
                    .transform(
                        lambda s: _hampel(s, hampel_window, hampel_nsigmas)
                        .rolling(rolling_window, center=True, min_periods=1)
                        .mean()
                    )
                )
        out_path = Path(csv2d_path).with_name(Path(csv2d_path).stem + "_filtered.csv")
        df.to_csv(out_path, index=False)
        return str(out_path)
# Ensure MODEL_PATH exists (downloaded earlier)
try:
    MODEL_PATH
except NameError:
    raise RuntimeError("MODEL_PATH not found. Please run the previous model download cell first.")
# --- Helper: Ensure odd window sizes for filters ---
def _ensure_odd(n: int) -> int:
    try:
        n = int(n)
    except Exception:
        return 3
    return n if n % 2 == 1 else n + 1
hampel_window = _ensure_odd(hampel_window)
rolling_window = _ensure_odd(rolling_window)
# --- Helper: Resolve output folder location ---
VIDEO_EXTS = {".mp4", ".mov", ".m4v", ".avi", ".mkv"}
def resolve_outputs_dir(input_path: str | Path, outputs_subdir_name: str = "outputs") -> Path:
    """
    If input is a file (has a known video extension):
        -> <file_dir>/<outputs_subdir_name>/
    If input is a folder (no extension):
        -> <parent_of_folder>/<outputs_subdir_name>/
    """
    p = Path(input_path)
    if p.is_file() or p.suffix.lower() in VIDEO_EXTS:
        return p.parent / outputs_subdir_name
    else:
        return p.parent / outputs_subdir_name
# --- Sanity check summary ---
print("\n===== Parameter Summary =====")
print(f"MODEL_VARIANT : {MODEL_VARIANT}")
print(f"MODEL_PATH : {MODEL_PATH}")
print(f"frame_stride : {frame_stride}")
print(f"num_poses : {num_poses}")
print(f"confidences (detect,pres,track): {min_pose_detection_confidence}, "
f"{min_pose_presence_confidence}, {min_tracking_confidence}")
print(f"outputs_subdir_name : {outputs_subdir_name}")
print(f"make_annotated_video : {make_annotated_video}")
print(f"enable_filtering : {enable_filtering}")
if enable_filtering:
    print(f" visibility_thresh : {visibility_thresh}")
    print(f" hampel_window / nsigmas : {hampel_window} / {hampel_nsigmas}")
    print(f" rolling_window : {rolling_window}")
else:
    print(" ↳ Filtering is OFF (fast mode: no smoothing or visibility threshold applied)")
print("=============================\n")
===== Parameter Summary =====
MODEL_VARIANT : heavy
MODEL_PATH : models/pose_landmarker_heavy.task
frame_stride : 1
num_poses : 1
confidences (detect,pres,track): 0.5, 0.5, 0.5
outputs_subdir_name : outputs
make_annotated_video : True
enable_filtering : False
↳ Filtering is OFF (fast mode: no smoothing or visibility threshold applied)
=============================
# =========================================================
# 6. Function: Extract pose landmarks from a video file
# - Writes RAW outputs according to the new folder rules
# - Filtering (if enabled) happens in the NEXT block
# =========================================================
# Reuse landmark names if already defined; else define here.
try:
    landmark_index_to_name
except NameError:
    POSE_LANDMARK_NAMES = [
        "nose","left_eye_inner","left_eye","left_eye_outer",
        "right_eye_inner","right_eye","right_eye_outer",
        "left_ear","right_ear","mouth_left","mouth_right",
        "left_shoulder","right_shoulder","left_elbow","right_elbow",
        "left_wrist","right_wrist","left_pinky","right_pinky",
        "left_index","right_index","left_thumb","right_thumb",
        "left_hip","right_hip","left_knee","right_knee",
        "left_ankle","right_ankle","left_heel","right_heel",
        "left_foot_index","right_foot_index",
    ]
    landmark_index_to_name = {i: n for i, n in enumerate(POSE_LANDMARK_NAMES)}
from pathlib import Path
from typing import Optional, Union, Dict
def extract_pose_from_video(
    video_path: Union[str, Path],
    model_path: Union[str, Path],
    make_annotated_video: bool = False,
    frame_stride: int = 1,
    num_poses: int = 1,
    min_pose_detection_confidence: float = 0.5,
    min_pose_presence_confidence: float = 0.5,
    min_tracking_confidence: float = 0.5,
    output_segmentation_masks: bool = False,
    # If provided, write outputs here; else follow file/folder rules via resolve_outputs_dir(...)
    base_outputs_dir: Optional[Union[str, Path]] = None,
) -> Dict[str, Optional[str]]:
    """
    Extracts pose landmarks from a single video and saves:
      - 2D CSV (RAW): <outputs>/<video_stem>_pose2d.csv
      - 3D CSV (RAW): <outputs>/<video_stem>_pose3d.csv (if world landmarks available)
      - MP4 (optional): <outputs>/<video_stem>_annotated.mp4
    Output folder rules:
      • If base_outputs_dir is given → use it.
      • Else (single-file default) → <video_dir>/<outputs_subdir_name>/
        (outputs_subdir_name is set in the Parameters cell).
    NOTE: Any smoothing/jitter filtering is performed in the NEXT block.
    """
    import cv2, numpy as np, pandas as pd
    import mediapipe as mp
    from mediapipe.tasks import python as mp_python
    from mediapipe.tasks.python import vision as mp_vision
    video_path = Path(video_path)
    model_path = str(model_path)
    # Determine output directory
    if base_outputs_dir is not None:
        out_dir = Path(base_outputs_dir)
    else:
        try:
            out_dir = resolve_outputs_dir(video_path, outputs_subdir_name=outputs_subdir_name)
        except NameError:
            out_dir = video_path.parent / (outputs_subdir_name if 'outputs_subdir_name' in globals() else 'outputs')
    out_dir.mkdir(parents=True, exist_ok=True)
    stem = video_path.stem
    csv2d = out_dir / f"{stem}_pose2d.csv"
    csv3d = out_dir / f"{stem}_pose3d.csv"
    mp4_out = out_dir / f"{stem}_annotated.mp4"
    # Optional: echo filter toggle (if defined) for clarity
    if 'enable_filtering' in globals():
        print(f"[extract] enable_filtering = {enable_filtering} (filtering runs after extraction)")
    # --- OpenCV video IO ---
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise FileNotFoundError(f"Cannot open video: {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 1e-6:
        fps = 30.0 # safe fallback
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    writer = None
    if make_annotated_video:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(str(mp4_out), fourcc, fps / max(1, frame_stride), (width, height))
    # --- MediaPipe Tasks (VIDEO mode) ---
    BaseOptions = mp_python.BaseOptions
    PoseLandmarker = mp_vision.PoseLandmarker
    PoseLandmarkerOptions = mp_vision.PoseLandmarkerOptions
    RunningMode = mp_vision.RunningMode
    options = PoseLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=model_path),
        running_mode=RunningMode.VIDEO,
        num_poses=num_poses,
        min_pose_detection_confidence=min_pose_detection_confidence,
        min_pose_presence_confidence=min_pose_presence_confidence,
        min_tracking_confidence=min_tracking_confidence,
        output_segmentation_masks=output_segmentation_masks,
    )
    # --- Helpers (image conversion + simple skeleton overlay) ---
    def _mp_image_from_bgr(bgr):
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        return mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
    def _draw_skeleton(bgr, norm_landmarks, visibility_thresh: float = 0.5):
        h, w = bgr.shape[:2]
        pts = {}
        for i, lm in enumerate(norm_landmarks):
            vis = getattr(lm, "visibility", 1.0) or 0.0
            if vis >= visibility_thresh:
                x, y = int(lm.x * w), int(lm.y * h)
                pts[i] = (x, y)
                cv2.circle(bgr, (x, y), 2, (255, 255, 255), -1)
        for a, b in [
            (11,13),(13,15),(12,14),(14,16),(11,12),(23,24),(11,23),(12,24),
            (23,25),(25,27),(24,26),(26,28),(27,29),(29,31),(28,30),(30,32)
        ]:
            if a in pts and b in pts:
                cv2.line(bgr, pts[a], pts[b], (255, 255, 255), 2)
    rows2d, rows3d = [], []
    with PoseLandmarker.create_from_options(options) as landmarker:
        frame_idx = 0
        while True:
            ok, bgr = cap.read()
            if not ok:
                break
            # Frame skipping for speed
            if frame_stride > 1 and (frame_idx % frame_stride != 0):
                frame_idx += 1
                continue
            # VIDEO mode requires monotonic ms timestamps
            ts_ms = int((frame_idx / fps) * 1000.0)
            mp_image = _mp_image_from_bgr(bgr)
            result = landmarker.detect_for_video(mp_image, ts_ms)
            for pose_id, nlands in enumerate(result.pose_landmarks):
                # 2D normalized landmarks (+ visibility)
                for li, lm in enumerate(nlands):
                    rows2d.append({
                        "video": video_path.name,
                        "frame": frame_idx,
                        "time_ms": ts_ms,
                        "landmark_index": li,
                        "landmark_name": landmark_index_to_name.get(li, str(li)),
                        "x": lm.x, "y": lm.y, "z": lm.z,
                        "visibility": getattr(lm, "visibility", np.nan),
                    })
                # 3D world landmarks (meters), if available
                if len(result.pose_world_landmarks) > pose_id:
                    wlands = result.pose_world_landmarks[pose_id]
                    for li, lm in enumerate(wlands):
                        rows3d.append({
                            "video": video_path.name,
                            "frame": frame_idx,
                            "time_ms": ts_ms,
                            "landmark_index": li,
                            "landmark_name": landmark_index_to_name.get(li, str(li)),
                            "x_m": lm.x, "y_m": lm.y, "z_m": lm.z,
                            "visibility": getattr(lm, "visibility", np.nan),
                        })
            # Optional overlay: write each processed frame exactly once, even
            # when no pose is detected, so the annotated video keeps its timing
            if writer is not None:
                bgr_draw = bgr.copy()
                for nlands in result.pose_landmarks:
                    _draw_skeleton(bgr_draw, nlands, visibility_thresh=0.5)
                writer.write(bgr_draw)
            frame_idx += 1
    cap.release()
    if writer is not None:
        writer.release()
    # --- Save RAW CSVs ---
    import pandas as pd
    pd.DataFrame(rows2d).to_csv(csv2d, index=False)
    if rows3d:
        pd.DataFrame(rows3d).to_csv(csv3d, index=False)
        csv3d_str = str(csv3d)
    else:
        csv3d_str = None
    # Return RAW paths; the next block may also produce a *_filtered.csv
    return {
        "csv2d": str(csv2d),
        "csv3d": csv3d_str,
        "annotated_mp4": (str(mp4_out) if make_annotated_video else None),
    }
# --- Quick peek helper (unchanged) ---
def peek_csv(path, n=5):
    import pandas as pd
    from IPython.display import display  # explicit import; Jupyter also provides it implicitly
    df = pd.read_csv(path)
    print(f"{path} → shape={df.shape}")
    display(df.head(n))
    return df
Input the path of the video or the folder containing videos¶
# =========================================
# 7. Paste your input path (file OR folder)
# =========================================
# Examples:
# input_path = "/path/to/video.mp4"
# input_path = "/path/to/folder_with_videos"
input_path = "/Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/Kevin_2022_Day5_CRNCH.mp4" # ← paste here (keep quotes)
Now, run the code block below.¶
Please note that if you set enable_filtering (the jitter filter) to True, the run will take longer.
# =========================================================
# 8. Run extraction and save outputs (CSV + annotated MP4s)
# - If input_path is a single video file: outputs → <video_dir>/<outputs_subdir_name>/
# - If input_path is a folder: outputs (shared) → <parent_of_folder>/<outputs_subdir_name>/
# - Writes a manifest CSV when processing a folder
# =========================================================
from pathlib import Path
import pandas as pd
if not input_path or not str(input_path).strip():
raise ValueError("Please set `input_path` in the previous cell.")
p = Path(input_path).expanduser().resolve()
# Ensure resolver exists (it was defined in §5.a)
try:
    resolve_outputs_dir
except NameError:
    # Minimal fallback (same logic as earlier)
    VIDEO_EXTS = {".mp4", ".mov", ".m4v", ".avi", ".mkv"}
    def resolve_outputs_dir(input_path, outputs_subdir_name="outputs"):
        ip = Path(input_path)
        if ip.is_file() or ip.suffix.lower() in VIDEO_EXTS:
            return ip.parent / outputs_subdir_name
        else:
            return ip.parent / outputs_subdir_name
VIDEO_EXTS = VIDEO_EXTS if "VIDEO_EXTS" in globals() else {".mp4", ".mov", ".m4v", ".avi", ".mkv"}
def _is_video_file(path: Path) -> bool:
    return path.is_file() and path.suffix.lower() in VIDEO_EXTS
if _is_video_file(p):
    # -------- Single video mode --------
    base_out = resolve_outputs_dir(p, outputs_subdir_name)
    base_out.mkdir(parents=True, exist_ok=True)
    print(f"Single video detected.\nOutputs will be saved to: {base_out}")
    outs = extract_pose_from_video(
        video_path=str(p),
        model_path=MODEL_PATH,
        make_annotated_video=make_annotated_video,
        frame_stride=frame_stride,
        num_poses=num_poses,
        min_pose_detection_confidence=min_pose_detection_confidence,
        min_pose_presence_confidence=min_pose_presence_confidence,
        min_tracking_confidence=min_tracking_confidence,
        base_outputs_dir=base_out, # important
    )
    print("\n✔ Done.")
    print("2D CSV :", outs.get("csv2d"))
    print("3D CSV :", outs.get("csv3d"))
    print("MP4 :", outs.get("annotated_mp4"))
else:
    # -------- Folder mode --------
    if not p.exists() or not p.is_dir():
        raise NotADirectoryError(f"Not a directory: {p}")
    # Shared outputs placed alongside the folder
    base_out = resolve_outputs_dir(p, outputs_subdir_name)
    base_out.mkdir(parents=True, exist_ok=True)
    print(f"Folder detected. Outputs will be saved to: {base_out}")
    # Find videos (non-recursive by default; flip to rglob for recursive)
    videos = sorted([str(f) for f in p.iterdir() if _is_video_file(f)])
    if not videos:
        # Try recursive as a helpful fallback
        videos = sorted([str(f) for f in p.rglob("*") if _is_video_file(f)])
        if videos:
            print(f"Found {len(videos)} video(s) (recursive search).")
        else:
            raise FileNotFoundError(f"No supported video files found in: {p}")
    records = []
    for i, vp in enumerate(videos, 1):
        print(f"[{i}/{len(videos)}] {vp}")
        try:
            outs = extract_pose_from_video(
                video_path=vp,
                model_path=MODEL_PATH,
                make_annotated_video=make_annotated_video,
                frame_stride=frame_stride,
                num_poses=num_poses,
                min_pose_detection_confidence=min_pose_detection_confidence,
                min_pose_presence_confidence=min_pose_presence_confidence,
                min_tracking_confidence=min_tracking_confidence,
                base_outputs_dir=base_out, # important
            )
            records.append({"video": vp, **outs, "status": "ok", "error": ""})
        except Exception as e:
            records.append({"video": vp, "csv2d": None, "csv3d": None,
                            "annotated_mp4": None, "status": "error", "error": str(e)})
    manifest = base_out / "outputs_manifest.csv"
    pd.DataFrame.from_records(records).to_csv(manifest, index=False)
    print(f"\n✔ Batch complete. Manifest saved to: {manifest}")
Single video detected.
Outputs will be saved to: /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs
[extract] enable_filtering = False (filtering runs after extraction)
I0000 00:00:1761594301.579539 53872720 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M2 Max
W0000 00:00:1761594301.656664 54157257 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761594301.732664 54157256 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
✔ Done.
2D CSV : /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs/Kevin_2022_Day5_CRNCH_pose2d.csv
3D CSV : /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs/Kevin_2022_Day5_CRNCH_pose3d.csv
MP4 : /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs/Kevin_2022_Day5_CRNCH_annotated.mp4
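In folder mode, the cell above also writes outputs_manifest.csv with one row per video and a status column ("ok" or "error"). A minimal sketch for summarizing that file after a batch run is shown here; the helper name summarize_manifest is hypothetical, and it assumes the column names written by the batch loop (video, status, error).

```python
import csv
from collections import Counter


def summarize_manifest(manifest_path):
    """Return (status_counts, failures) for an outputs_manifest.csv file."""
    with open(manifest_path, newline="", encoding="utf-8") as fh:
        rows = list(csv.DictReader(fh))
    # Count how many videos ended in each status ("ok" / "error")
    counts = Counter(row["status"] for row in rows)
    # Keep the video path and error message for every failed row
    failures = [(row["video"], row["error"]) for row in rows if row["status"] == "error"]
    return counts, failures


# Example usage (the path is illustrative):
# counts, failures = summarize_manifest("outputs/outputs_manifest.csv")
# print(dict(counts))
# for video, error in failures:
#     print(f"FAILED {video}: {error}")
```

Because failed videos are recorded rather than raised, checking the manifest is the quickest way to see whether any file in a large folder was skipped.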
Notes
2D normalized coordinates: x, y ∈ [0, 1] relative to image width/height (values can fall outside this range if the estimated point is out of frame). z is a depth-like value measured relative to the hips (smaller, i.e. more negative, is closer to the camera).
3D world coordinates: x, y, z are in meters in a world coordinate space centered near the hips.
visibility: the likelihood that each landmark is visible in the frame.
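To make the 2D convention concrete, here is a small sketch that converts a normalized landmark to pixel coordinates and decides whether it is usable. The helper names and the 0.5 visibility threshold are assumptions for illustration, not values taken from this notebook.

```python
def to_pixel(x_norm, y_norm, width, height):
    """Convert normalized image coordinates to integer pixel coordinates."""
    return int(round(x_norm * width)), int(round(y_norm * height))


def is_usable(x_norm, y_norm, visibility, min_visibility=0.5):
    """True if the landmark lies inside the frame and is likely visible.

    min_visibility=0.5 is an illustrative default; tune it for your data.
    """
    in_frame = 0.0 <= x_norm <= 1.0 and 0.0 <= y_norm <= 1.0
    return in_frame and visibility >= min_visibility


# A landmark at the centre of a 1920x1080 frame
print(to_pixel(0.5, 0.5, 1920, 1080))  # (960, 540)
# x > 1 means the estimated point is beyond the right edge of the frame
print(is_usable(1.08, 0.40, 0.9))      # False
```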
7. Optional: compute simple joint angles¶
We will explore this in the next guide¶
Once you have landmarks, you can compute engineered features such as elbow or knee angles. Below is a tiny utility that computes the angle formed by three named landmarks in each frame.
import numpy as np
import pandas as pd


def _angle_between(a, b, c):
    """Return the angle at vertex b, in degrees, formed by points a-b-c."""
    # a, b, c are 2D points (x, y) or 3D (x, y, z); here we'll use 2D image coords
    a, b, c = np.array(a), np.array(b), np.array(c)
    ba = a - b
    bc = c - b
    # The small epsilon avoids division by zero when two points coincide
    cosang = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-9)
    cosang = np.clip(cosang, -1.0, 1.0)
    return np.degrees(np.arccos(cosang))


def compute_joint_angle_csv(csv2d_path: str, joint=("left_shoulder", "left_elbow", "left_wrist")) -> pd.DataFrame:
    """Compute one joint angle per frame from the 2D landmark CSV."""
    df = pd.read_csv(csv2d_path)
    # Wide pivot: one row per frame, columns keyed by (coordinate, landmark_name)
    wide = df.pivot_table(index=["video", "frame", "time_ms"], columns="landmark_name", values=["x", "y"])

    # Helper: (n_frames, 2) array of x, y for one landmark
    def P(name):
        return np.c_[wide["x"][name].values, wide["y"][name].values]

    A, B, C = P(joint[0]), P(joint[1]), P(joint[2])
    # Note: normalized x and y are scaled by width and height separately, so
    # these angles are only approximate unless the frame is square.
    angles = np.array([_angle_between(a, b, c) for a, b, c in zip(A, B, C)])
    out = pd.DataFrame({
        "video": wide.index.get_level_values("video"),
        "frame": wide.index.get_level_values("frame"),
        "time_ms": wide.index.get_level_values("time_ms"),
        f"angle_{'_'.join(joint)}": angles,
    })
    return out


# Example (after extraction):
# angle_df = compute_joint_angle_csv("outputs/yourvideo_pose2d.csv", ("left_shoulder", "left_elbow", "left_wrist"))
# angle_df.head()
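One caveat when measuring angles on 2D normalized coordinates: x is divided by the image width and y by the image height, so in a non-square frame the two axes are stretched differently and angles come out distorted. The sketch below re-implements the same angle formula with only the standard library and compares the raw normalized result with the result after rescaling to pixels. The function names and the 1920x1080 frame size are illustrative assumptions.

```python
import math


def angle_deg(a, b, c):
    """Angle at vertex b (degrees) formed by points a-b-c; works in 2D or 3D."""
    ba = [ai - bi for ai, bi in zip(a, b)]
    bc = [ci - bi for ci, bi in zip(c, b)]
    dot = sum(u * v for u, v in zip(ba, bc))
    norm = math.hypot(*ba) * math.hypot(*bc)
    # Epsilon guards against division by zero; clamp to the valid acos domain
    cosang = max(-1.0, min(1.0, dot / (norm + 1e-9)))
    return math.degrees(math.acos(cosang))


def angle_from_normalized(a, b, c, width, height):
    """Rescale normalized (x, y) points to pixels, then measure the angle at b."""
    def to_px(p):
        return (p[0] * width, p[1] * height)
    return angle_deg(to_px(a), to_px(b), to_px(c))


# Three points that form a 45-degree angle in the pixel space of a 1920x1080 frame
W, H = 1920, 1080
b = (0.5, 0.5)
a = (0.5 + 100 / W, 0.5)
c = (0.5 + 100 / W, 0.5 + 100 / H)
print(round(angle_deg(a, b, c), 1))                    # about 60.6: distorted by the aspect ratio
print(round(angle_from_normalized(a, b, c, W, H), 1))  # about 45.0
```

If the frame size is not at hand, computing angles from the 3D world-coordinate CSV is another option, since those axes share the same unit (meters).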
8. Notes & best practices¶
Timestamps matter: in VIDEO mode you must pass a timestamp_ms that increases from frame to frame; we compute it from the frame index and FPS.
Tracking saves compute: in VIDEO/LIVE_STREAM mode the task performs pose tracking, so the full model isn’t re-run on every frame (this helps latency).
Out-of-frame landmarks: 2D normalized x, y can be outside [0, 1] if a joint is off-screen; use visibility to filter.
Model choice: start with full, switch to lite for underpowered laptops or large batches, and use heavy when you need the highest accuracy and can afford the slower speed.
Stride: a cheap speedup is frame_stride=2 (½ the frames) or higher.
Ethics & consent: if students process videos of people, teach consent, privacy, and secure storage.
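The timestamp and stride points can be sketched together as follows. This is a minimal illustration, assuming the timestamp is derived as frame index × 1000 / FPS; the helper name frame_timestamps_ms is hypothetical, and the notebook's own extraction function may compute it slightly differently.

```python
def frame_timestamps_ms(n_frames, fps, frame_stride=1):
    """Yield (frame_index, timestamp_ms) for every `frame_stride`-th frame."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    for idx in range(0, n_frames, frame_stride):
        # The timestamp comes from the ORIGINAL frame index, so skipped frames
        # still advance the clock and the values keep increasing.
        yield idx, int(round(idx * 1000.0 / fps))


print(list(frame_timestamps_ms(6, 30.0, frame_stride=2)))
# [(0, 0), (2, 67), (4, 133)]
```

Deriving the timestamp from the original frame index (rather than from a counter of processed frames) keeps time_ms aligned with the real video time even when frames are skipped.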
Troubleshooting¶
If results come back as None (or you see a NoneType error), ensure the model path exists and your video actually contains a person.
If you get slowdowns or memory pressure, try frame_stride=2 or the "lite" model.
On some platforms OpenCV needs additional codecs to write MP4 files; if a saved video is empty, try a different fourcc (e.g., cv2.VideoWriter_fourcc(*"avc1")) or an alternative OpenCV build such as opencv-python-headless.
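Several of these problems can be caught before inference starts. The following is a small pre-flight sketch that uses only the standard library; the function name preflight and its messages are hypothetical and not part of this notebook or of MediaPipe.

```python
from pathlib import Path


def preflight(model_path, video_path):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    model = Path(model_path)
    video = Path(video_path)
    if not model.is_file():
        problems.append(f"Model file not found: {model}")
    elif model.stat().st_size == 0:
        # A zero-byte file usually points to an interrupted download
        problems.append(f"Model file is empty (incomplete download?): {model}")
    if not video.is_file():
        problems.append(f"Video file not found: {video}")
    return problems


# Example usage (paths are illustrative):
# for msg in preflight("models/pose_landmarker_full.task", "media/video/clip.mp4"):
#     print("⚠", msg)
```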
Happy exploring!