Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

This notebook extracts a rich set of musical features from audio tracks and produces:

  • Timestamped annotations (beats/BPM, rhythm structure, chords, melody F0, loudness, instrument probabilities, etc.).

  • Per-track JSON metadata written to ./outputs/metadata/.

  • Embeddings & tabular features written to ./outputs/features/.

  • Clustering & 2D visualization of track similarity (UMAP + HDBSCAN, with a K-Means fallback).

Folder structure: place your audio files under ./audio/ (e.g., WAV/MP3).
Recommended formats: 44.1 kHz or 48 kHz, mono or stereo.


# --- Install dependencies (uncomment if needed) --------------------------------
# If you are running locally or in Colab, uncomment and run this cell once.
# It pins nothing strictly to keep compatibility broad for teaching.
#
# %pip install -q numpy pandas soundfile matplotlib librosa umap-learn hdbscan scikit-learn
# %pip install -q openl3 crepe musicnn torch torchaudio --upgrade
# Optional (better downbeat tracking):
# %pip install -q madmom
# Optional (source separation; heavy):
# %pip install -q spleeter demucs

import os, glob, json, math, itertools, warnings
from dataclasses import dataclass, asdict
from typing import List, Dict, Any

import numpy as np
import pandas as pd
import soundfile as sf

import librosa
import librosa.display

# ML / clustering
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

# Dimensionality reduction & clustering
import umap.umap_ as umap
try:
    from hdbscan import HDBSCAN
    _HAVE_HDBSCAN = True
except Exception as _:
    _HAVE_HDBSCAN = False

# Embeddings & models
import openl3

# Pitch tracking (melody)
import crepe

# Music tagging (instrument/mood/genre tags)
from musicnn.extractor import extractor as musicnn_extractor

import matplotlib.pyplot as plt

warnings.filterwarnings("ignore")

# ---------------- User-configurable paths & parameters ----------------
AUDIO_DIR = "./audio"                # put your tracks here
OUTPUT_DIR = "./outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(f"{OUTPUT_DIR}/metadata", exist_ok=True)
os.makedirs(f"{OUTPUT_DIR}/features", exist_ok=True)
os.makedirs(f"{OUTPUT_DIR}/figures", exist_ok=True)

# Analysis parameters
TARGET_SR = 44100                     # analyzed sampling rate
HOP_LENGTH = 512                      # STFT hop length for many features
EMBED_HOP = 1.0                       # seconds per OpenL3 embedding frame
INSTRUMENT_TAGS_OF_INTEREST = [
    # keep short & high-level for robustness (musicnn tags vary slightly)
    "guitar", "piano", "drums", "bass", "violin", "cello", "saxophone",
    "trumpet", "flute", "strings", "synth", "organ", "voice", "vocals"
]
INSTRUMENT_THRESHOLD = 0.35           # probability threshold per frame
RANDOM_SEED = 7

np.random.seed(RANDOM_SEED)
/Users/souvikmandal/AudioVenv/lib/python3.11/site-packages/resampy/filters.py:32: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.
  import pkg_resources
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[2], line 29
     26 import openl3
     28 # Pitch tracking (melody)
---> 29 import crepe
     31 # Music tagging (instrument/mood/genre tags)
     32 from musicnn.extractor import extractor as musicnn_extractor

ModuleNotFoundError: No module named 'crepe'

def load_audio_mono(path, sr=TARGET_SR):
    y, srx = librosa.load(path, sr=sr, mono=True)
    return y, sr

def frame_times(n_frames, sr=TARGET_SR, hop_length=HOP_LENGTH, center=True):
    frames = np.arange(n_frames)
    return librosa.frames_to_time(frames, sr=sr, hop_length=hop_length, n_fft=None)

def summarize_series(x: np.ndarray) -> Dict[str, float]:
    x = np.asarray(x).reshape(-1)
    return {
        "mean": float(np.nanmean(x)),
        "std": float(np.nanstd(x)),
        "min": float(np.nanmin(x)),
        "max": float(np.nanmax(x)),
        "median": float(np.nanmedian(x))
    }

def compute_beats(y, sr):
    tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr)
    beat_times = librosa.frames_to_time(beat_frames, sr=sr)
    # instantaneous tempo curve from beat intervals
    inst_tempo = None
    if len(beat_times) > 1:
        ibi = np.diff(beat_times)         # seconds
        inst_tempo = 60.0 / ibi           # bpm
    return float(np.atleast_1d(tempo)[0]), beat_times, inst_tempo

def compute_tempogram(y, sr):
    oenv = librosa.onset.onset_strength(y=y, sr=sr, hop_length=HOP_LENGTH)
    tempogram = librosa.feature.tempogram(onset_envelope=oenv, sr=sr, hop_length=HOP_LENGTH)
    # derive a dominant tempo curve by picking max lag per frame
    tempi = librosa.tempo_frequencies(tempogram.shape[0], sr=sr, hop_length=HOP_LENGTH)
    idx = np.argmax(tempogram, axis=0)
    local_tempo = tempi[idx]
    times = frame_times(tempogram.shape[1], sr=sr, hop_length=HOP_LENGTH)
    return {"times": times.tolist(), "local_tempo_bpm": local_tempo.tolist()}

def compute_downbeats_optional(audio_path):
    """Downbeat tracking via madmom if available; else return []."""
    try:
        from madmom.features.downbeats import RNNDownBeatProcessor, DBNDownBeatTrackingProcessor
        act = RNNDownBeatProcessor()(audio_path)
        dbn = DBNDownBeatTrackingProcessor(beats_per_bar=[3,4], fps=100)
        beats = dbn(act)  # Nx2 [time, beat_idx]; 1 marks downbeat
        downbeat_times = [float(t) for t, b in beats if int(b) == 1]
        return downbeat_times
    except Exception:
        return []

# Build major/minor chord templates over chroma (12 bins)
_MAJOR = np.zeros((12, 12))
_MINOR = np.zeros((12, 12))
for root in range(12):
    _MAJOR[root, [root, (root+4)%12, (root+7)%12]] = 1.0
    _MINOR[root, [root, (root+3)%12, (root+7)%12]] = 1.0

CHORD_LABELS = [f"{n}" for n in ["C","C#","D","D#","E","F","F#","G","G#","A","A#","B"]]
MAJ_LBL = [f"{p}:maj" for p in CHORD_LABELS]
MIN_LBL = [f"{p}:min" for p in CHORD_LABELS]
ALL_CHORD_LABELS = MAJ_LBL + MIN_LBL

def chord_segments(y, sr, beat_frames):
    # Compute chroma and average per beat
    chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=HOP_LENGTH)
    if beat_frames is None or len(beat_frames) < 2:
        # fallback to fixed windows
        beat_frames = np.arange(chroma.shape[1])
    beat_sync = librosa.util.sync(chroma, beat_frames, aggregate=np.mean)
    seg_times = librosa.frames_to_time(np.append(beat_frames, beat_frames[-1]+1), sr=sr)

    # Correlate with templates
    chords = []
    for i in range(beat_sync.shape[1]):
        v = beat_sync[:, i] / (np.linalg.norm(beat_sync[:, i]) + 1e-8)
        maj_scores = (_MAJOR @ v).max(axis=1)
        min_scores = (_MINOR @ v).max(axis=1)
        j = int(np.argmax(np.r_[maj_scores, min_scores]))
        label = ALL_CHORD_LABELS[j]
        # confidence: softmax-like
        top = float(np.max(np.r_[maj_scores, min_scores]))
        conf = float(top)
        chords.append({
            "start": float(seg_times[i]),
            "end": float(seg_times[i+1]),
            "label": label,
            "confidence": conf
        })
    # Merge consecutive identical labels
    merged = []
    for c in chords:
        if merged and merged[-1]["label"] == c["label"]:
            merged[-1]["end"] = c["end"]
            merged[-1]["confidence"] = max(merged[-1]["confidence"], c["confidence"])
        else:
            merged.append(c)
    return merged

def estimate_melody_f0(y, sr, step_ms=10, viterbi=True):
    # CREPE expects 16 kHz but handles resampling internally.
    # Returns arrays (time_s, f0_hz, confidence)
    time, frequency, confidence, _ = crepe.predict(y, sr, viterbi=viterbi, step_size=step_ms)
    return {"times": time.tolist(), "f0_hz": frequency.tolist(), "confidence": confidence.tolist()}

def loudness_rms(y, sr):
    S = np.abs(librosa.stft(y, hop_length=HOP_LENGTH))**2
    rms = librosa.feature.rms(S=S)[0]
    times = frame_times(len(rms), sr=sr, hop_length=HOP_LENGTH)
    return {"times": times.tolist(), "rms": rms.tolist(), "summary": summarize_series(rms)}

def timbre_features(y, sr):
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=20, hop_length=HOP_LENGTH)
    spec_centroid = librosa.feature.spectral_centroid(y=y, sr=sr, hop_length=HOP_LENGTH)[0]
    spec_bw = librosa.feature.spectral_bandwidth(y=y, sr=sr, hop_length=HOP_LENGTH)[0]
    rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr, hop_length=HOP_LENGTH)[0]
    zcr = librosa.feature.zero_crossing_rate(y=y, hop_length=HOP_LENGTH)[0]
    tonnetz = librosa.feature.tonnetz(y=librosa.effects.harmonic(y), sr=sr)

    feats = {
        "mfcc_mean": np.mean(mfcc, axis=1).tolist(),
        "mfcc_std": np.std(mfcc, axis=1).tolist(),
        "spectral_centroid": summarize_series(spec_centroid),
        "spectral_bandwidth": summarize_series(spec_bw),
        "spectral_rolloff": summarize_series(rolloff),
        "zcr": summarize_series(zcr),
        "tonnetz_mean": np.mean(tonnetz, axis=1).tolist(),
        "tonnetz_std": np.std(tonnetz, axis=1).tolist(),
    }
    return feats

def instrument_taggram(audio_path, tags_of_interest=INSTRUMENT_TAGS_OF_INTEREST, threshold=INSTRUMENT_THRESHOLD):
    taggram, tags = musicnn_extractor(audio_path, model='MTT_musicnn')
    # taggram shape: [time_steps, n_tags]
    tag_idx = {t: i for i, t in enumerate(tags)}
    # Build a time vector from audio duration
    y, sr = librosa.load(audio_path, sr=TARGET_SR, mono=True)
    duration = librosa.get_duration(y=y, sr=sr)
    T = taggram.shape[0]
    times = np.linspace(0, duration, num=T, endpoint=False)

    # Collect per-time instrument probabilities
    per_time = []
    present = set()
    for ti in range(T):
        frame = {"time": float(times[ti])}
        for inst in tags_of_interest:
            # musicnn vocabulary not fixed; pick best matching tag(s)
            candidates = [k for k in tags if inst in k]
            if not candidates:
                continue
            prob = float(np.max([taggram[ti, tag_idx[c]] for c in candidates]))
            frame[inst] = prob
            if prob >= threshold:
                present.add(inst)
        per_time.append(frame)

    return {"times": [float(t) for t in times], "per_time": per_time, "present_instruments": sorted(list(present))}

def compute_openl3_embeddings(y, sr, hop_s=EMBED_HOP, input_repr='mel256', content_type='music', embedding_size=512):
    emb, ts = openl3.get_audio_embedding(y, sr, hop_size=hop_s, input_repr=input_repr,
                                         content_type=content_type, embedding_size=embedding_size)
    return {"times": ts.tolist(), "embeddings": emb.tolist(), "mean_embedding": np.mean(emb, axis=0).tolist()}

def process_track(audio_path: str) -> Dict[str, Any]:
    y, sr = load_audio_mono(audio_path, sr=TARGET_SR)
    duration = librosa.get_duration(y=y, sr=sr)

    bpm, beat_times, inst_tempo = compute_beats(y, sr)
    tempo_curve = compute_tempogram(y, sr)
    downbeats = compute_downbeats_optional(audio_path)

    # Beat frames for chord alignment
    beat_frames = librosa.time_to_frames(beat_times, sr=sr, hop_length=HOP_LENGTH) if len(beat_times) else None
    chords = chord_segments(y, sr, beat_frames)

    melody = estimate_melody_f0(y, sr)

    loud = loudness_rms(y, sr)
    timbre = timbre_features(y, sr)

    inst = instrument_taggram(audio_path)

    emb = compute_openl3_embeddings(y, sr)

    meta = {
        "track": {
            "path": audio_path,
            "filename": os.path.basename(audio_path),
            "duration_sec": float(duration),
            "sample_rate": int(sr)
        },
        "tempo": {"global_bpm": float(bpm),
                  "beat_times": [float(t) for t in beat_times],
                  "instant_bpm": [] if inst_tempo is None else [float(x) for x in inst_tempo]},
        "rhythm": {"tempo_curve": tempo_curve,
                   "downbeat_times": downbeats},
        "chords": chords,
        "melody_f0": melody,
        "loudness": loud,
        "timbre": timbre,
        "instruments": inst,
        "embeddings": {"openl3": {"mean": emb["mean_embedding"], "times": emb["times"]}},  # not storing full per-frame emb to keep JSON small
    }
    return meta

audio_files = sorted([p for ext in ("*.wav","*.mp3","*.flac","*.ogg","*.m4a") for p in glob.glob(os.path.join(AUDIO_DIR, ext))])
print(f"Found {len(audio_files)} audio files under {AUDIO_DIR}")

all_rows = []
for p in audio_files:
    print(f"Processing: {p}")
    meta = process_track(p)
    # Save metadata JSON
    out_json = os.path.join(OUTPUT_DIR, "metadata", os.path.splitext(os.path.basename(p))[0] + ".json")
    with open(out_json, "w") as f:
        json.dump(meta, f)
    # Aggregate a track-level feature vector
    vec = {
        "track": meta["track"]["filename"],
        "duration_sec": meta["track"]["duration_sec"],
        "tempo_bpm": meta["tempo"]["global_bpm"]
    }
    # Add timbre summaries
    for k, v in meta["timbre"].items():
        if isinstance(v, dict):  # summary
            for sname, sval in v.items():
                vec[f"{k}_{sname}"] = float(sval)
        else:  # list (e.g., mfcc stats)
            if isinstance(v, list) and len(v) == 20:
                for i, val in enumerate(v):
                    vec[f"{k}_{i}"] = float(val)
    # Add instrument presence as binary/mean probs
    inst_present = set(meta["instruments"]["present_instruments"])
    for inst in INSTRUMENT_TAGS_OF_INTEREST:
        vec[f"inst_{inst}"] = 1.0 if inst in inst_present else 0.0
    # Add OpenL3 mean embedding (512D)
    mean_emb = meta["embeddings"]["openl3"]["mean"]
    for i, val in enumerate(mean_emb):
        vec[f"openl3_{i}"] = float(val)
    all_rows.append(vec)

df = pd.DataFrame(all_rows)
df.to_parquet(os.path.join(OUTPUT_DIR, "features", "track_features.parquet"), index=False)
df.to_csv(os.path.join(OUTPUT_DIR, "features", "track_features.csv"), index=False)
print(f"Saved features for {len(df)} tracks.")

# Load features back (you can skip this if running in one go)
feat_path = os.path.join(OUTPUT_DIR, "features", "track_features.parquet")
if os.path.exists(feat_path):
    df = pd.read_parquet(feat_path)
else:
    df = pd.read_csv(os.path.join(OUTPUT_DIR, "features", "track_features.csv"))

id_cols = ["track", "duration_sec", "tempo_bpm"]
X = df.drop(columns=id_cols).values

# Scale -> PCA (for speed) -> UMAP to 2D
scaler = StandardScaler()
Xz = scaler.fit_transform(X)

pca = PCA(n_components=min(64, Xz.shape[1]))
Xp = pca.fit_transform(Xz)

reducer = umap.UMAP(n_components=2, random_state=RANDOM_SEED)
X2 = reducer.fit_transform(Xp)

# Clustering
labels = None
if _HAVE_HDBSCAN:
    clusterer = HDBSCAN(min_cluster_size=3, min_samples=None)
    labels = clusterer.fit_predict(Xp)
else:
    km = KMeans(n_clusters=min(6, max(2, len(df)//3)), random_state=RANDOM_SEED, n_init="auto")
    labels = km.fit_predict(Xp)

out = df[id_cols].copy()
out["cluster"] = labels
out["umap_x"] = X2[:,0]
out["umap_y"] = X2[:,1]

out.to_csv(os.path.join(OUTPUT_DIR, "features", "clusters_umap2d.csv"), index=False)

# --- Plot 2D scatter
plt.figure(figsize=(8,6))
for lab in sorted(set(labels)):
    idx = labels == lab
    plt.scatter(out.loc[idx, "umap_x"], out.loc[idx, "umap_y"], label=f"cluster {lab}")
for i, row in out.iterrows():
    plt.text(row["umap_x"], row["umap_y"], str(i), fontsize=8)
plt.title("Track Similarity (UMAP)")
plt.xlabel("UMAP 1")
plt.ylabel("UMAP 2")
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "figures", "umap_clusters.png"), dpi=150)
plt.show()

out.head()

# Inspect one track's JSON metadata
import pprint, glob
jfiles = sorted(glob.glob(os.path.join(OUTPUT_DIR, "metadata", "*.json")))
if jfiles:
    with open(jfiles[0], "r") as f:
        meta = json.load(f)
    print("Showing snippet of", os.path.basename(jfiles[0]))
    pprint.pprint({k: meta[k] for k in ["track","tempo","rhythm","chords"]}, compact=True, width=120)
else:
    print("No JSON files yet. Run the batch cell after adding audio files under ./audio")