This notebook takes 2D pose landmarks and computes biomechanical joint angles per frame. It also includes confidence labels, plotting utilities, and optional batch processing.
Goal
Compute biomechanical joint angles from pose landmark data.
You define each angle with three landmarks (A, B, C) where the angle is measured at B (angle ABC).
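As a quick illustration of the geometry, the sketch below computes the angle at B for a single A-B-C triplet using the dot product. The helper name `angle_abc` and the points are made up for this example; the notebook's own vectorized function appears later.

```python
import numpy as np


def angle_abc(a, b, c):
    """Angle at b (in degrees) between the rays b->a and b->c, for 2D points."""
    ba = np.asarray(a, dtype=float) - np.asarray(b, dtype=float)
    bc = np.asarray(c, dtype=float) - np.asarray(b, dtype=float)
    cos_theta = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    # Clip guards against tiny floating-point overshoot outside [-1, 1].
    return float(np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0))))


# Made-up points: A directly above B, C directly to the right of B.
right_angle = angle_abc((0, 1), (0, 0), (1, 0))
# Made-up points: A, B, C on one line, as in a fully extended limb.
straight = angle_abc((-1, 0), (0, 0), (1, 0))
print(right_angle, straight)
```

The result is always between 0 and 180 degrees, so it does not distinguish flexion direction.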
This notebook supports both:
long format pose tables (landmark_name, x, y, optional visibility)
wide format pose tables (<landmark>_x, <landmark>_y, optional <landmark>_visibility)
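To make the two layouts concrete, here is a small sketch that builds a toy wide table and reshapes it to long format with `melt` and `pivot`. The values are invented, and this is only one possible way to reshape; it is not the normalization function used later in this notebook.

```python
import pandas as pd

# Hypothetical two-frame table with two landmarks (values are made up).
wide = pd.DataFrame({
    "frame": [0, 1],
    "left_elbow_x": [0.40, 0.42],
    "left_elbow_y": [0.55, 0.56],
    "left_wrist_x": [0.45, 0.47],
    "left_wrist_y": [0.70, 0.69],
})

# Stack all measurement columns into (frame, column, value) rows.
melted = wide.melt(id_vars="frame", var_name="column", value_name="value")
# Split e.g. "left_elbow_x" into landmark "left_elbow" and axis "x".
melted[["landmark_name", "axis"]] = melted["column"].str.rsplit("_", n=1, expand=True)
# One row per (frame, landmark), with x and y as separate columns.
long = (
    melted.pivot(index=["frame", "landmark_name"], columns="axis", values="value")
    .reset_index()
    .rename_axis(columns=None)
)
print(long)
```

The wide table has one row per frame, while the long table has one row per frame and landmark.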
1) Imports and Environment Check
# Optional package install (uncomment only if needed).
# %pip install plotly seaborn
# Imports and environment check
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly
import plotly.express as px
try:
    import seaborn as sns  # noqa: F401
    HAS_SEABORN = True
except Exception:
    HAS_SEABORN = False
print("Python:", sys.version.split()[0])
print("Pandas:", pd.__version__)
print("NumPy:", np.__version__)
print("Plotly:", plotly.__version__)
print("Seaborn available:", HAS_SEABORN)
Python: 3.12.12
Pandas: 2.3.3
NumPy: 2.3.5
Plotly: 6.6.0
Seaborn available: True
2) Load Pose CSV
# Load pose CSV (wide or long format).
pose2d_csv_path = "/Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv/LS100_FInalData_1_final_kinematics_healed_id_1.csv"
df = pd.read_csv(pose2d_csv_path)
print("Loaded:", pose2d_csv_path)
print("Shape:", df.shape)
print("Columns (first 25):", list(df.columns)[:25])
if {"landmark_name", "x", "y"}.issubset(df.columns):
    print("Detected long-format pose data.")
else:
    x_cols = [c for c in df.columns if c.endswith("_x")]
    y_cols = [c for c in df.columns if c.endswith("_y")]
    print("Detected wide-format pose data.")
    print("Wide columns:", len(x_cols), "x-columns and", len(y_cols), "y-columns")
df.head(3)
Loaded: /Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv/LS100_FInalData_1_final_kinematics_healed_id_1.csv
Shape: (433, 40)
Columns (first 25): ['frame', 'raw_id', 'nose_x', 'nose_y', 'left_eye_x', 'left_eye_y', 'right_eye_x', 'right_eye_y', 'left_ear_x', 'left_ear_y', 'right_ear_x', 'right_ear_y', 'left_shoulder_x', 'left_shoulder_y', 'right_shoulder_x', 'right_shoulder_y', 'left_elbow_x', 'left_elbow_y', 'right_elbow_x', 'right_elbow_y', 'left_wrist_x', 'left_wrist_y', 'right_wrist_x', 'right_wrist_y', 'left_hip_x']
Detected wide-format pose data.
Wide columns: 18 x-columns and 18 y-columns
3) Define Custom Angles
Provide angle definitions as landmark triplets A-B-C where the angle is measured at B.
Example:
angle_defs_h1 = [
    {"name": "left_elbow", "A": "left_shoulder", "B": "left_elbow", "C": "left_wrist"},
]
angle_defs_h2 = [
    {"name": "right_knee", "A": "right_hip", "B": "right_knee", "C": "right_ankle"},
]
# Define angle definitions for each human.
angle_defs_h1 = [
    {"name": "left_elbow", "A": "left_shoulder", "B": "left_elbow", "C": "left_wrist"},
    {"name": "left_hip", "A": "left_shoulder", "B": "left_hip", "C": "left_knee"},
]
angle_defs_h2 = [
    {"name": "right_elbow", "A": "right_shoulder", "B": "right_elbow", "C": "right_wrist"},
    {"name": "right_hip", "A": "right_shoulder", "B": "right_hip", "C": "right_knee"},
]
# Mapping used later for per-human computation.
angle_defs_by_human = {
    "human_1": angle_defs_h1,
    "human_2": angle_defs_h2,
}
Computing Angles
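Before computing anything, it can help to check that every landmark named in an angle definition actually exists in the data, since a misspelled name otherwise only shows up later as a skipped angle. The following is a minimal sketch; `find_unknown_landmarks` and the landmark list are hypothetical and not part of this notebook.

```python
def find_unknown_landmarks(angle_defs, available_landmarks):
    """Return {angle name: [landmarks missing from available_landmarks]}."""
    available = set(available_landmarks)
    problems = {}
    for d in angle_defs:
        unknown = [d[key] for key in ("A", "B", "C") if d[key] not in available]
        if unknown:
            problems[d["name"]] = unknown
    return problems


# Hypothetical landmark list; in practice derive it from the pose table.
landmarks = ["left_shoulder", "left_elbow", "left_wrist", "left_hip"]
defs = [
    {"name": "left_elbow", "A": "left_shoulder", "B": "left_elbow", "C": "left_wrist"},
    {"name": "left_hip", "A": "left_shoulder", "B": "left_hip", "C": "left_knee"},
]
problems = find_unknown_landmarks(defs, landmarks)
print(problems)  # {'left_hip': ['left_knee']}
```

An empty dictionary means every definition can be evaluated with the available landmarks.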
Angle Computation Function
The following function calculates the angles you defined.
Adjust visibility_thresh to suit your data. It determines the confidence label assigned to each angle in each frame:
“good” → all three landmarks ≥ threshold
“low” → exactly one landmark < threshold
“least” → two or three landmarks < threshold
If the input has no visibility column, normalize_pose_dataframe fills visibility with 1.0, so every angle is labeled “good”.
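The labeling rule can be sketched for a single triplet of visibility scores as follows. The function name `confidence_label` is hypothetical, and this scalar version does not handle missing values; the notebook's own function applies the same rule to whole columns.

```python
def confidence_label(visibilities, thresh=0.5):
    """Label one angle from the visibility scores of its three landmarks."""
    n_below = sum(v < thresh for v in visibilities)
    if n_below == 0:
        return "good"
    if n_below == 1:
        return "low"
    return "least"


# Made-up visibility scores for landmarks A, B, C.
print(confidence_label([0.9, 0.8, 0.7]))  # good
print(confidence_label([0.9, 0.4, 0.7]))  # low
print(confidence_label([0.2, 0.4, 0.7]))  # least
```

Note that a score exactly equal to the threshold counts as visible, because the comparison is strict.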
# Confidence threshold for angle quality labels.
visibility_thresh = 0.5
def normalize_pose_dataframe(df_in: pd.DataFrame, source_name: str = "video_01") -> pd.DataFrame:
    """Return pose data in long format with at least:
    video, frame, time_ms, landmark_name, x, y, visibility (optional), final_id (optional).
    """
    df_in = df_in.copy()
    # Already long format.
    if {"landmark_name", "x", "y"}.issubset(df_in.columns):
        df_long = df_in
    else:
        x_cols = [c for c in df_in.columns if c.endswith("_x")]
        landmarks = [c[:-2] for c in x_cols if f"{c[:-2]}_y" in df_in.columns]
        if not landmarks:
            raise ValueError(
                "Could not detect pose landmarks. Need either long columns (landmark_name/x/y) "
                "or wide columns like <landmark>_x and <landmark>_y."
            )
        measurement_cols = set()
        for lm in landmarks:
            measurement_cols.add(f"{lm}_x")
            measurement_cols.add(f"{lm}_y")
            if f"{lm}_visibility" in df_in.columns:
                measurement_cols.add(f"{lm}_visibility")
        # Everything that is not a landmark measurement is carried along as an id column.
        id_cols = [c for c in df_in.columns if c not in measurement_cols]
        parts = []
        for lm in landmarks:
            use_cols = id_cols + [f"{lm}_x", f"{lm}_y"]
            has_vis = f"{lm}_visibility" in df_in.columns
            if has_vis:
                use_cols.append(f"{lm}_visibility")
            part = df_in[use_cols].copy()
            rename_map = {f"{lm}_x": "x", f"{lm}_y": "y"}
            if has_vis:
                rename_map[f"{lm}_visibility"] = "visibility"
            part = part.rename(columns=rename_map)
            part["landmark_name"] = lm
            parts.append(part)
        df_long = pd.concat(parts, ignore_index=True)
    if "video" not in df_long.columns:
        df_long["video"] = source_name
    if "frame" not in df_long.columns:
        # No frame column: number the rows of each landmark in order of appearance,
        # so landmarks from the same time step share one frame index.
        df_long["frame"] = df_long.groupby("landmark_name").cumcount()
    if "time_ms" not in df_long.columns:
        # Default assumption: 30 fps if time is absent.
        df_long["time_ms"] = (pd.to_numeric(df_long["frame"], errors="coerce").fillna(0) * (1000.0 / 30.0))
    if "visibility" not in df_long.columns:
        df_long["visibility"] = 1.0
    df_long["frame"] = pd.to_numeric(df_long["frame"], errors="coerce").fillna(0).astype(int)
    df_long["time_ms"] = pd.to_numeric(df_long["time_ms"], errors="coerce").fillna(0.0)
    df_long["x"] = pd.to_numeric(df_long["x"], errors="coerce")
    df_long["y"] = pd.to_numeric(df_long["y"], errors="coerce")
    df_long["visibility"] = pd.to_numeric(df_long["visibility"], errors="coerce").fillna(0.0)
    keep = [c for c in ["video", "final_id", "frame", "time_ms", "landmark_name", "x", "y", "visibility"] if c in df_long.columns]
    return df_long[keep].copy()
def _wide_xyv(df2d: pd.DataFrame) -> pd.DataFrame:
    """
    Pivot long pose table to wide table with x_*, y_*, v_* columns.
    """
    index_cols = [c for c in ["video", "final_id", "frame", "time_ms"] if c in df2d.columns]
    if "frame" not in index_cols:
        raise ValueError("compute_angles needs a 'frame' column after normalization.")
    w_x = df2d.pivot_table(index=index_cols, columns="landmark_name", values="x", aggfunc="mean")
    w_y = df2d.pivot_table(index=index_cols, columns="landmark_name", values="y", aggfunc="mean")
    if "visibility" in df2d.columns:
        w_v = df2d.pivot_table(index=index_cols, columns="landmark_name", values="visibility", aggfunc="mean").fillna(0.0)
    else:
        w_v = pd.DataFrame(1.0, index=w_x.index, columns=w_x.columns)
    w_x.columns = [f"x_{c}" for c in w_x.columns]
    w_y.columns = [f"y_{c}" for c in w_y.columns]
    w_v.columns = [f"v_{c}" for c in w_v.columns]
    return pd.concat([w_x, w_y, w_v], axis=1).sort_index()
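def _angle_at_B(A_pts: np.ndarray, B_pts: np.ndarray, C_pts: np.ndarray) -> np.ndarray:
    """Compute angle at B for A-B-C point triplets in degrees."""
    BA = A_pts - B_pts
    BC = C_pts - B_pts
    # The small epsilon avoids division by zero. If A or C coincides with B,
    # the cosine becomes 0 and the result is 90 degrees rather than NaN.
    denom = (np.linalg.norm(BA, axis=1) * np.linalg.norm(BC, axis=1)) + 1e-9
    cosang = (BA * BC).sum(axis=1) / denom
    # Clip to the valid arccos domain to absorb floating-point overshoot.
    cosang = np.clip(cosang, -1.0, 1.0)
    return np.degrees(np.arccos(cosang))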
def compute_angles(df2d: pd.DataFrame, defs: list, visibility_thresh: float = 0.5) -> pd.DataFrame:
    """Return index columns + angle_<name> and confidence_<name> columns."""
    w = _wide_xyv(df2d)
    out = pd.DataFrame(index=w.index)
    for d in defs:
        A, B, C = d["A"], d["B"], d["C"]
        name = d["name"]
        cols_needed = [
            f"x_{A}", f"y_{A}", f"x_{B}", f"y_{B}", f"x_{C}", f"y_{C}",
            f"v_{A}", f"v_{B}", f"v_{C}",
        ]
        missing = [c for c in cols_needed if c not in w.columns]
        if missing:
            print(f"Warning: missing columns for angle '{name}': {missing}. Skipping.")
            continue
        A_pts = np.c_[w[f"x_{A}"].values, w[f"y_{A}"].values]
        B_pts = np.c_[w[f"x_{B}"].values, w[f"y_{B}"].values]
        C_pts = np.c_[w[f"x_{C}"].values, w[f"y_{C}"].values]
        ang = _angle_at_B(A_pts, B_pts, C_pts)
        vA = w[f"v_{A}"].fillna(0.0).values
        vB = w[f"v_{B}"].fillna(0.0).values
        vC = w[f"v_{C}"].fillna(0.0).values
        below = (vA < visibility_thresh).astype(int) + (vB < visibility_thresh).astype(int) + (vC < visibility_thresh).astype(int)
        conf = np.where(below == 0, "good", np.where(below == 1, "low", "least"))
        out[f"angle_{name}"] = ang
        out[f"confidence_{name}"] = conf
    return out.reset_index()
4) Normalize Data and Compute Angles
# Normalize once so downstream code always receives long-format pose rows.
source_name = Path(pose2d_csv_path).stem
df = normalize_pose_dataframe(df, source_name=source_name)
print("Normalized shape:", df.shape)
print("Normalized columns:", list(df.columns))
print("Detected humans:", sorted(df["final_id"].dropna().unique()) if "final_id" in df.columns else ["unknown"])
angles_parts = []
if "final_id" in df.columns and df["final_id"].notna().any():
    for fid, g in df.groupby("final_id", dropna=True):
        defs = angle_defs_by_human.get(fid, angle_defs_h1)
        out = compute_angles(g, defs, visibility_thresh=visibility_thresh)
        out["final_id"] = fid
        angles_parts.append(out)
else:
    out = compute_angles(df, angle_defs_h1, visibility_thresh=visibility_thresh)
    out["final_id"] = "unknown"
    angles_parts.append(out)
angles_df = pd.concat(angles_parts, ignore_index=True).sort_values(["final_id", "frame"]).reset_index(drop=True)
print("Angles shape:", angles_df.shape)
angles_df.head()
Normalized shape: (7794, 8)
Normalized columns: ['video', 'final_id', 'frame', 'time_ms', 'landmark_name', 'x', 'y', 'visibility']
Detected humans: ['human_1']
Angles shape: (433, 8)
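One way to use the confidence labels downstream is to blank out angles that are not trusted before computing statistics. The sketch below assumes the `angle_<name>` / `confidence_<name>` column naming shown above; `mask_by_confidence` and the toy table are hypothetical.

```python
import numpy as np
import pandas as pd

# Toy stand-in for angles_df (values are made up).
toy = pd.DataFrame({
    "frame": [0, 1, 2, 3],
    "angle_left_elbow": [150.0, 148.5, 90.0, 147.0],
    "confidence_left_elbow": ["good", "good", "least", "low"],
})


def mask_by_confidence(df, keep=("good",)):
    """Return a copy where angles whose confidence is not in `keep` are set to NaN."""
    out = df.copy()
    for col in [c for c in out.columns if c.startswith("angle_")]:
        conf_col = "confidence_" + col[len("angle_"):]
        if conf_col in out.columns:
            out.loc[~out[conf_col].isin(keep), col] = np.nan
    return out


masked = mask_by_confidence(toy)
print(masked["angle_left_elbow"].tolist())
print("Mean over trusted frames:", masked["angle_left_elbow"].mean())
```

Because pandas skips NaN in `mean()`, the summary only reflects the frames that passed the filter.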
5) Save and Inspect Output
pose2d_csv = Path(pose2d_csv_path)
out_dir = pose2d_csv.parent
angles_out = out_dir / f"{pose2d_csv.stem}_angles.csv"
angles_df.to_csv(angles_out, index=False)
print("Saved angles to:", angles_out)
preview = pd.read_csv(angles_out)
print("Saved shape:", preview.shape)
preview.head()
Saved angles to: /Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv/LS100_FInalData_1_final_kinematics_healed_id_1_angles.csv
Saved shape: (433, 8)
6) Plot Computed Angles Over Time
This section visualizes computed angles by frame (or time when available). You can choose one backend: Matplotlib, Seaborn, or Plotly.
Confidence overlays:
Yellow: low confidence (one landmark below threshold)
Orange: least confidence (two or more landmarks below threshold)
Tip: if multiple humans are present, the plotting cell draws a separate chart per human and per angle.
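The overlays depend on grouping consecutive samples with the same label into spans. The sketch below shows one vectorized way to find such spans; `label_spans` and its `pad` parameter are hypothetical. A run of a single sample gives a span with x0 == x1, which has no width when shaded, so padding each span by half the sampling step is one possible workaround.

```python
import numpy as np


def label_spans(x, labels, target, pad=0.0):
    """Return (x0, x1) for each run of consecutive samples where labels == target."""
    x = np.asarray(x, dtype=float)
    hit = np.asarray(labels) == target
    if x.size == 0:
        return []
    # Pad with False on both sides so every run has a detectable start and end.
    padded = np.concatenate(([False], hit, [False])).astype(int)
    edges = np.diff(padded)
    starts = np.flatnonzero(edges == 1)      # first index of each run
    ends = np.flatnonzero(edges == -1) - 1   # last index of each run
    return [(float(x[s] - pad), float(x[e] + pad)) for s, e in zip(starts, ends)]


# Made-up frame numbers and labels.
frames = [0, 1, 2, 3, 4]
labels = ["good", "low", "low", "good", "low"]
spans = label_spans(frames, labels, "low")
padded_spans = label_spans(frames, labels, "low", pad=0.5)
print(spans)
print(padded_spans)
```

With padding, the single-frame run at frame 4 becomes a visible band from 3.5 to 4.5.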
# Plot all angle columns with confidence overlays.
# Choose backend: "matplotlib", "seaborn", or "plotly".
plot_backend = "plotly"
selected_video = None # set to a specific video string, or None
selected_human = None # set to a specific final_id string, or None
if "angles_df" not in globals():
    raise RuntimeError("angles_df not found. Run the angle computation cells first.")
angle_cols = [c for c in angles_df.columns if c.startswith("angle_")]
if not angle_cols:
    raise ValueError("No angle columns found (expected columns starting with 'angle_').")
x_col = "time_ms" if "time_ms" in angles_df.columns else "frame"
x_label = "Time (ms)" if x_col == "time_ms" else "Frame"
def _spans_from_labels(frame_series, label_series, label_value):
    """Return list of contiguous (x0, x1) spans where label == label_value."""
    frames = frame_series.to_numpy()
    labels = (label_series.to_numpy() == label_value)
    spans = []
    if frames.size == 0:
        return spans
    start = None
    last_frame = None
    for f, ok in zip(frames, labels):
        if ok and start is None:
            start = f
        if ok:
            last_frame = f
        if (not ok) and (start is not None):
            spans.append((start, last_frame))
            start = None
    if start is not None:
        spans.append((start, last_frame))
    return spans
df_plot = angles_df.copy()
if selected_video is not None and "video" in df_plot.columns:
    df_plot = df_plot[df_plot["video"] == selected_video]
if selected_human is not None and "final_id" in df_plot.columns:
    df_plot = df_plot[df_plot["final_id"] == selected_human]
if df_plot.empty:
    raise ValueError("No rows left after filtering. Check selected_video/selected_human.")
backend = plot_backend.strip().lower()
if "final_id" in df_plot.columns:
    groups = list(df_plot.groupby("final_id", dropna=False))
else:
    groups = [("all", df_plot)]
if backend == "matplotlib":
    import matplotlib.pyplot as plt
    for human_id, g in groups:
        g = g.sort_values(x_col)
        for col in angle_cols:
            conf_col = "confidence_" + col.replace("angle_", "", 1)
            plt.figure(figsize=(10, 4))
            plt.plot(g[x_col], g[col])
            plt.xlabel(x_label)
            plt.ylabel(f"{col} (deg)")
            plt.title(f"{col} over time | {human_id} | matplotlib")
            ax = plt.gca()
            if conf_col in g.columns:
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "low"):
                    ax.axvspan(x0, x1, color="yellow", alpha=0.25, linewidth=0)
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "least"):
                    ax.axvspan(x0, x1, color="orange", alpha=0.25, linewidth=0)
            plt.tight_layout()
            plt.show()
elif backend == "seaborn":
    try:
        import seaborn as sns
    except Exception as e:
        raise ImportError("Seaborn is not installed. Install seaborn or choose another backend.") from e
    import matplotlib.pyplot as plt
    for human_id, g in groups:
        g = g.sort_values(x_col)
        for col in angle_cols:
            conf_col = "confidence_" + col.replace("angle_", "", 1)
            plt.figure(figsize=(10, 4))
            sns.lineplot(data=g, x=x_col, y=col)
            plt.xlabel(x_label)
            plt.ylabel(f"{col} (deg)")
            plt.title(f"{col} over time | {human_id} | seaborn")
            ax = plt.gca()
            if conf_col in g.columns:
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "low"):
                    ax.axvspan(x0, x1, color="yellow", alpha=0.25, linewidth=0)
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "least"):
                    ax.axvspan(x0, x1, color="orange", alpha=0.25, linewidth=0)
            plt.tight_layout()
            plt.show()
elif backend == "plotly":
    try:
        import plotly.express as px
    except Exception as e:
        raise ImportError("Plotly is not installed. Install plotly or choose another backend.") from e
    for human_id, g in groups:
        g = g.sort_values(x_col)
        for col in angle_cols:
            conf_col = "confidence_" + col.replace("angle_", "", 1)
            fig = px.line(
                g,
                x=x_col,
                y=col,
                title=f"{col} over time | {human_id} | plotly",
                labels={x_col: x_label, col: f"{col} (deg)"},
            )
            if conf_col in g.columns:
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "low"):
                    fig.add_vrect(x0=x0, x1=x1, fillcolor="yellow", opacity=0.25, line_width=0)
                for x0, x1 in _spans_from_labels(g[x_col], g[conf_col], "least"):
                    fig.add_vrect(x0=x0, x1=x1, fillcolor="orange", opacity=0.25, line_width=0)
            fig.update_layout(
                xaxis_rangeslider_visible=True,
                hovermode="x unified",
                margin=dict(l=40, r=20, t=60, b=40),
            )
            fig.show()
else:
    raise ValueError("Unknown plot_backend. Use 'matplotlib', 'seaborn', or 'plotly'.")
7) Batch Angle Computation for a Folder of Pose CSV Files
This section processes all matching CSV files in a directory. For each file it:
normalizes schema to long format
computes angles per human
saves one per-file output
appends a row to a batch manifest with status and errors
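The manifest idea, recording a status row per input instead of stopping at the first failure, can be sketched on in-memory tables as follows. `run_batch` and `toy_process` are hypothetical stand-ins; the batch cell in this notebook reads real CSV files and calls the notebook's own functions.

```python
import pandas as pd


def run_batch(items, process):
    """Apply process(table) to each (name, table) pair and log one status row per item."""
    records = []
    for name, table in items:
        row = {"input": name, "status": "ok", "error": "", "n_rows_out": None}
        try:
            result = process(table)
            row["n_rows_out"] = len(result)
        except Exception as e:
            # Record the failure and continue with the next item.
            row["status"] = "error"
            row["error"] = f"{e.__class__.__name__}: {e}"
        records.append(row)
    return pd.DataFrame.from_records(records)


def toy_process(table):
    """Stand-in for normalize + compute: fails when 'frame' is missing."""
    if "frame" not in table.columns:
        raise ValueError("missing 'frame' column")
    return table


items = [
    ("good.csv", pd.DataFrame({"frame": [0, 1, 2]})),
    ("bad.csv", pd.DataFrame({"x": [0.1]})),
]
manifest = run_batch(items, toy_process)
print(manifest[["input", "status", "error"]])
```

Filtering the manifest on `status == "error"` afterwards gives a list of inputs to revisit.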
# Batch run on a directory of pose CSV files.
# For each input file: normalize -> compute per human -> save -> log status.
from pathlib import Path
import time
import traceback
pose2d_dir = r"/Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv" # change to your directory path
pattern = "*.csv" # change if needed, e.g. "*_pose2d_filtered.csv"
if not pose2d_dir or not str(pose2d_dir).strip():
    raise ValueError("Please set pose2d_dir to a valid directory path.")
pose2d_dir = Path(pose2d_dir).expanduser().resolve()
if not pose2d_dir.exists() or not pose2d_dir.is_dir():
    raise NotADirectoryError(f"Not a directory: {pose2d_dir}")
required_globals = [
    "compute_angles",
    "normalize_pose_dataframe",
    "visibility_thresh",
    "angle_defs_h1",
    "angle_defs_h2",
    "angle_defs_by_human",
]
missing = [g for g in required_globals if g not in globals()]
if missing:
    raise RuntimeError(
        f"Missing prior definitions: {missing}. Run earlier cells first."
    )
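# --- Discover files
# Skip outputs written by earlier runs (angle CSVs and the manifest),
# which would otherwise match "*.csv" and be logged as errors.
files = sorted(
    p for p in pose2d_dir.glob(pattern)
    if not p.name.endswith("_angles.csv") and p.name != "batch_angles_manifest.csv"
)
if not files:
    raise FileNotFoundError(f"No files matching pattern '{pattern}' in {pose2d_dir}")
print(f"Found {len(files)} file(s) in {pose2d_dir} matching '{pattern}'.")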
# --- Process each file
records = []
t_batch0 = time.time()
for i, fpath in enumerate(files, 1):
    row = {
        "input_csv": str(fpath),
        "output_csv": None,
        "status": "ok",
        "error": "",
        "n_rows_in": None,
        "n_rows_out": None,
        "elapsed_s": None,
    }
    print(f"[{i}/{len(files)}] {fpath.name}")
    try:
        t0 = time.time()
        df_in = pd.read_csv(fpath)
        row["n_rows_in"] = len(df_in)
        df_long = normalize_pose_dataframe(df_in, source_name=fpath.stem)
        parts = []
        if "final_id" in df_long.columns and df_long["final_id"].notna().any():
            for fid, g in df_long.groupby("final_id", dropna=True):
                defs = angle_defs_by_human.get(fid, angle_defs_h1)
                out = compute_angles(g, defs, visibility_thresh=visibility_thresh)
                out["final_id"] = fid
                parts.append(out)
        else:
            out = compute_angles(df_long, angle_defs_h1, visibility_thresh=visibility_thresh)
            out["final_id"] = "unknown"
            parts.append(out)
        angles_df_batch = pd.concat(parts, ignore_index=True).sort_values(["final_id", "frame"]).reset_index(drop=True)
        out_csv = fpath.with_name(f"{fpath.stem}_angles.csv")
        angles_df_batch.to_csv(out_csv, index=False)
        row["output_csv"] = str(out_csv)
        row["n_rows_out"] = len(angles_df_batch)
        row["elapsed_s"] = round(time.time() - t0, 2)
        print(f" -> Saved: {out_csv.name} ({row['n_rows_out']} rows, {row['elapsed_s']}s)")
    except Exception as e:
        row["status"] = "error"
        row["error"] = f"{e.__class__.__name__}: {e}"
        # Optional: keep a short traceback in logs for debugging
        tb = traceback.format_exc().splitlines()[-3:]
        print(" ! Error:", row["error"])
        print(" ! Traceback (last lines):", "; ".join(tb))
    records.append(row)
manifest = pose2d_dir / "batch_angles_manifest.csv"
pd.DataFrame.from_records(records).to_csv(manifest, index=False)
print(f"\nBatch complete. Manifest saved to: {manifest}")
print(f"Total elapsed: {time.time() - t_batch0:.1f}s")
Found 2 file(s) in /Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv matching '*.csv'.
[1/2] LS100_FInalData_1_final_kinematics_healed_id_1.csv
-> Saved: LS100_FInalData_1_final_kinematics_healed_id_1_angles.csv (433 rows, 0.03s)
[2/2] LS100_FInalData_1_final_kinematics_healed_id_2.csv
-> Saved: LS100_FInalData_1_final_kinematics_healed_id_2_angles.csv (432 rows, 0.03s)
Batch complete. Manifest saved to: /Users/souvikmandal/Documents/S06_Teaching_Mentoring_Talks/LS100/2026_Sem01/media/Henry/test_videos/final_2ndorder_data_csv/batch_angles_manifest.csv
Total elapsed: 0.1s
Conclusion
Congratulations! You’ve now completed a full pose analysis workflow in Python:
Extracted 2D body landmark data from videos using MediaPipe (Notebook 1).
Computed joint angles and derived confidence metrics based on landmark visibility.
Visualized these angles interactively and in batches across multiple recordings.
What You’ve Learned
How pose landmarks are represented as normalized 2D coordinates.
How to calculate geometric angles (∠ABC) for any combination of body joints.
How to assess data reliability using a visibility-based confidence system (good, low, least).
How to produce meaningful visualizations and batch-process datasets efficiently.
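One caveat about normalized coordinates: if x is scaled by image width and y by image height (as MediaPipe's normalized landmarks are), angles computed directly from them are distorted whenever the frame is not square. The sketch below compares the two cases under the assumption of a hypothetical 1920x1080 frame and made-up landmark positions. Whether this applies to your CSV depends on how the coordinates were exported.

```python
import numpy as np


def angle_abc(a, b, c):
    """Angle at b (in degrees) between the rays b->a and b->c, for 2D points."""
    ba = np.asarray(a, dtype=float) - np.asarray(b, dtype=float)
    bc = np.asarray(c, dtype=float) - np.asarray(b, dtype=float)
    cos_theta = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    return float(np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0))))


def to_pixels(pt, width, height):
    """Convert a per-axis normalized (x, y) point to pixel coordinates."""
    return (pt[0] * width, pt[1] * height)


# Assumed frame size and made-up normalized landmarks A, B, C.
WIDTH, HEIGHT = 1920, 1080
a, b, c = (0.5, 0.3), (0.5, 0.5), (0.6, 0.6)

angle_normalized = angle_abc(a, b, c)
angle_pixels = angle_abc(*(to_pixels(p, WIDTH, HEIGHT) for p in (a, b, c)))
print(f"normalized: {angle_normalized:.1f} deg, pixel space: {angle_pixels:.1f} deg")
```

In this example the two values differ by more than 15 degrees, so it is worth checking which coordinate space your data uses before comparing angles across videos with different aspect ratios.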
Next Steps
In the next notebook, we will:
Derive custom biomechanical indices (e.g., symmetry or smoothness metrics).
Explore statistical summaries and comparisons across individuals or sessions.
Remember: confidence flags and visualization overlays are not just for aesthetics. They help you interpret pose-estimation data critically, separating signal from noise.