Pose Landmarks with MediaPipe — From Local Videos & Folders Using Python

Harvard University

This notebook is both a guided lesson and a working pipeline for detecting human pose landmarks from local video files or entire folders of videos using MediaPipe Tasks.

Goal

  1. Set up a clean Python 3.12 environment and verify required packages.

  2. Understand each step and its terminology.

  3. Download and select a Pose Landmarker model (lite / full / heavy) and understand accuracy–speed trade-offs.

  4. Read videos with OpenCV and run inference in RunningMode.VIDEO with correct timestamps.

  5. Export results as tidy CSVs for analysis: 2D image-normalized and 3D world landmarks.

  6. Create an annotated MP4 showing the skeleton overlay.

  7. Build intuition for visibility, image vs. world coordinates, and simple feature engineering (e.g., joint angles).

Built for learning: Along the way you’ll see short callouts explaining why each step exists (e.g., timestamps in VIDEO mode), how coordinate spaces differ, and how to tune speed vs. accuracy.

After completing this guide, you will be able to

  • Load one video, or loop through an entire folder, and extract the coordinates of the body landmarks frame by frame.

  • Save two analysis-ready CSVs per video: one for 2D normalized landmarks and one for 3D world coordinates.

  • Produce an annotated MP4 with landmarks and connections overlaid.

  • Explain and adjust RunningMode.VIDEO, per-frame timestamps, visibility filtering, image vs. world coordinates, and model variants (lite/full/heavy).

Prerequisites

  • Python 3.12 virtual environment selected as the active Jupyter kernel. If you need help, please refer to the “LS100_Guide 3_Introduction to Pose Estimation Using MediaPipe.pdf” guide.

  • Installed packages: mediapipe opencv-python pandas numpy tqdm matplotlib seaborn

  • One or more local video files (e.g., .mp4) to test.

Ethics & consent

  • If processing videos of people, obtain consent and store data securely. Avoid uploading sensitive content to third-party services.


0. Environment Setup and Verification (LS100 Standard)

Before running any code, make sure you’re using the LS100_PoseEstimation_MP kernel that was created in your Python 3.12 virtual environment. This section verifies your environment and installs all required packages.


What you should already have

✅ Python 3.12 installed

✅ Virtual environment activated (your terminal prompt should show (MediaPipeEnv))

✅ Kernel registered as LS100_PoseEstimation_MP

If you haven’t completed those steps, revisit the LS100_Guide 3_Introduction to Pose Estimation Using MediaPipe.pdf document.


Required packages

This notebook uses the following libraries:

  • mediapipe – pose landmark model and API

  • opencv-python – video I/O (input/output) and frame conversion

  • pandas & numpy – data handling and analysis

  • tqdm – progress bars for video processing

  • matplotlib & seaborn – visualization and data inspection

Run the next cell to ensure these are installed and to confirm the environment details.


Learning focus

  • Why virtual environments prevent version conflicts

  • Why we require Python 3.12 (MediaPipe Tasks currently supports Python 3.9–3.12 only)

  • How each library fits into the MediaPipe Pose pipeline
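A quick way to check the first two points is to ask the interpreter where it is running from. The sketch below is not part of the LS100 notebook: `in_virtualenv` and `describe_interpreter` are hypothetical helper names. It relies only on the standard library, where `sys.prefix` differs from `sys.base_prefix` inside a venv-style environment (conda environments are not detected this way).

```python
import sys

def in_virtualenv() -> bool:
    """Return True when the interpreter runs inside a venv-style environment."""
    # Inside a venv, sys.prefix points at the environment folder, while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix

def describe_interpreter() -> dict:
    """Collect the facts that matter when debugging 'wrong kernel' problems."""
    return {
        "executable": sys.executable,
        "version": f"{sys.version_info.major}.{sys.version_info.minor}",
        "in_virtualenv": in_virtualenv(),
    }

info = describe_interpreter()
for key, value in info.items():
    print(f"{key:14s}: {value}")
```

If `executable` does not point inside your environment folder, the notebook is probably attached to a different kernel than the one you registered.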


0. Environment setup

If running locally (VS Code/Jupyter), run the following cell once; it might take about a minute to run.

# ============================================
# 0. Environment Setup and Package Verification
# ============================================

import sys
import importlib
import subprocess

# ---- 1. Check Python version ----
py_version = sys.version_info
print(f"🧠 Python version: {py_version.major}.{py_version.minor}.{py_version.micro}")
if py_version < (3, 9) or py_version >= (3, 13):
    print("⚠️ MediaPipe Tasks officially supports Python 3.9–3.12.")
    print("⚠️ Please switch to Python 3.12 for this notebook (as used in LS100).")

# ---- 2. Define required packages ----
required_packages = [
    "mediapipe",
    "opencv-python",
    "pandas",
    "numpy",
    "tqdm",
    "matplotlib",
    "seaborn",
]

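# ---- 3. Function to check and install ----
# Some pip package names differ from the module name you import
# (opencv-python is imported as cv2), so map them before checking.
IMPORT_NAMES = {"opencv-python": "cv2"}

def install_if_missing(pkg):
    """
    Try importing the package; if not found, install it quietly.
    """
    name = pkg.split("==")[0]
    try:
        importlib.import_module(IMPORT_NAMES.get(name, name))
        print(f"✅ {pkg} already installed")
    except ImportError:
        print(f"⬇️ Installing {pkg} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])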

# ---- 4. Verify each dependency ----
for package in required_packages:
    install_if_missing(package)

# ---- 5. Print package versions for reproducibility ----
import mediapipe as mp
import cv2, pandas as pd, numpy as np, tqdm, matplotlib, seaborn

print("\n📦 Package versions:")
print(f"mediapipe      : {mp.__version__}")
print(f"opencv-python  : {cv2.__version__}")
print(f"pandas         : {pd.__version__}")
print(f"numpy          : {np.__version__}")
print(f"matplotlib     : {matplotlib.__version__}")
print(f"seaborn        : {seaborn.__version__}")

print("\n✅ Environment is ready to proceed!")
🧠 Python version: 3.12.12
✅ mediapipe already installed
⬇️ Installing opencv-python ...
✅ pandas already installed
✅ numpy already installed
✅ tqdm already installed
✅ matplotlib already installed
✅ seaborn already installed

📦 Package versions:
mediapipe      : 0.10.21
opencv-python  : 4.11.0
pandas         : 2.3.3
numpy          : 1.26.4
matplotlib     : 3.10.7
seaborn        : 0.13.2

✅ Environment is ready to proceed!


1. Imports and Version Verification

Now that your environment is ready, let’s import the main libraries used throughout this notebook.

This step helps confirm that:

  • The correct packages are installed inside your LS100 virtual environment

  • MediaPipe loads successfully (and we can access its Tasks API)

  • OpenCV, NumPy, and Pandas are working properly

If an import fails, it usually means you’re running the notebook in a different kernel (not the one you registered). You can fix that by selecting Kernel → Change Kernel → LS100_PoseEstimation_MP (or the name you chose).


# ======================================
# 1. Import Libraries and Verify Versions (fixed for MediaPipe >=0.10)
# ======================================

import os, cv2, numpy as np, pandas as pd, matplotlib, seaborn as sns
from tqdm import tqdm

import mediapipe as mp
from mediapipe.tasks import python as mp_python
from mediapipe.tasks.python import vision as mp_vision

print("✅ MediaPipe Tasks API imported successfully!\n")
print(f"mediapipe version : {mp.__version__}")
print(f"opencv version    : {cv2.__version__}")
print(f"pandas version    : {pd.__version__}")
print(f"numpy version     : {np.__version__}")

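# Optional: check whether this OpenCV build can see a CUDA device.
# This only describes OpenCV; MediaPipe inference is configured separately.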
backend = "GPU" if cv2.cuda.getCudaEnabledDeviceCount() > 0 else "CPU"
print(f"⚙️ Running on {backend}")

# ---- Smoke test: confirm Tasks API symbols exist ----
BaseOptions = mp_python.BaseOptions
PoseLandmarker = mp_vision.PoseLandmarker
PoseLandmarkerOptions = mp_vision.PoseLandmarkerOptions
RunningMode = mp_vision.RunningMode

print("\n MediaPipe Tasks API is available:")
print(f"- BaseOptions           : {BaseOptions is not None}")
print(f"- PoseLandmarker        : {PoseLandmarker is not None}")
print(f"- PoseLandmarkerOptions : {PoseLandmarkerOptions is not None}")
print(f"- RunningMode           : {RunningMode is not None}")
✅ MediaPipe Tasks API imported successfully!

mediapipe version : 0.10.21
opencv version    : 4.11.0
pandas version    : 2.3.3
numpy version     : 1.26.4
⚙️ Running on CPU

 MediaPipe Tasks API is available:
- BaseOptions           : True
- PoseLandmarker        : True
- PoseLandmarkerOptions : True
- RunningMode           : True

Notes

  • Why this matters: ensures that the environment is truly isolated and reproducible.

  • Discussion prompt: Can you tell why we check MediaPipe imports before running the pipeline? (to confirm the Tasks API is available and working).

  • TASK: Print mp.__file__ to confirm MediaPipe’s path. This helps you understand where packages live inside the venv.


2. How Pose Landmarker works

  • Running modes: IMAGE, VIDEO, LIVE_STREAM. For offline videos we use VIDEO and must pass a timestamp (in ms) for each frame, and the timestamps must increase from frame to frame. In this mode the task tracks the pose from the previous frame, so it does not have to re-run person detection on every frame, which reduces latency at the same accuracy settings.

  • Outputs:

    • 2D normalized landmarks in image coordinates (x,y in [0,1] relative to width/height; z is a depth-like value; visibility in [0,1]).

    • 3D world landmarks (meters, origin near hip center; handy for biomechanical features).

  • Variants: lite / full / heavy. Heavier models = more accurate, slower (see model card).

  • Accuracy vs speed knobs: num_poses (usually 1 for single-person), min_pose_detection_confidence, min_pose_presence_confidence, min_tracking_confidence, and frame stride (e.g., analyze every 2nd/3rd frame).

We’ll expose all of these transparently in helper functions below.
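To make the two coordinate spaces concrete, here is a minimal sketch using only the standard library. `to_pixels` and `joint_angle_deg` are hypothetical helper names, and the landmark values are made up for illustration; they do not come from a real video. The first helper maps image-normalized coordinates to pixels, and the second computes the angle at a joint from three points, which works equally for 2D or 3D (world) coordinates.

```python
import math

def to_pixels(x_norm: float, y_norm: float, width: int, height: int) -> tuple[int, int]:
    """Convert image-normalized coordinates (0..1) to integer pixel coordinates."""
    return int(x_norm * width), int(y_norm * height)

def joint_angle_deg(a, b, c) -> float:
    """Angle at point b (in degrees) between the segments b->a and b->c.

    Accepts 2D or 3D points given as equal-length sequences of floats.
    """
    ba = [ai - bi for ai, bi in zip(a, b)]
    bc = [ci - bi for ci, bi in zip(c, b)]
    norm_ba = math.sqrt(sum(v * v for v in ba))
    norm_bc = math.sqrt(sum(v * v for v in bc))
    if norm_ba == 0.0 or norm_bc == 0.0:
        return float("nan")  # undefined when two points coincide
    cos_theta = sum(p * q for p, q in zip(ba, bc)) / (norm_ba * norm_bc)
    cos_theta = max(-1.0, min(1.0, cos_theta))  # guard against rounding error
    return math.degrees(math.acos(cos_theta))

# Made-up values for illustration only.
print(to_pixels(0.5, 0.25, width=1920, height=1080))      # (960, 270)

shoulder = (0.0, 0.0, 0.0)   # world coordinates, meters
elbow    = (0.0, 0.3, 0.0)
wrist    = (0.3, 0.3, 0.0)
print(round(joint_angle_deg(shoulder, elbow, wrist), 1))  # 90.0
```

World coordinates are usually the better input for joint angles because they are expressed in meters rather than in fractions of the frame width and height, which scale differently when the video is not square.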

3. Download a Pose Landmarker model (.task bundle)

Choose one of: "lite", "full", "heavy" (default).
URLs follow Google’s published pattern; we try latest/… first and then fall back to version 1/….

You only need to download once; it will be cached under models/.

# ================================
# 3. Model Selection & Download
# ================================
import os
import pathlib
import urllib.request
import urllib.error

import mediapipe as mp
from mediapipe.tasks import python as mp_python
from mediapipe.tasks.python import vision as mp_vision

# ---- Where to save models ----
MODELS_DIR = pathlib.Path("models")
MODELS_DIR.mkdir(parents=True, exist_ok=True)

# ---- Official model URLs (latest, with fallback to v1) ----
MODEL_URLS = {
    "lite": [
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_lite/float16/latest/pose_landmarker_lite.task",
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_lite/float16/1/pose_landmarker_lite.task",
    ],
    "full": [
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_full/float16/latest/pose_landmarker_full.task",
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_full/float16/1/pose_landmarker_full.task",
    ],
    "heavy": [
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/latest/pose_landmarker_heavy.task",
        "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task",
    ],
}

def download_pose_model(variant: str = "heavy") -> str:
    """
    Download the selected model variant (.task) to MODELS_DIR.
    Returns the local file path.
    """
    variant = variant.lower().strip()
    assert variant in MODEL_URLS, f"Unknown variant '{variant}'. Choose: lite, full, heavy."

    out_path = MODELS_DIR / f"pose_landmarker_{variant}.task"
    if out_path.exists() and out_path.stat().st_size > 50_000:
        print(f"✔ Model already present: {out_path}")
        return str(out_path)

    last_err = None
    for url in MODEL_URLS[variant]:
        try:
            print(f"Downloading {variant} model from:\n  {url}")
            with urllib.request.urlopen(url, timeout=60) as r, open(out_path, "wb") as f:
                f.write(r.read())
            if out_path.stat().st_size <= 50_000:
                raise RuntimeError("Downloaded file seems too small; trying fallback...")
            print(f"✔ Saved to {out_path} ({out_path.stat().st_size/1e6:.2f} MB)")
            return str(out_path)
        except Exception as e:
            print(f"… failed: {e}")
            last_err = e
    raise RuntimeError(f"Could not download model for variant '{variant}'. Last error: {last_err}")

# ---- Choose your default model here ----
# If the previous cell set `selected_model`, use it; otherwise default to "heavy".
try:
    MODEL_VARIANT = selected_model.lower().strip()
except NameError:
    MODEL_VARIANT = "heavy"   # default

MODEL_PATH = download_pose_model(MODEL_VARIANT)

# ---- Verify we can initialize the Pose Landmarker (VIDEO mode) ----
BaseOptions = mp_python.BaseOptions
PoseLandmarker = mp_vision.PoseLandmarker
PoseLandmarkerOptions = mp_vision.PoseLandmarkerOptions
RunningMode = mp_vision.RunningMode

options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=MODEL_PATH),
    running_mode=RunningMode.VIDEO,
    num_poses=1,
    min_pose_detection_confidence=0.5,
    min_pose_presence_confidence=0.5,
    min_tracking_confidence=0.5,
    output_segmentation_masks=False,
)

try:
    with PoseLandmarker.create_from_options(options) as landmarker:
        print("✅ PoseLandmarker initialized successfully (VIDEO mode).")
        print(f"   Model: {MODEL_VARIANT} → {MODEL_PATH}")
except Exception as e:
    print("❌ Failed to initialize PoseLandmarker. Check the model file and MediaPipe version.")
    raise
✔ Model already present: models/pose_landmarker_heavy.task
✅ PoseLandmarker initialized successfully (VIDEO mode).
   Model: heavy → models/pose_landmarker_heavy.task
I0000 00:00:1761584076.985338 53872720 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M2 Max
W0000 00:00:1761584077.082610 53919524 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761584077.170321 53919532 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.

4. VIDEO mode: timestamps & inference loop

For offline videos, we must use RunningMode.VIDEO and pass a monotonically increasing timestamp (in ms) for each frame:

  • We read frames with OpenCV, compute timestamp_ms = int((frame_idx / fps) * 1000), and call landmarker.detect_for_video(mp_image, timestamp_ms).

  • The Task returns normalized 2D landmarks (x, y ∈ [0,1], z depth-like, plus visibility) and world 3D landmarks (x_m, y_m, z_m in meters).

  • We’ll save tidy CSV files for 2D and 3D landmarks.

  • We’ll also write an annotated MP4 by drawing a simple skeleton over each frame.

Parameters you can tune
  • MODEL_VARIANT (lite/full/heavy), num_poses (usually 1), frame_stride (skip frames for speed),

  • min_pose_detection_confidence, min_pose_presence_confidence, min_tracking_confidence.
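The timestamp rule can be checked without MediaPipe at all. The sketch below uses the same formula as the text, `int((frame_idx / fps) * 1000)`, and lists which frames would be processed for a given `frame_stride`. The function names are hypothetical and the frame count and fps are example values.

```python
def frame_timestamps_ms(n_frames: int, fps: float, frame_stride: int = 1) -> list[tuple[int, int]]:
    """Return (frame_idx, timestamp_ms) for every frame that would be processed."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    stride = max(1, int(frame_stride))
    return [
        (idx, int((idx / fps) * 1000))
        for idx in range(0, n_frames, stride)
    ]

def is_strictly_increasing(values) -> bool:
    """True when every value is larger than the one before it."""
    return all(b > a for a, b in zip(values, values[1:]))

# Example: a 10-frame clip at 30 fps, analysing every 3rd frame.
pairs = frame_timestamps_ms(n_frames=10, fps=30.0, frame_stride=3)
for frame_idx, ts_ms in pairs:
    print(f"frame {frame_idx:2d} -> {ts_ms} ms")

print("strictly increasing:", is_strictly_increasing([ts for _, ts in pairs]))
```

Because the timestamp is derived from the original frame index rather than from a counter of processed frames, skipped frames leave gaps in time instead of compressing the timeline.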

5. Choose Your Parameters

Before running extraction, set the tunable parameters in the next cell.
These control model accuracy, processing speed, output organization, and post-processing filters (anti-jitter smoothing).


Model Variant

  • MODEL_VARIANT — choose one of:

    • lite → fastest but least accurate

    • full → balanced (medium accuracy & speed)

    • heavy → most accurate (default; recommended for LS100 on modern hardware)

Changing MODEL_VARIANT automatically downloads the correct .task file to your local models/ folder if needed.


Inference Settings

  • frame_stride — process every k-th frame

    • 1 = every frame (maximum precision)

    • 2 = every other frame (faster)

    • 3+ = skip more frames (fastest, least temporal detail)

  • num_poses — number of people to detect per frame

    • Use 1 for single-person videos (default in LS100)

  • Confidence thresholds

    • min_pose_detection_confidence — confidence for detecting a pose

    • min_pose_presence_confidence — confidence that a person is visible

    • min_tracking_confidence — confidence for stable tracking across frames


Output Settings

  • make_annotated_video — if True, saves an annotated .mp4 showing the skeleton overlay.

  • outputs_subdir_name — defines where outputs are saved:

    • Single video: CSVs and the optional annotated MP4 are written to <video_dir>/outputs/ (next to the input video).

    • Folder of videos: all outputs share one folder, <parent_of_folder>/outputs/ (next to the input folder).


Post-Processing Filters (Anti-Jitter)

After landmark extraction, you can smooth or clean the data:

  • visibility_thresh — discard landmarks with confidence below threshold

  • hampel_window / hampel_nsigmas — outlier removal using a Hampel filter

    • Removes sudden jumps and replaces them with local medians

  • rolling_window — rolling average smoother (reduces frame-to-frame jitter)
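To build intuition for these two filters, here is a small standard-library sketch on a made-up x-coordinate series with one spike. It is a textbook-style variant written for illustration: `hampel` and `rolling_mean` are hypothetical names, and the notebook's own pandas-based helper may differ in detail (for example, in how the local spread is estimated).

```python
from statistics import median

def hampel(values, window=7, nsigmas=3.0):
    """Replace outliers with the local median (centered window, shorter at the edges)."""
    half = window // 2
    cleaned = list(values)
    for i, v in enumerate(values):
        neighbourhood = values[max(0, i - half): i + half + 1]
        med = median(neighbourhood)
        # 1.4826 rescales the median absolute deviation (MAD) so that it
        # estimates the standard deviation for normally distributed data.
        mad = median(abs(u - med) for u in neighbourhood)
        if abs(v - med) > nsigmas * 1.4826 * mad:
            cleaned[i] = med
    return cleaned

def rolling_mean(values, window=3):
    """Centered moving average; the window shrinks near the ends of the series."""
    half = window // 2
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - half): i + half + 1]
        out.append(sum(chunk) / len(chunk))
    return out

# Made-up x-coordinates of one landmark over 7 frames; frame 3 is a tracking glitch.
x = [0.50, 0.51, 0.52, 0.90, 0.54, 0.55, 0.56]
despiked = hampel(x, window=5, nsigmas=3.0)
smoothed = rolling_mean(despiked, window=3)

print("despiked:", despiked)
print("smoothed:", [round(v, 3) for v in smoothed])
```

Running the outlier step before the rolling average matters: a plain average would smear the spike across its neighbours instead of removing it.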

💡 Tip:

  • If you have a slow computer, you can choose MODEL_VARIANT = "lite" or frame_stride = 2 to reduce load.

  • After extraction, apply filtering to clean up the 2D CSV before using it in analysis.


The Pose Landmarker returns:

  • 2D normalized landmarks: (x, y ∈ [0,1]), z (depth-like, unitless), visibility (0–1 confidence).

  • 3D world landmarks: (x_m, y_m, z_m) in meters.

Outputs:

  • CSV files for both 2D and 3D landmarks.

  • Optional annotated MP4 with the skeleton overlay.
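"Tidy" here means one row per frame and landmark. The sketch below shows how such rows can be regrouped per frame while applying a visibility threshold. It uses only the standard library and three made-up rows; the column names follow the 2D CSV written by the extraction function in this notebook, and `landmarks_by_frame` is a hypothetical helper.

```python
import csv
import io
from collections import defaultdict

# Three made-up rows in the long ("tidy") layout: one row per frame and landmark.
SAMPLE_CSV = """video,frame,time_ms,landmark_index,landmark_name,x,y,z,visibility
demo.mp4,0,0,11,left_shoulder,0.42,0.30,-0.10,0.99
demo.mp4,0,0,13,left_elbow,0.45,0.45,-0.05,0.35
demo.mp4,1,33,11,left_shoulder,0.43,0.31,-0.10,0.98
"""

def landmarks_by_frame(csv_text: str, visibility_thresh: float = 0.5) -> dict:
    """Group tidy rows into {frame: {landmark_name: (x, y)}}, dropping low-visibility rows."""
    frames = defaultdict(dict)
    for row in csv.DictReader(io.StringIO(csv_text)):
        if float(row["visibility"]) < visibility_thresh:
            continue  # landmark is probably occluded or outside the frame
        frames[int(row["frame"])][row["landmark_name"]] = (float(row["x"]), float(row["y"]))
    return dict(frames)

grouped = landmarks_by_frame(SAMPLE_CSV)
print(grouped)
# {0: {'left_shoulder': (0.42, 0.3)}, 1: {'left_shoulder': (0.43, 0.31)}}
```

The low-visibility elbow row is dropped, so frame 0 ends up with a single landmark. The same grouping is a one-liner in pandas once the CSV is loaded, but the explicit loop makes the row layout easier to see.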

# =========================================
# 5. Parameters — YOU CAN EDIT THIS BLOCK
# =========================================

# --- Model choice ---
MODEL_VARIANT = "heavy"          # options: "lite", "full", "heavy"

# --- Inference behavior ---
frame_stride = 1                 # 1=every frame; 2=every other; 3=every third, etc.
num_poses = 1                    # typically 1 for single-person videos

# Confidence thresholds
min_pose_detection_confidence = 0.5
min_pose_presence_confidence  = 0.5
min_tracking_confidence       = 0.5

# --- Output location ---
# If you input a single video file → outputs will be saved to: <video_dir>/<outputs_subdir_name>/
# If you input a folder path → outputs will be saved to: <parent_of_folder>/<outputs_subdir_name>/
outputs_subdir_name  = "outputs"
make_annotated_video = True      # set False to skip saving annotated MP4s

# --- Post-processing filters (applied AFTER extraction to the 2D CSV) ---
# NOTE: Filtering improves smoothness but is slower. Turn off to speed up runs.
enable_filtering  = True        # ← students toggle this (True/False)
visibility_thresh = 0.5          # keep rows where visibility >= this
hampel_window     = 7            # odd int (in frames); robust outlier window
hampel_nsigmas    = 3.0          # sensitivity for Hampel (higher = fewer outliers)
rolling_window    = 3            # odd int (in frames); centered rolling average for x,y



# ======================================================
# 5.a DO NOT EDIT BELOW — this ensures everything runs correctly
# ======================================================
from pathlib import Path

# --- Ensure filtering utility is available (only defines if missing) ---
try:
    apply_filters_to_pose2d
except NameError:
    import pandas as pd
    from pathlib import Path

    def _hampel(series: pd.Series, window: int, nsigmas: float) -> pd.Series:
        med = series.rolling(window, center=True, min_periods=1).median()
        diff = (series - med).abs()
        mad  = diff.rolling(window, center=True, min_periods=1).median()
        thr  = nsigmas * 1.4826 * mad.fillna(0)
        outlier = diff > thr
        return series.where(~outlier, med)

    def apply_filters_to_pose2d(csv2d_path: str,
                                visibility_thresh: float = 0.5,
                                rolling_window: int = 3,
                                hampel_window: int = 7,
                                hampel_nsigmas: float = 3.0) -> str:
        """
        Saves a filtered CSV next to the original as *_filtered.csv.
        Returns the filtered path.
        """
        df = pd.read_csv(csv2d_path)
        if "visibility" in df.columns:
            df = df[df["visibility"].fillna(0.0) >= visibility_thresh].copy()

        # Sort for stable rolling ops
        df = df.sort_values(["video","landmark_index","frame"])

        # Hampel (robust outlier removal), then rolling-mean smoothing.
        # Both steps run inside each (video, landmark) group so that the
        # smoothing window never mixes values from different landmarks.
        for coord in ("x","y"):
            if coord in df.columns:
                df[coord] = (
                    df.groupby(["video","landmark_index"])[coord]
                      .transform(lambda s: _hampel(s, hampel_window, hampel_nsigmas)
                                 .rolling(rolling_window, center=True, min_periods=1).mean())
                )

        out_path = Path(csv2d_path).with_name(Path(csv2d_path).stem + "_filtered.csv")
        df.to_csv(out_path, index=False)
        return str(out_path)

# Ensure MODEL_PATH exists (downloaded earlier)
try:
    MODEL_PATH
except NameError:
    raise RuntimeError("MODEL_PATH not found. Please run the previous model download cell first.")

# --- Helper: Ensure odd window sizes for filters ---
def _ensure_odd(n: int) -> int:
    try:
        n = int(n)
    except Exception:
        return 3
    return n if n % 2 == 1 else n + 1

hampel_window  = _ensure_odd(hampel_window)
rolling_window = _ensure_odd(rolling_window)

# --- Helper: Resolve output folder location ---
VIDEO_EXTS = {".mp4", ".mov", ".m4v", ".avi", ".mkv"}

def resolve_outputs_dir(input_path: str | Path, outputs_subdir_name: str = "outputs") -> Path:
    """
    If input is a video file:
        -> <file_dir>/<outputs_subdir_name>/
    If input is a folder:
        -> <parent_of_folder>/<outputs_subdir_name>/
    """
    # In both cases the outputs folder is a sibling of the input path,
    # i.e. it lives in the input's parent directory.
    return Path(input_path).parent / outputs_subdir_name

# --- Sanity check summary ---
print("\n===== Parameter Summary =====")
print(f"MODEL_VARIANT                  : {MODEL_VARIANT}")
print(f"MODEL_PATH                     : {MODEL_PATH}")
print(f"frame_stride                   : {frame_stride}")
print(f"num_poses                      : {num_poses}")
print(f"confidences (detect,pres,track): {min_pose_detection_confidence}, "
      f"{min_pose_presence_confidence}, {min_tracking_confidence}")
print(f"outputs_subdir_name            : {outputs_subdir_name}")
print(f"make_annotated_video           : {make_annotated_video}")
print(f"enable_filtering               : {enable_filtering}")
if enable_filtering:
    print(f"   visibility_thresh           : {visibility_thresh}")
    print(f"   hampel_window / nsigmas     : {hampel_window} / {hampel_nsigmas}")
    print(f"   rolling_window              : {rolling_window}")
else:
    print("   ↳ Filtering is OFF (fast mode: no smoothing or visibility threshold applied)")
print("=============================\n")


===== Parameter Summary =====
MODEL_VARIANT                  : heavy
MODEL_PATH                     : models/pose_landmarker_heavy.task
frame_stride                   : 1
num_poses                      : 1
confidences (detect,pres,track): 0.5, 0.5, 0.5
outputs_subdir_name            : outputs
make_annotated_video           : True
enable_filtering               : False
   ↳ Filtering is OFF (fast mode: no smoothing or visibility threshold applied)
=============================

# =========================================================
# 6. Function: Extract pose landmarks from a video file
#    - Writes RAW outputs according to the new folder rules
#    - Filtering (if enabled) happens in the NEXT block
# =========================================================

# Reuse landmark names if already defined; else define here.
try:
    landmark_index_to_name
except NameError:
    POSE_LANDMARK_NAMES = [
        "nose","left_eye_inner","left_eye","left_eye_outer",
        "right_eye_inner","right_eye","right_eye_outer",
        "left_ear","right_ear","mouth_left","mouth_right",
        "left_shoulder","right_shoulder","left_elbow","right_elbow",
        "left_wrist","right_wrist","left_pinky","right_pinky",
        "left_index","right_index","left_thumb","right_thumb",
        "left_hip","right_hip","left_knee","right_knee",
        "left_ankle","right_ankle","left_heel","right_heel",
        "left_foot_index","right_foot_index",
    ]
    landmark_index_to_name = {i: n for i, n in enumerate(POSE_LANDMARK_NAMES)}

from pathlib import Path
from typing import Optional, Union, Dict

def extract_pose_from_video(
    video_path: Union[str, Path],
    model_path: Union[str, Path],
    make_annotated_video: bool = False,
    frame_stride: int = 1,
    num_poses: int = 1,
    min_pose_detection_confidence: float = 0.5,
    min_pose_presence_confidence: float = 0.5,
    min_tracking_confidence: float = 0.5,
    output_segmentation_masks: bool = False,
    # If provided, write outputs here; else follow file/folder rules via resolve_outputs_dir(...)
    base_outputs_dir: Optional[Union[str, Path]] = None,
) -> Dict[str, Optional[str]]:
    """
    Extracts pose landmarks from a single video and saves:
      - 2D CSV (RAW):   <outputs>/<video_stem>_pose2d.csv
      - 3D CSV (RAW):   <outputs>/<video_stem>_pose3d.csv  (if world landmarks available)
      - MP4 (optional): <outputs>/<video_stem>_annotated.mp4

    Output folder rules:
      • If base_outputs_dir is given → use it.
      • Else (single-file default)   → <video_dir>/<outputs_subdir_name>/
        (outputs_subdir_name is set in the Parameters cell).

    NOTE: Any smoothing/jitter filtering is performed in the NEXT block.
    """
    import cv2, numpy as np, pandas as pd
    import mediapipe as mp
    from mediapipe.tasks import python as mp_python
    from mediapipe.tasks.python import vision as mp_vision

    video_path = Path(video_path)
    model_path = str(model_path)

    # Determine output directory
    if base_outputs_dir is not None:
        out_dir = Path(base_outputs_dir)
    else:
        try:
            out_dir = resolve_outputs_dir(video_path, outputs_subdir_name=outputs_subdir_name)
        except NameError:
            out_dir = video_path.parent / (outputs_subdir_name if 'outputs_subdir_name' in globals() else 'outputs')
    out_dir.mkdir(parents=True, exist_ok=True)

    stem = video_path.stem
    csv2d   = out_dir / f"{stem}_pose2d.csv"
    csv3d   = out_dir / f"{stem}_pose3d.csv"
    mp4_out = out_dir / f"{stem}_annotated.mp4"

    # Optional: echo filter toggle (if defined) for clarity
    if 'enable_filtering' in globals():
        print(f"[extract] enable_filtering = {enable_filtering} (filtering runs after extraction)")

    # --- OpenCV video IO ---
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise FileNotFoundError(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 1e-6:
        fps = 30.0  # safe fallback
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    writer = None
    if make_annotated_video:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(str(mp4_out), fourcc, fps / max(1, frame_stride), (width, height))

    # --- MediaPipe Tasks (VIDEO mode) ---
    BaseOptions = mp_python.BaseOptions
    PoseLandmarker = mp_vision.PoseLandmarker
    PoseLandmarkerOptions = mp_vision.PoseLandmarkerOptions
    RunningMode = mp_vision.RunningMode

    options = PoseLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=model_path),
        running_mode=RunningMode.VIDEO,
        num_poses=num_poses,
        min_pose_detection_confidence=min_pose_detection_confidence,
        min_pose_presence_confidence=min_pose_presence_confidence,
        min_tracking_confidence=min_tracking_confidence,
        output_segmentation_masks=output_segmentation_masks,
    )

    # --- Helpers (image conversion + simple skeleton overlay) ---
    def _mp_image_from_bgr(bgr):
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        return mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

    def _draw_skeleton(bgr, norm_landmarks, visibility_thresh: float = 0.5):
        h, w = bgr.shape[:2]
        pts = {}
        for i, lm in enumerate(norm_landmarks):
            vis = getattr(lm, "visibility", 1.0) or 0.0
            if vis >= visibility_thresh:
                x, y = int(lm.x * w), int(lm.y * h)
                pts[i] = (x, y)
                cv2.circle(bgr, (x, y), 2, (255, 255, 255), -1)
        for a, b in [
            (11,13),(13,15),(12,14),(14,16),(11,12),(23,24),(11,23),(12,24),
            (23,25),(25,27),(24,26),(26,28),(27,29),(29,31),(28,30),(30,32)
        ]:
            if a in pts and b in pts:
                cv2.line(bgr, pts[a], pts[b], (255, 255, 255), 2)

    rows2d, rows3d = [], []

    with PoseLandmarker.create_from_options(options) as landmarker:
        frame_idx = 0
        while True:
            ok, bgr = cap.read()
            if not ok:
                break

            # Frame skipping for speed
            if frame_stride > 1 and (frame_idx % frame_stride != 0):
                frame_idx += 1
                continue

            # VIDEO mode requires monotonic ms timestamps
            ts_ms = int((frame_idx / fps) * 1000.0)
            mp_image = _mp_image_from_bgr(bgr)
            result = landmarker.detect_for_video(mp_image, ts_ms)

            for pose_id, nlands in enumerate(result.pose_landmarks):
                # 2D normalized landmarks (+ visibility)
                for li, lm in enumerate(nlands):
                    rows2d.append({
                        "video": video_path.name,
                        "frame": frame_idx,
                        "time_ms": ts_ms,
                        "landmark_index": li,
                        "landmark_name": landmark_index_to_name.get(li, str(li)),
                        "x": lm.x, "y": lm.y, "z": lm.z,
                        "visibility": getattr(lm, "visibility", np.nan),
                    })

                # 3D world landmarks (meters), if available
                if len(result.pose_world_landmarks) > pose_id:
                    wlands = result.pose_world_landmarks[pose_id]
                    for li, lm in enumerate(wlands):
                        rows3d.append({
                            "video": video_path.name,
                            "frame": frame_idx,
                            "time_ms": ts_ms,
                            "landmark_index": li,
                            "landmark_name": landmark_index_to_name.get(li, str(li)),
                            "x_m": lm.x, "y_m": lm.y, "z_m": lm.z,
                            "visibility": getattr(lm, "visibility", np.nan),
                        })

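            # Optional overlay: write exactly one output frame per processed
            # frame (even when no pose is found) so the annotated video stays
            # in sync with the source and with the fps given to the writer.
            if writer is not None:
                bgr_draw = bgr.copy()
                for nlands in result.pose_landmarks:
                    _draw_skeleton(bgr_draw, nlands, visibility_thresh=0.5)
                writer.write(bgr_draw)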

            frame_idx += 1

    cap.release()
    if writer is not None:
        writer.release()

    # --- Save RAW CSVs ---
    import pandas as pd
    pd.DataFrame(rows2d).to_csv(csv2d, index=False)
    if rows3d:
        pd.DataFrame(rows3d).to_csv(csv3d, index=False)
        csv3d_str = str(csv3d)
    else:
        csv3d_str = None

    # Return RAW paths; the next block may also produce a *_filtered.csv
    return {
        "csv2d": str(csv2d),
        "csv3d": csv3d_str,
        "annotated_mp4": (str(mp4_out) if make_annotated_video else None),
    }

# --- Quick peek helper (unchanged) ---
def peek_csv(path, n=5):
    import pandas as pd
    df = pd.read_csv(path)
    print(f"{path} → shape={df.shape}")
    display(df.head(n))
    return df

Enter the path to a video file or to a folder containing videos

# =========================================
# 7. Paste your input path (file OR folder)
# =========================================
# Examples:
# input_path = "/path/to/video.mp4"
# input_path = "/path/to/folder_with_videos"

input_path = "/Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/Kevin_2022_Day5_CRNCH.mp4"  # ← paste here (keep quotes)

Now run the code block below.

Please note that if you set enable_filtering (the jitter filter) to True, processing will take longer.

# =========================================================
# 8. Run extraction and save outputs (CSV + annotated MP4s)
#    - If input_path is a single video file: outputs → <video_dir>/<outputs_subdir_name>/
#    - If input_path is a folder: outputs (shared) → <parent_of_folder>/<outputs_subdir_name>/
#    - Writes a manifest CSV when processing a folder
# =========================================================
from pathlib import Path
import pandas as pd

if not input_path or not str(input_path).strip():
    raise ValueError("Please set `input_path` in the previous cell.")

p = Path(input_path).expanduser().resolve()

# Ensure resolver exists (it was defined in §5.a)
try:
    resolve_outputs_dir
except NameError:
    # Minimal fallback (same logic as earlier)
    VIDEO_EXTS = {".mp4", ".mov", ".m4v", ".avi", ".mkv"}
    def resolve_outputs_dir(input_path, outputs_subdir_name="outputs"):
        # File   -> <video_dir>/<outputs_subdir_name>
        # Folder -> <parent_of_folder>/<outputs_subdir_name>
        # Both cases resolve to the parent of the input path.
        return Path(input_path).parent / outputs_subdir_name

VIDEO_EXTS = VIDEO_EXTS if "VIDEO_EXTS" in globals() else {".mp4", ".mov", ".m4v", ".avi", ".mkv"}

def _is_video_file(path: Path) -> bool:
    return path.is_file() and path.suffix.lower() in VIDEO_EXTS

if _is_video_file(p):
    # -------- Single video mode --------
    base_out = resolve_outputs_dir(p, outputs_subdir_name)
    base_out.mkdir(parents=True, exist_ok=True)
    print(f"Single video detected.\nOutputs will be saved to: {base_out}")

    outs = extract_pose_from_video(
        video_path=str(p),
        model_path=MODEL_PATH,
        make_annotated_video=make_annotated_video,
        frame_stride=frame_stride,
        num_poses=num_poses,
        min_pose_detection_confidence=min_pose_detection_confidence,
        min_pose_presence_confidence=min_pose_presence_confidence,
        min_tracking_confidence=min_tracking_confidence,
        base_outputs_dir=base_out,  # important
    )
    print("\n✔ Done.")
    print("2D CSV :", outs.get("csv2d"))
    print("3D CSV :", outs.get("csv3d"))
    print("MP4    :", outs.get("annotated_mp4"))

else:
    # -------- Folder mode --------
    if not p.exists() or not p.is_dir():
        # Also reached when the path is a file with an unsupported extension
        raise NotADirectoryError(f"Not a supported video file or a directory: {p}")

    # Shared outputs placed alongside the folder
    base_out = resolve_outputs_dir(p, outputs_subdir_name)
    base_out.mkdir(parents=True, exist_ok=True)
    print(f"Folder detected. Outputs will be saved to: {base_out}")

    # Find videos (non-recursive by default; flip to rglob for recursive)
    videos = sorted([str(f) for f in p.iterdir() if _is_video_file(f)])
    if not videos:
        # Try recursive as a helpful fallback
        videos = sorted([str(f) for f in p.rglob("*") if _is_video_file(f)])
        if videos:
            print(f"Found {len(videos)} video(s) (recursive search).")
        else:
            raise FileNotFoundError(f"No supported video files found in: {p}")

    records = []
    for i, vp in enumerate(videos, 1):
        print(f"[{i}/{len(videos)}] {vp}")
        try:
            outs = extract_pose_from_video(
                video_path=vp,
                model_path=MODEL_PATH,
                make_annotated_video=make_annotated_video,
                frame_stride=frame_stride,
                num_poses=num_poses,
                min_pose_detection_confidence=min_pose_detection_confidence,
                min_pose_presence_confidence=min_pose_presence_confidence,
                min_tracking_confidence=min_tracking_confidence,
                base_outputs_dir=base_out,  # important
            )
            records.append({"video": vp, **outs, "status": "ok", "error": ""})
        except Exception as e:
            records.append({"video": vp, "csv2d": None, "csv3d": None,
                            "annotated_mp4": None, "status": "error", "error": str(e)})

    manifest = base_out / "outputs_manifest.csv"
    pd.DataFrame.from_records(records).to_csv(manifest, index=False)
    print(f"\n✔ Batch complete. Manifest saved to: {manifest}")
Single video detected.
Outputs will be saved to: /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs
[extract] enable_filtering = False (filtering runs after extraction)
I0000 00:00:1761594301.579539 53872720 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M2 Max
W0000 00:00:1761594301.656664 54157257 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1761594301.732664 54157256 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.

✔ Done.
2D CSV : /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs/Kevin_2022_Day5_CRNCH_pose2d.csv
3D CSV : /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs/Kevin_2022_Day5_CRNCH_pose3d.csv
MP4    : /Users/souvikmandal/Documents/06_Teaching_Mentoring/LS100_comp_etho/2025/media/video/outputs/Kevin_2022_Day5_CRNCH_annotated.mp4
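
In folder mode, the cell above also writes outputs_manifest.csv with one row per video and a status column. The sketch below shows one way to review such a manifest for failures. The two rows are hypothetical stand-ins that only mirror the columns written above; the file names and the error text are invented for illustration.

```python
import pandas as pd

# Hypothetical manifest rows; columns mirror those written in folder mode.
manifest_df = pd.DataFrame.from_records([
    {"video": "a.mp4", "csv2d": "outputs/a_pose2d.csv", "csv3d": "outputs/a_pose3d.csv",
     "annotated_mp4": None, "status": "ok", "error": ""},
    {"video": "b.mp4", "csv2d": None, "csv3d": None,
     "annotated_mp4": None, "status": "error", "error": "could not open video"},
])

# Split the batch into failures and usable 2D CSV paths.
failed = manifest_df[manifest_df["status"] == "error"]
ok_csvs = manifest_df.loc[manifest_df["status"] == "ok", "csv2d"].tolist()

print(f"{len(failed)} of {len(manifest_df)} video(s) failed")
for _, row in failed.iterrows():
    print(f"  {row['video']}: {row['error']}")
```

With a real batch you would load the file instead, for example with pd.read_csv(base_out / "outputs_manifest.csv"), and rerun only the videos listed in failed.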

Notes

  • 2D normalized coordinates: x, y ∈ [0, 1] relative to image width/height (values can fall outside this range if the estimated point is out of frame). z is depth-like: smaller (more negative) values are closer to the camera.

  • 3D world coordinates: x,y,z are in meters in a world coordinate space centered near the hips.

  • visibility: the model's confidence (0 to 1) that a landmark is visible in the frame rather than occluded or out of view.
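
The notes above suggest a simple cleaning step before analysis: flag out-of-frame points and mask low-visibility landmarks. The following is a minimal sketch on a tiny synthetic table. The column names x, y, and visibility are assumed to match the 2D CSV, and the 0.5 cutoff is an assumption borrowed from the overlay threshold, not a recommended value.

```python
import numpy as np
import pandas as pd

# Synthetic stand-in for a *_pose2d.csv table (column names are assumed).
df = pd.DataFrame({
    "frame": [0, 0, 0],
    "landmark_name": ["left_wrist", "left_elbow", "left_ankle"],
    "x": [0.42, 0.50, 1.08],   # 1.08 lies to the right of the image
    "y": [0.31, 0.45, 0.97],
    "visibility": [0.98, 0.35, 0.80],
})

VIS_THRESH = 0.5  # assumed cutoff; tune for your footage

# Flag points that fall inside the image rectangle.
df["in_frame"] = df["x"].between(0.0, 1.0) & df["y"].between(0.0, 1.0)

# Mask unreliable coordinates with NaN instead of dropping rows,
# so every frame keeps the same number of landmark rows.
reliable = (df["visibility"] >= VIS_THRESH) & df["in_frame"]
clean = df.copy()
clean.loc[~reliable, ["x", "y"]] = np.nan

print(clean[["landmark_name", "x", "y", "visibility", "in_frame"]])
```

Masking rather than dropping keeps later pivots and per-frame joins aligned; whether to also mask out-of-frame points depends on the analysis.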

7. Optional: compute simple joint angles

We will explore this in the next guide.

Once you have landmarks, you can engineer features such as elbow or knee angles. Below is a small utility that computes the angle between three named landmarks per frame.


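import numpy as np
import pandas as pd

def _angle_between(a, b, c):
    # Angle (degrees) at vertex b between the rays b->a and b->c.
    # a, b, c are 2D points (x, y) or 3D points (x, y, z); here we use 2D image coords.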
    a, b, c = np.array(a), np.array(b), np.array(c)
    ba = a - b
    bc = c - b
    cosang = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-9)
    cosang = np.clip(cosang, -1.0, 1.0)
    return np.degrees(np.arccos(cosang))

def compute_joint_angle_csv(csv2d_path: str, joint=("left_shoulder","left_elbow","left_wrist")) -> pd.DataFrame:
    df = pd.read_csv(csv2d_path)
    # wide pivot: columns like x_left_shoulder, y_left_shoulder, etc.
    wide = df.pivot_table(index=["video","frame","time_ms"], columns="landmark_name", values=["x","y"])
    # helper to get a point
    def P(name):
        return np.c_[wide["x"][name].values, wide["y"][name].values]
    A,B,C = P(joint[0]), P(joint[1]), P(joint[2])
    angles = np.array([_angle_between(a,b,c) for a,b,c in zip(A,B,C)])
    out = pd.DataFrame({
        "video": wide.index.get_level_values("video"),
        "frame": wide.index.get_level_values("frame"),
        "time_ms": wide.index.get_level_values("time_ms"),
        f"angle_{'_'.join(joint)}": angles
    })
    return out

# Example (after extraction):
# angle_df = compute_joint_angle_csv("outputs/yourvideo_pose2d.csv", ("left_shoulder","left_elbow","left_wrist"))
# angle_df.head()

8. Notes & best practices

  • Timestamps matter: in VIDEO mode you must pass a timestamp_ms that strictly increases from one processed frame to the next; we compute it from the frame index and FPS.

  • Tracking saves compute: in VIDEO/LIVE_STREAM the task performs pose tracking so the full model isn’t re-run every frame (helps latency).

  • Out-of-frame landmarks: 2D normalized x,y can be outside [0,1] if a joint is off‑screen; use visibility to filter.

  • Model choice: start with full, switch to lite for underpowered laptops or large batches, and use heavy when you need the highest accuracy and can afford slower processing.

  • Stride: a cheap speedup is frame_stride=2 (½ the frames) or higher.

  • Ethics & consent: if students process videos of people, teach consent, privacy, and secure storage.
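
The timestamp and stride points above can be sketched together. This is a minimal illustration, assuming timestamps are derived as frame_idx × 1000 / fps and rounded to whole milliseconds; the helper name frame_timestamp_ms and the rounding choice are assumptions, not necessarily what the extraction function does internally.

```python
def frame_timestamp_ms(frame_idx: int, fps: float) -> int:
    """Timestamp in milliseconds for a frame index at a given frame rate."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    return int(round(frame_idx * 1000.0 / fps))

fps = 30.0
frame_stride = 2  # process every 2nd frame

# Timestamps for the frames that would actually be sent to the landmarker.
processed_frames = list(range(0, 10, frame_stride))
timestamps = [frame_timestamp_ms(i, fps) for i in processed_frames]

# VIDEO mode needs timestamps that keep increasing between calls.
is_increasing = all(later > earlier for earlier, later in zip(timestamps, timestamps[1:]))
print(processed_frames, timestamps, is_increasing)
```

Deriving the timestamp from the original frame index, rather than from a counter of processed frames, keeps time_ms aligned with the source video when frame_stride is greater than 1.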


Troubleshooting

  • If you see NoneType errors for results, check that the model path exists and that your video actually contains a person.

  • If you get slowdowns or memory pressure, try frame_stride=2 or the "lite" model.

  • On some platforms, writing MP4 files with OpenCV requires codecs that may be missing; if a saved video is empty, try a different fourcc (e.g., cv2.VideoWriter_fourcc(*"avc1")) or an alternative OpenCV build such as opencv-python-headless.

Happy exploring!