Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

This notebook takes 2D pose landmarks and computes biomechanical joint angles per frame. It also includes confidence labels, plotting utilities, and optional batch processing.

Goal

Compute biomechanical joint angles from pose landmark data.

You define each angle with three landmarks (A, B, C) where the angle is measured at B (angle ABC). This notebook supports both:

  • long format pose tables (landmark_name, x, y, optional visibility)

  • wide format pose tables (<landmark>_x, <landmark>_y, optional <landmark>_visibility)

1) Imports and Environment Check

# Optional package install (uncomment only if needed).
# %pip install plotly seaborn
# Imports and environment check
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly
import plotly.express as px

try:
    import seaborn as sns  # noqa: F401
    HAS_SEABORN = True
except Exception:
    HAS_SEABORN = False

print("Python:", sys.version.split()[0])
print("Pandas:", pd.__version__)
print("NumPy:", np.__version__)
print("Plotly:", plotly.__version__)
print("Seaborn available:", HAS_SEABORN)
Python: 3.12.12
Pandas: 2.3.3
NumPy: 2.3.5
Plotly: 6.6.0
Seaborn available: True

2) Load Pose CSV

# Load pose CSV (wide or long format).
pose2d_csv_path = "/Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv/LS100_FInalData_1_final_kinematics_healed_id_1.csv"

df = pd.read_csv(pose2d_csv_path)
print("Loaded:", pose2d_csv_path)
print("Shape:", df.shape)
print("Columns (first 25):", list(df.columns)[:25])

if {"landmark_name", "x", "y"}.issubset(df.columns):
    print("Detected long-format pose data.")
else:
    x_cols = [c for c in df.columns if c.endswith("_x")]
    y_cols = [c for c in df.columns if c.endswith("_y")]
    print("Detected wide-format pose data.")
    print("Wide columns:", len(x_cols), "x-columns and", len(y_cols), "y-columns")

df.head(3)
Loaded: /Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv/LS100_FInalData_1_final_kinematics_healed_id_1.csv
Shape: (433, 40)
Columns (first 25): ['frame', 'raw_id', 'nose_x', 'nose_y', 'left_eye_x', 'left_eye_y', 'right_eye_x', 'right_eye_y', 'left_ear_x', 'left_ear_y', 'right_ear_x', 'right_ear_y', 'left_shoulder_x', 'left_shoulder_y', 'right_shoulder_x', 'right_shoulder_y', 'left_elbow_x', 'left_elbow_y', 'right_elbow_x', 'right_elbow_y', 'left_wrist_x', 'left_wrist_y', 'right_wrist_x', 'right_wrist_y', 'left_hip_x']
Detected wide-format pose data.
Wide columns: 18 x-columns and 18 y-columns
Loading...

3) Define Custom Angles

Provide angle definitions as landmark triplets A-B-C where the angle is measured at B.

Example:

angle_defs_h1 = [
    {"name": "left_elbow", "A": "left_shoulder", "B": "left_elbow", "C": "left_wrist"},
]

angle_defs_h2 = [
    {"name": "right_knee", "A": "right_hip", "B": "right_knee", "C": "right_ankle"},
]
# Define angle definitions for each human.
angle_defs_h1 = [
    {"name": "left_elbow", "A": "left_shoulder", "B": "left_elbow", "C": "left_wrist"},
    {"name": "left_hip", "A": "left_shoulder", "B": "left_hip", "C": "left_knee"},
]

angle_defs_h2 = [
    {"name": "right_elbow", "A": "right_shoulder", "B": "right_elbow", "C": "right_wrist"},
    {"name": "right_hip", "A": "right_shoulder", "B": "right_hip", "C": "right_knee"},
]

# Mapping used later for per-human computation.
angle_defs_by_human = {
    "human_1": angle_defs_h1,
    "human_2": angle_defs_h2,
}

Computation Angles

Angle Computation Function

  • The following function calculates the angles you defined.

  • Please change the visibility_thresh according to your desire. This will determine the ‘confidence’ in each angle in each frame.

“good” → all three landmarks ≥ threshold
“low” → exactly one landmark < threshold
“least” → two or three landmarks < threshold

# Confidence threshold for angle quality labels.
visibility_thresh = 0.5

def normalize_pose_dataframe(df_in: pd.DataFrame, source_name: str = "video_01") -> pd.DataFrame:
    """Return pose data in long format with at least:
    video, frame, time_ms, landmark_name, x, y, visibility (optional), final_id (optional).
    """
    df_in = df_in.copy()

    # Already long format.
    if {"landmark_name", "x", "y"}.issubset(df_in.columns):
        df_long = df_in
    else:
        x_cols = [c for c in df_in.columns if c.endswith("_x")]
        landmarks = [c[:-2] for c in x_cols if f"{c[:-2]}_y" in df_in.columns]
        if not landmarks:
            raise ValueError(
                "Could not detect pose landmarks. Need either long columns (landmark_name/x/y) "
                "or wide columns like <landmark>_x and <landmark>_y."
            )

        measurement_cols = set()
        for lm in landmarks:
            measurement_cols.add(f"{lm}_x")
            measurement_cols.add(f"{lm}_y")
            if f"{lm}_visibility" in df_in.columns:
                measurement_cols.add(f"{lm}_visibility")

        id_cols = [c for c in df_in.columns if c not in measurement_cols]

        parts = []
        for lm in landmarks:
            use_cols = id_cols + [f"{lm}_x", f"{lm}_y"]
            has_vis = f"{lm}_visibility" in df_in.columns
            if has_vis:
                use_cols.append(f"{lm}_visibility")

            part = df_in[use_cols].copy()
            rename_map = {f"{lm}_x": "x", f"{lm}_y": "y"}
            if has_vis:
                rename_map[f"{lm}_visibility"] = "visibility"
            part = part.rename(columns=rename_map)
            part["landmark_name"] = lm
            parts.append(part)

        df_long = pd.concat(parts, ignore_index=True)

    if "video" not in df_long.columns:
        df_long["video"] = source_name

    if "frame" not in df_long.columns:
        df_long["frame"] = np.arange(len(df_long), dtype=int)

    if "time_ms" not in df_long.columns:
        # Default assumption: 30 fps if time is absent.
        df_long["time_ms"] = (pd.to_numeric(df_long["frame"], errors="coerce").fillna(0) * (1000.0 / 30.0))

    if "visibility" not in df_long.columns:
        df_long["visibility"] = 1.0

    df_long["frame"] = pd.to_numeric(df_long["frame"], errors="coerce").fillna(0).astype(int)
    df_long["time_ms"] = pd.to_numeric(df_long["time_ms"], errors="coerce").fillna(0.0)
    df_long["x"] = pd.to_numeric(df_long["x"], errors="coerce")
    df_long["y"] = pd.to_numeric(df_long["y"], errors="coerce")
    df_long["visibility"] = pd.to_numeric(df_long["visibility"], errors="coerce").fillna(0.0)

    keep = [c for c in ["video", "final_id", "frame", "time_ms", "landmark_name", "x", "y", "visibility"] if c in df_long.columns]
    return df_long[keep].copy()

def _wide_xyv(df2d: pd.DataFrame) -> pd.DataFrame:
    """
    Pivot long pose table to wide table with x_*, y_*, v_* columns.
    """
    index_cols = [c for c in ["video", "final_id", "frame", "time_ms"] if c in df2d.columns]
    if "frame" not in index_cols:
        raise ValueError("compute_angles needs a 'frame' column after normalization.")

    w_x = df2d.pivot_table(index=index_cols, columns="landmark_name", values="x", aggfunc="mean")
    w_y = df2d.pivot_table(index=index_cols, columns="landmark_name", values="y", aggfunc="mean")
    if "visibility" in df2d.columns:
        w_v = df2d.pivot_table(index=index_cols, columns="landmark_name", values="visibility", aggfunc="mean").fillna(0.0)
    else:
        w_v = pd.DataFrame(1.0, index=w_x.index, columns=w_x.columns)

    w_x.columns = [f"x_{c}" for c in w_x.columns]
    w_y.columns = [f"y_{c}" for c in w_y.columns]
    w_v.columns = [f"v_{c}" for c in w_v.columns]

    return pd.concat([w_x, w_y, w_v], axis=1).sort_index()

def _angle_at_B(A_pts: np.ndarray, B_pts: np.ndarray, C_pts: np.ndarray) -> np.ndarray:
    """Compute angle at B for A-B-C point triplets in degrees."""
    BA = A_pts - B_pts
    BC = C_pts - B_pts
    denom = (np.linalg.norm(BA, axis=1) * np.linalg.norm(BC, axis=1)) + 1e-9
    cosang = (BA * BC).sum(1) / denom
    cosang = np.clip(cosang, -1.0, 1.0)
    return np.degrees(np.arccos(cosang))

def compute_angles(df2d: pd.DataFrame, defs: list, visibility_thresh: float = 0.5) -> pd.DataFrame:
    """Return index columns + angle_<name> and confidence_<name> columns."""
    w = _wide_xyv(df2d)
    out = pd.DataFrame(index=w.index)

    for d in defs:
        A, B, C = d["A"], d["B"], d["C"]
        name = d["name"]

        cols_needed = [
            f"x_{A}", f"y_{A}", f"x_{B}", f"y_{B}", f"x_{C}", f"y_{C}",
            f"v_{A}", f"v_{B}", f"v_{C}",
        ]
        missing = [c for c in cols_needed if c not in w.columns]
        if missing:
            print(f"Warning: missing columns for angle '{name}': {missing}. Skipping.")
            continue

        A_pts = np.c_[w[f"x_{A}"].values, w[f"y_{A}"].values]
        B_pts = np.c_[w[f"x_{B}"].values, w[f"y_{B}"].values]
        C_pts = np.c_[w[f"x_{C}"].values, w[f"y_{C}"].values]
        ang = _angle_at_B(A_pts, B_pts, C_pts)

        vA = w[f"v_{A}"].fillna(0.0).values
        vB = w[f"v_{B}"].fillna(0.0).values
        vC = w[f"v_{C}"].fillna(0.0).values
        below = (vA < visibility_thresh).astype(int) + (vB < visibility_thresh).astype(int) + (vC < visibility_thresh).astype(int)
        conf = np.where(below == 0, "good", np.where(below == 1, "low", "least"))

        out[f"angle_{name}"] = ang
        out[f"confidence_{name}"] = conf

    return out.reset_index()

4) Normalize Data and Compute Angles

# Normalize once so downstream code always receives long-format pose rows.
source_name = Path(pose2d_csv_path).stem
df = normalize_pose_dataframe(df, source_name=source_name)

print("Normalized shape:", df.shape)
print("Normalized columns:", list(df.columns))
print("Detected humans:", sorted(df["final_id"].dropna().unique()) if "final_id" in df.columns else ["unknown"])

angles_parts = []

if "final_id" in df.columns and df["final_id"].notna().any():
    for fid, g in df.groupby("final_id", dropna=True):
        defs = angle_defs_by_human.get(fid, angle_defs_h1)
        out = compute_angles(g, defs, visibility_thresh=visibility_thresh)
        out["final_id"] = fid
        angles_parts.append(out)
else:
    out = compute_angles(df, angle_defs_h1, visibility_thresh=visibility_thresh)
    out["final_id"] = "unknown"
    angles_parts.append(out)

angles_df = pd.concat(angles_parts, ignore_index=True).sort_values(["final_id", "frame"]).reset_index(drop=True)

print("Angles shape:", angles_df.shape)
angles_df.head()
Normalized shape: (7794, 8)
Normalized columns: ['video', 'final_id', 'frame', 'time_ms', 'landmark_name', 'x', 'y', 'visibility']
Detected humans: ['human_1']
Angles shape: (433, 8)
Loading...

5) Save and Inspect Output

pose2d_csv = Path(pose2d_csv_path)
out_dir = pose2d_csv.parent
angles_out = out_dir / f"{pose2d_csv.stem}_angles.csv"

angles_df.to_csv(angles_out, index=False)
print("Saved angles to:", angles_out)

preview = pd.read_csv(angles_out)
print("Saved shape:", preview.shape)
preview.head()
Saved angles to: /Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv/LS100_FInalData_1_final_kinematics_healed_id_1_angles.csv
Saved shape: (433, 8)
Loading...

6) Plot Computed Angles Over Time

This section visualizes computed angles by frame (or time when available). You can choose one backend: Matplotlib, Seaborn, or Plotly.

Confidence overlays:

  • Yellow: low confidence (one landmark below threshold)

  • Orange: least confidence (two or more landmarks below threshold)

Tip: if multiple humans are present, the plotting cell draws a separate chart per human and per angle.

# Plot all angle columns with confidence overlays.
# Choose backend: "matplotlib", "seaborn", or "plotly".
plot_backend = "plotly"
selected_video = None   # set to a specific video string, or None
selected_human = None   # set to a specific final_id string, or None

if "angles_df" not in globals():
    raise RuntimeError("angles_df not found. Run the angle computation cells first.")

angle_cols = [c for c in angles_df.columns if c.startswith("angle_")]
if not angle_cols:
    raise ValueError("No angle columns found (expected columns starting with 'angle_').")

x_col = "time_ms" if "time_ms" in angles_df.columns else "frame"
x_label = "Time (ms)" if x_col == "time_ms" else "Frame"

def _spans_from_labels(frame_series, label_series, label_value):
    """Return list of contiguous (x0, x1) spans where label == label_value."""
    frames = frame_series.to_numpy()
    labels = (label_series.to_numpy() == label_value)
    spans = []
    if frames.size == 0:
        return spans

    start = None
    last_frame = None
    for f, ok in zip(frames, labels):
        if ok and start is None:
            start = f
        if ok:
            last_frame = f
        if (not ok) and (start is not None):
            spans.append((start, last_frame))
            start = None
    if start is not None:
        spans.append((start, last_frame))
    return spans

df_plot = angles_df.copy()
if selected_video is not None and "video" in df_plot.columns:
    df_plot = df_plot[df_plot["video"] == selected_video]
if selected_human is not None and "final_id" in df_plot.columns:
    df_plot = df_plot[df_plot["final_id"] == selected_human]
if df_plot.empty:
    raise ValueError("No rows left after filtering. Check selected_video/selected_human.")

backend = plot_backend.strip().lower()

if "final_id" in df_plot.columns:
    groups = list(df_plot.groupby("final_id", dropna=False))
else:
    groups = [("all", df_plot)]

if backend == "matplotlib":
    import matplotlib.pyplot as plt

    for human_id, g in groups:
        g = g.sort_values(x_col)
        for col in angle_cols:
            conf_col = "confidence_" + col.replace("angle_", "", 1)
            plt.figure(figsize=(10, 4))
            plt.plot(g[x_col], g[col])
            plt.xlabel(x_label)
            plt.ylabel(f"{col} (deg)")
            plt.title(f"{col} over time | {human_id} | matplotlib")
            ax = plt.gca()

            if conf_col in g.columns:
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "low"):
                    ax.axvspan(x0, x1, color="yellow", alpha=0.25, linewidth=0)
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "least"):
                    ax.axvspan(x0, x1, color="orange", alpha=0.25, linewidth=0)

            plt.tight_layout()
            plt.show()

elif backend == "seaborn":
    try:
        import seaborn as sns
    except Exception as e:
        raise ImportError("Seaborn is not installed. Install seaborn or choose another backend.") from e
    import matplotlib.pyplot as plt

    for human_id, g in groups:
        g = g.sort_values(x_col)
        for col in angle_cols:
            conf_col = "confidence_" + col.replace("angle_", "", 1)
            plt.figure(figsize=(10, 4))
            sns.lineplot(data=g, x=x_col, y=col)
            plt.xlabel(x_label)
            plt.ylabel(f"{col} (deg)")
            plt.title(f"{col} over time | {human_id} | seaborn")
            ax = plt.gca()

            if conf_col in g.columns:
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "low"):
                    ax.axvspan(x0, x1, color="yellow", alpha=0.25, linewidth=0)
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "least"):
                    ax.axvspan(x0, x1, color="orange", alpha=0.25, linewidth=0)

            plt.tight_layout()
            plt.show()

elif backend == "plotly":
    try:
        import plotly.express as px
    except Exception as e:
        raise ImportError("Plotly is not installed. Install plotly or choose another backend.") from e

    for human_id, g in groups:
        g = g.sort_values(x_col)
        for col in angle_cols:
            conf_col = "confidence_" + col.replace("angle_", "", 1)
            fig = px.line(
                g,
                x=x_col,
                y=col,
                title=f"{col} over time | {human_id} | plotly",
                labels={x_col: x_label, col: f"{col} (deg)"},
            )

            if conf_col in g.columns:
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "low"):
                    fig.add_vrect(x0=x0, x1=x1, fillcolor="yellow", opacity=0.25, line_width=0)
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "least"):
                    fig.add_vrect(x0=x0, x1=x1, fillcolor="orange", opacity=0.25, line_width=0)

            fig.update_layout(
                xaxis_rangeslider_visible=True,
                hovermode="x unified",
                margin=dict(l=40, r=20, t=60, b=40),
            )
            fig.show()

else:
    raise ValueError("Unknown plot_backend. Use 'matplotlib', 'seaborn', or 'plotly'.")
Loading...
Loading...

7) Batch Angle Computation for a Folder of Pose CSV Files

This section processes all matching CSV files in a directory. For each file it:

  • normalizes schema to long format

  • computes angles per human

  • saves one per-file output

  • appends a row to a batch manifest with status and errors

# Batch run on a directory of pose CSV files.
# For each input file: normalize -> compute per human -> save -> log status.
from pathlib import Path
import time
import traceback

pose2d_dir = r"/Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv"  # change to your directory path
pattern = "*.csv"   # change if needed, e.g. "*_pose2d_filtered.csv"

if not pose2d_dir or not str(pose2d_dir).strip():
    raise ValueError("Please set pose2d_dir to a valid directory path.")
pose2d_dir = Path(pose2d_dir).expanduser().resolve()
if not pose2d_dir.exists() or not pose2d_dir.is_dir():
    raise NotADirectoryError(f"Not a directory: {pose2d_dir}")

required_globals = [
    "compute_angles",
    "normalize_pose_dataframe",
    "visibility_thresh",
    "angle_defs_h1",
    "angle_defs_h2",
    "angle_defs_by_human",
]
missing = [g for g in required_globals if g not in globals()]
if missing:
    raise RuntimeError(
        f"Missing prior definitions: {missing}. Run earlier cells first."
    )

# --- Discover files
files = sorted(pose2d_dir.glob(pattern))
if not files:
    raise FileNotFoundError(f"No files matching pattern '{pattern}' in {pose2d_dir}")

print(f"Found {len(files)} file(s) in {pose2d_dir} matching '{pattern}'.")

# --- Process each file
records = []
t_batch0 = time.time()

for i, fpath in enumerate(files, 1):
    row = {
        "input_csv": str(fpath),
        "output_csv": None,
        "status": "ok",
        "error": "",
        "n_rows_in": None,
        "n_rows_out": None,
        "elapsed_s": None,
    }
    print(f"[{i}/{len(files)}] {fpath.name}")

    try:
        t0 = time.time()
        df_in = pd.read_csv(fpath)
        row["n_rows_in"] = len(df_in)

        df_long = normalize_pose_dataframe(df_in, source_name=fpath.stem)

        parts = []
        if "final_id" in df_long.columns and df_long["final_id"].notna().any():
            for fid, g in df_long.groupby("final_id", dropna=True):
                defs = angle_defs_by_human.get(fid, angle_defs_h1)
                out = compute_angles(g, defs, visibility_thresh=visibility_thresh)
                out["final_id"] = fid
                parts.append(out)
        else:
            out = compute_angles(df_long, angle_defs_h1, visibility_thresh=visibility_thresh)
            out["final_id"] = "unknown"
            parts.append(out)

        angles_df_batch = pd.concat(parts, ignore_index=True).sort_values(["final_id", "frame"]).reset_index(drop=True)

        out_csv = fpath.with_name(f"{fpath.stem}_angles.csv")
        angles_df_batch.to_csv(out_csv, index=False)

        row["output_csv"] = str(out_csv)
        row["n_rows_out"] = len(angles_df_batch)
        row["elapsed_s"] = round(time.time() - t0, 2)
        print(f"   -> Saved: {out_csv.name} ({row['n_rows_out']} rows, {row['elapsed_s']}s)")

    except Exception as e:
        row["status"] = "error"
        row["error"] = f"{e.__class__.__name__}: {e}"
        # Optional: keep a short traceback in logs for debugging
        tb = traceback.format_exc().splitlines()[-3:]
        print("   ! Error:", row["error"])
        print("   ! Traceback (last lines):", "; ".join(tb))

    records.append(row)

manifest = pose2d_dir / "batch_angles_manifest.csv"
pd.DataFrame.from_records(records).to_csv(manifest, index=False)
print(f"\nBatch complete. Manifest saved to: {manifest}")
print(f"Total elapsed: {time.time() - t_batch0:.1f}s")
Found 2 file(s) in /Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv matching '*.csv'.
[1/2] LS100_FInalData_1_final_kinematics_healed_id_1.csv
   -> Saved: LS100_FInalData_1_final_kinematics_healed_id_1_angles.csv (433 rows, 0.03s)
[2/2] LS100_FInalData_1_final_kinematics_healed_id_2.csv
   -> Saved: LS100_FInalData_1_final_kinematics_healed_id_2_angles.csv (432 rows, 0.03s)

Batch complete. Manifest saved to: /Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv/batch_angles_manifest.csv
Total elapsed: 0.1s

Conclusion

Congratulations! You’ve now completed a full pose analysis workflow in Python:

  1. Extracted 2D body landmark data from videos using MediaPipe (Notebook 1).

  2. Computed joint angles and derived confidence metrics based on landmark visibility.

  3. Visualized these angles interactively and in batches across multiple recordings.

What You’ve Learned

  • How pose landmarks are represented as normalized 2D coordinates.

  • How to calculate geometric angles (∠ABC) for any combination of body joints.

  • How to assess data reliability using a visibility-based confidence system (good, low, least).

  • How to produce meaningful visualizations and batch-process datasets efficiently.

Next Steps

In the next notebook, we will:

  • Derive custom biomechanical indices (e.g., symmetry or smoothness metrics).

  • Explore statistical summaries and comparisons across individuals or sessions.

Remember: Confidence flags and visualization overlays are not just for aesthetics—
they teach you to interpret pose-estimation data critically, separating signal from noise.