diff --git a/src/figures/Confusion matrix – scale + svm.png b/src/figures/Confusion matrix – scale + svm.png new file mode 100644 index 0000000..6eea955 Binary files /dev/null and b/src/figures/Confusion matrix – scale + svm.png differ diff --git a/src/figures/Part D – Validation accuracy across models and feature representations.png b/src/figures/Part D – Validation accuracy across models and feature representations.png new file mode 100644 index 0000000..99a5c3b Binary files /dev/null and b/src/figures/Part D – Validation accuracy across models and feature representations.png differ diff --git a/src/figures/feature_distributions_overlap_2_vs_5.png b/src/figures/feature_distributions_overlap_2_vs_5.png new file mode 100644 index 0000000..278b8f8 Binary files /dev/null and b/src/figures/feature_distributions_overlap_2_vs_5.png differ diff --git a/src/figures/pca_scatter_PCA_(2D)_projection_–_all_classes.png b/src/figures/pca_scatter_PCA_(2D)_projection_–_all_classes.png new file mode 100644 index 0000000..412ceac Binary files /dev/null and b/src/figures/pca_scatter_PCA_(2D)_projection_–_all_classes.png differ diff --git a/src/figures/pca_scatter_PCA_(2D)_projection_–_class_2_vs_class_5.png b/src/figures/pca_scatter_PCA_(2D)_projection_–_class_2_vs_class_5.png new file mode 100644 index 0000000..ab9faab Binary files /dev/null and b/src/figures/pca_scatter_PCA_(2D)_projection_–_class_2_vs_class_5.png differ diff --git a/src/figures/separability_best_features_2_vs_5.png b/src/figures/separability_best_features_2_vs_5.png new file mode 100644 index 0000000..ba5feda Binary files /dev/null and b/src/figures/separability_best_features_2_vs_5.png differ diff --git a/src/figures/separability_worst_features_2_vs_5.png b/src/figures/separability_worst_features_2_vs_5.png new file mode 100644 index 0000000..131ab4e Binary files /dev/null and b/src/figures/separability_worst_features_2_vs_5.png differ diff --git a/src/partD.py b/src/partD.py index bdf8c5c..22ad12c 100644 --- a/src/partD.py +++ b/src/partD.py @@ -1,5 +1,5 @@ # ------------------------------------------------------------ -# Part D - TV Dataset Classifier +# Part D - TV Dataset Classifier (Benchmark & Final Prediction) # Pattern Recognition – Semester Assignment # # Author: @@ -7,41 +7,53 @@ # cchoutou@ece.auth.gr # # Description: -# This module implements a complete classification pipeline -# for the high-dimensional TV dataset (Part D): -# - Loading training and test data -# - Basic preprocessing (scaling, optional dimensionality reduction) -# - Training a supervised classifier -# - Evaluating on a validation split -# - Predicting labels for the provided test set -# - Saving labels to labelsX.npy as required by the assignment -# -# Notes: -# The exact choice of classifier and preprocessing steps can -# be modified. The current skeleton uses a RandomForest model -# as a robust default for high-dimensional data. 
+# This module implements Part D of the assignment: +# - Load the high-dimensional TV dataset (224 features) +# - Benchmark multiple representative classifiers under the same splits +# - Compare performance under different feature representations +# (e.g., scaling vs scaling+PCA) +# - Demonstrate that performance limitations relate to the feature space, +# not merely the classifier choice +# - Train a final chosen model on the full training set +# - Predict labels for the provided test set and save to labelsX.npy # ------------------------------------------------------------ -from typing import Tuple +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Sequence, Tuple, Iterable + +import os +import sys +from itertools import product import numpy as np import pandas as pd +import matplotlib as mpl +import matplotlib.pyplot as plt from sklearn.model_selection import train_test_split +from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler -from sklearn.ensemble import RandomForestClassifier -# from sklearn.decomposition import PCA # Optional, if you decide to use PCA +from sklearn.decomposition import PCA + +from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report + +from sklearn.naive_bayes import GaussianNB +from sklearn.linear_model import LogisticRegression +from sklearn.svm import SVC, LinearSVC +from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier +from sklearn.neighbors import KNeighborsClassifier +from sklearn.neural_network import MLPClassifier from toolbox import load_csv, datasetTV, datasetTest - # -------------------------------------------------- -# Data loading +# Dataset loading (facade) # -------------------------------------------------- def load_tv_training() -> Tuple[np.ndarray, np.ndarray]: """ - Loads the TV training dataset (Part D) and splits it into - features and labels. + Loads the TV training dataset and splits into features and labels. Returns ------- @@ -49,7 +61,7 @@ def load_tv_training() -> Tuple[np.ndarray, np.ndarray]: X_train (ndarray, shape (N_train, D)): Training feature matrix. y_train (ndarray, shape (N_train,)): - Training class labels (1..5). + Training labels. """ df = load_csv(datasetTV, header=None) data = df.values @@ -60,191 +72,1219 @@ def load_tv_training() -> Tuple[np.ndarray, np.ndarray]: def load_tv_test() -> np.ndarray: """ - Loads the TV test dataset (Part D) without labels. + Loads the TV test dataset (no labels). Returns ------- X_test (ndarray, shape (N_test, D)): - Test feature matrix (no labels). + Test feature matrix. """ df = load_csv(datasetTest, header=None) - X_test = df.values - return X_test + return df.values # -------------------------------------------------- -# Preprocessing +# Preprocessing factory + facade # -------------------------------------------------- -def preprocess_features( - X_train: np.ndarray, - X_test: np.ndarray | None = None, -) -> Tuple[np.ndarray, np.ndarray | None, StandardScaler]: +def build_preprocessor(preprocess_spec: Dict[str, Any]) -> Pipeline: """ - Applies basic preprocessing to the feature matrices. - By default, standardizes features (zero mean, unit variance). + Builds a sklearn Pipeline preprocessor from a preprocess spec. 
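+
+    For illustration only, a spec with the same shape as the entries of
+    PREPROCESS_SPECS defined further below (the PCA variance ratio here is
+    just an example value):
+
+        spec = {
+            "type": "pipeline",
+            "steps": [
+                {"type": "scaler", "params": {}},
+                {"type": "pca", "params": {"n_components": 0.85}},
+            ],
+        }
+        prep = build_preprocessor(spec)
+        # -> Pipeline([("scaler", StandardScaler()), ("pca", PCA(n_components=0.85))])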
+ + Parameters + ---------- + preprocess_spec : dict + + Returns + ------- + preprocessor : Pipeline + """ + if preprocess_spec.get("type") != "pipeline": + raise ValueError(f"Unknown preprocess type: {preprocess_spec.get('type')}") + + steps_cfg = preprocess_spec.get("steps", []) + steps = [] + + for step_cfg in steps_cfg: + stype = step_cfg["type"] + params = step_cfg.get("params", {}) + + if stype == "scaler": + steps.append(("scaler", StandardScaler(**params))) + elif stype == "pca": + steps.append(("pca", PCA(**params))) + else: + raise ValueError(f"Unknown preprocess step: {stype}") + + return Pipeline(steps) + +def preprocess_features( + X_train: np.ndarray, X_other: Optional[np.ndarray], preprocess_spec: Dict[str, Any] +) -> Tuple[np.ndarray, Optional[np.ndarray], Pipeline]: + """ + Applies preprocessing to feature matrices using a selected spec. Parameters ---------- X_train : ndarray, shape (N_train, D) - Training features. - X_test : ndarray, shape (N_test, D) or None - Test features, if available. + Training features (used to fit preprocessing). + X_other : ndarray or None + Validation/test features (transformed using the fitted preprocessing). + preprocess_spec : dict + Preprocessing specification. Returns ------- tuple: X_train_proc (ndarray): Preprocessed training features. - X_test_proc (ndarray or None): - Preprocessed test features (if X_test is not None). - scaler (StandardScaler): - Fitted scaler object (can be reused later). + X_other_proc (ndarray or None): + Preprocessed validation/test features. + preprocessor (Pipeline): + Fitted preprocessor object. """ - scaler = StandardScaler() - X_train_proc = scaler.fit_transform(X_train) + preprocessor = build_preprocessor(preprocess_spec) + X_train_proc = preprocessor.fit_transform(X_train) - if X_test is not None: - X_test_proc = scaler.transform(X_test) - else: - X_test_proc = None + if X_other is None: + return X_train_proc, None, preprocessor - # If later θέλεις PCA: - # pca = PCA(n_components=some_k) - # X_train_proc = pca.fit_transform(X_train_proc) - # if X_test_proc is not None: - # X_test_proc = pca.transform(X_test_proc) + X_other_proc = preprocessor.transform(X_other) + return X_train_proc, X_other_proc, preprocessor - return X_train_proc, X_test_proc, scaler + +# -------------------------------------------------- +# Model factory + facade +# -------------------------------------------------- +def build_model(model_spec: Dict[str, Any]): + """ + Builds a classifier from a model spec. 
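+
+    For illustration only (same shape as the MODEL_SPECS entries below; the
+    hyperparameter values are just an example):
+
+        spec = {"type": "svm", "params": {"kernel": "rbf", "C": 10.0, "gamma": "scale"}}
+        clf = build_model(spec)   # -> SVC(kernel="rbf", C=10.0, gamma="scale")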
+ + Parameters + ---------- + model_spec : dict + + Returns + ------- + model : + sklearn-like classifier with fit/predict + """ + mtype = model_spec.get("type") + params = model_spec.get("params", {}) + + if mtype == "gaussian_nb": + return GaussianNB(**params) + + if mtype == "logreg": + return LogisticRegression(**params) + + if mtype == "svm": + return SVC(**params) + + if mtype == "linear_svm": + return LinearSVC(**params) + + if mtype == "random_forest": + return RandomForestClassifier(**params) + + if mtype == "mlp": + return MLPClassifier(**params) + + if mtype == "knn": + return KNeighborsClassifier(**params) + + if mtype == "adaboost": + return AdaBoostClassifier(**params) + + raise ValueError(f"Unknown model type: {mtype}") # -------------------------------------------------- # Model training & evaluation # -------------------------------------------------- -def train_classifier(X_train: np.ndarray, y_train: np.ndarray) -> RandomForestClassifier: +def train_classifier(X_train: np.ndarray, y_train: np.ndarray, model_spec: Dict[str, Any]): """ - Trains a supervised classifier on the given features and labels. - - Currently uses a RandomForestClassifier as a robust default, - but this can be replaced with any other model. + Trains a classifier chosen by model_spec. Parameters ---------- - X_train : ndarray, shape (N_train, D) - y_train : ndarray, shape (N_train,) + X_train : ndarray + y_train : ndarray + model_spec : dict Returns ------- - model (RandomForestClassifier): + model : Trained classifier. """ - model = RandomForestClassifier( - n_estimators=200, - max_depth=None, - random_state=0, - n_jobs=-1, - ) + model = build_model(model_spec) model.fit(X_train, y_train) return model -def evaluate_classifier( - model, - X_val: np.ndarray, - y_val: np.ndarray, -) -> float: +def evaluate_classifier(model, X_val: np.ndarray, y_val: np.ndarray) -> float: """ Evaluates a trained classifier on a validation set. - Parameters - ---------- - model : - Any scikit-learn-like classifier with .predict method. - X_val : ndarray, shape (N_val, D) - y_val : ndarray, shape (N_val,) - Returns ------- acc : float - Classification accuracy on the validation set. + Validation accuracy. """ y_pred = model.predict(X_val) - acc = float(np.mean(y_pred == y_val)) - return acc + return float(np.mean(y_pred == y_val)) # -------------------------------------------------- # Prediction & saving labels # -------------------------------------------------- -def predict_labels( - model, - X_test: np.ndarray, -) -> np.ndarray: +def predict_labels(model, X_test: np.ndarray) -> np.ndarray: """ - Predicts labels for the TV test set. - - Parameters - ---------- - model : - Trained classifier. - X_test : ndarray, shape (N_test, D) + Predicts labels for the provided test set. Returns ------- - labels (ndarray, shape (N_test,)): - Predicted class labels for each test sample. + labels : ndarray + Predicted labels. """ - labels = model.predict(X_test) - return labels.astype(int) + return model.predict(X_test).astype(int) def save_labels(labels: np.ndarray, filename: str = "labelsX.npy") -> None: """ - Saves predicted labels to a .npy file as required by the assignment. - - Parameters - ---------- - labels : ndarray, shape (N_test,) - Predicted class labels. - filename : str - Output filename (default: "labelsX.npy"). + Saves labels to a .npy file (assignment requirement). 
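+
+    Illustrative round-trip check (the saved file can be reloaded with
+    np.load to verify its shape):
+
+        save_labels(np.array([1, 2, 3]), "labelsX.npy")
+        assert np.load("labelsX.npy").shape == (3,)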
""" np.save(filename, labels) print(f"Saved labels to {filename} with shape {labels.shape}") # -------------------------------------------------- -# Main pipeline for Part D +# Benchmark helpers # -------------------------------------------------- -if __name__ == "__main__": - # 1. Load training and test sets - X_train_raw, y_train = load_tv_training() - X_test_raw = load_tv_test() +@dataclass +class ExperimentResult: + preprocess_key: str + model_key: str + val_accuracy: float + y_val: np.ndarray + y_val_pred: np.ndarray - # 2. Train/validation split on the training data + +def run_experiment( + X_train_raw: np.ndarray, + y_train: np.ndarray, + preprocess_key: str, + model_key: str, + test_size: float = 0.2, + seed: int = 0, +) -> ExperimentResult: + """ + Runs a single experiment: + split -> preprocess -> train -> eval + + Returns + ------- + result : ExperimentResult + """ + X_tr, X_val, y_tr, y_val = train_test_split( + X_train_raw, + y_train, + test_size=test_size, + random_state=seed, + stratify=y_train, + ) + + preprocess_spec = PREPROCESS_SPECS[preprocess_key] + X_tr_proc, X_val_proc, _ = preprocess_features(X_tr, X_val, preprocess_spec) + + model_spec = MODEL_SPECS[model_key] + model = train_classifier(X_tr_proc, y_tr, model_spec) + + y_val_pred = model.predict(X_val_proc) + val_acc = float(np.mean(y_val_pred == y_val)) + + return ExperimentResult( + preprocess_key=preprocess_key, + model_key=model_key, + val_accuracy=val_acc, + y_val=y_val, + y_val_pred=y_val_pred.astype(int), + ) + + +def run_benchmark( + X_train_raw: np.ndarray, + y_train: np.ndarray, + preprocess_keys: Sequence[str], + model_keys: Sequence[str], + seed: int = 0, +) -> List[ExperimentResult]: + """ + Runs the full benchmark grid. + + Returns + ------- + results : list of ExperimentResult + """ + results: List[ExperimentResult] = [] + + for pkey in preprocess_keys: + for mkey in model_keys: + r = run_experiment( + X_train_raw=X_train_raw, + y_train=y_train, + preprocess_key=pkey, + model_key=mkey, + seed=seed, + ) + results.append(r) + print(f"[{pkey:>12}] [{mkey:>10}] val_acc={r.val_accuracy:.4f}") + + return results + + +def results_to_dataframe(results: List[ExperimentResult]) -> pd.DataFrame: + """ + Converts results list into a pandas DataFrame. + + Returns + ------- + df : DataFrame + """ + rows = [] + for r in results: + rows.append( + { + "preprocess": r.preprocess_key, + "model": r.model_key, + "val_accuracy": r.val_accuracy, + } + ) + return pd.DataFrame(rows).sort_values(by=["preprocess", "val_accuracy"], ascending=[True, False]) + + +# -------------------------------------------------- +# Plotting +# -------------------------------------------------- +def plot_accuracy_bars(df: pd.DataFrame, title: str) -> None: + """ + Bar plot of validation accuracy per (preprocess, model). 
+ """ + plt.figure(figsize=(12, 6)) + + # Build grouped bars by preprocess + preprocesses = df["preprocess"].unique() + models = df["model"].unique() + + # We plot per preprocess: one bar per model + x = np.arange(len(models)) + width = 0.8 / len(preprocesses) + + for i, p in enumerate(preprocesses): + sub = df[df["preprocess"] == p].set_index("model").reindex(models) + plt.bar(x + i * width, sub["val_accuracy"].values, width=width, label=p) + + plt.xticks(x + width * (len(preprocesses) - 1) / 2, models, rotation=0) + plt.ylabel("Validation accuracy") + plt.title(title) + plt.grid(True, axis="y", alpha=0.3) + plt.legend() + plt.tight_layout() + plt.show(block=False) + plt.savefig(f"figures/" + title + ".png", dpi=300) + plt.close() + + +def plot_confusion(y_true: np.ndarray, y_pred: np.ndarray, title: str) -> None: + """ + Confusion matrix plot. + """ + cm = confusion_matrix(y_true, y_pred) + disp = ConfusionMatrixDisplay(confusion_matrix=cm) + fig, ax = plt.subplots(figsize=(7, 6)) + disp.plot(ax=ax, cmap="Blues", colorbar=True) + ax.set_title(title) + plt.tight_layout() + plt.show(block=False) + plt.savefig(f"figures/" + title + ".png", dpi=300) + plt.close() + + +def plot_pca_scatter_2d( + X: np.ndarray, + y: np.ndarray, + title: str, + classes_to_show: Optional[Sequence[int]] = None, +) -> None: + """ + Projects data to 2D using PCA and plots a scatter colored by class. + + Parameters + ---------- + X : ndarray, shape (N, D) + y : ndarray, shape (N,) + title : str + classes_to_show : optional sequence of class labels + If provided, plot only these classes. + """ + if classes_to_show is not None: + mask = np.isin(y, np.array(classes_to_show)) + Xp = X[mask] + yp = y[mask] + else: + Xp = X + yp = y + + pca2 = PCA(n_components=2, random_state=0) + Z = pca2.fit_transform(Xp) + + classes = np.unique(yp) + cmap = plt.get_cmap("tab10", len(classes)) + + plt.figure(figsize=(10, 6)) + for i, c in enumerate(classes): + m = (yp == c) + plt.scatter(Z[m, 0], Z[m, 1], s=10, alpha=0.6, c=[cmap(i)], label=f"class {c}") + + plt.title(title) + plt.xlabel("PC1") + plt.ylabel("PC2") + plt.grid(True, alpha=0.3) + plt.legend(framealpha=0.9) + plt.tight_layout() + plt.savefig( + f"figures/pca_scatter_{title.replace(' ', '_').replace('/', '')}.png", + dpi=300, + bbox_inches="tight", + ) + plt.show(block=False) + plt.pause(0.001) + plt.close() + + + +def plot_feature_separability( + d_scores: np.ndarray, + top_k: int = 10, + title: str = "Per-feature separability between classes 2 and 5", +) -> Tuple[np.ndarray, np.ndarray]: + """ + Plots the most and least separable features according to d_scores. + + Returns + ------- + best_idx : ndarray + Indices of top-k best separating features. + worst_idx : ndarray + Indices of top-k worst separating features. 
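+
+    Example usage (sketch; d_scores as computed by effect_size_per_feature
+    further below, as done in problem_demonstration_phase):
+
+        d_scores = effect_size_per_feature(X_a, X_b)
+        best_idx, worst_idx = plot_feature_separability(d_scores, top_k=9)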
+ """ + D = d_scores.shape[0] + idx_sorted = np.argsort(d_scores) # ascending + + worst_idx = idx_sorted[:top_k] + best_idx = idx_sorted[-top_k:][::-1] + + # Plot best + plt.figure(figsize=(10, 4)) + plt.bar([str(i) for i in best_idx], d_scores[best_idx]) + plt.title(title + " (Top features)") + plt.xlabel("Feature index") + plt.ylabel("d-score") + plt.grid(True, axis="y", alpha=0.3) + plt.tight_layout() + plt.savefig( + "figures/separability_best_features_2_vs_5.png", + dpi=300, + bbox_inches="tight", + ) + plt.show(block=False) + plt.pause(0.001) + plt.close() + + # Plot worst + plt.figure(figsize=(10, 4)) + plt.bar([str(i) for i in worst_idx], d_scores[worst_idx]) + plt.title(title + " (Worst features – strong overlap)") + plt.xlabel("Feature index") + plt.ylabel("d-score") + plt.grid(True, axis="y", alpha=0.3) + plt.tight_layout() + plt.savefig( + "figures/separability_worst_features_2_vs_5.png", + dpi=300, + bbox_inches="tight", + ) + plt.show(block=False) + plt.pause(0.001) + plt.close() + + return best_idx, worst_idx + + +def plot_feature_distributions_grid( + X2: np.ndarray, + X5: np.ndarray, + feature_indices: Sequence[int], + n_cols: int = 3, + bins: int = 30, + title: str = "Feature distributions (classes 2 vs 5)", +) -> None: + """ + Plots hist overlays for selected feature indices for classes 2 and 5. + """ + feature_indices = list(feature_indices) + n = len(feature_indices) + n_rows = int(np.ceil(n / n_cols)) + + plt.figure(figsize=(4 * n_cols, 3 * n_rows)) + for k, j in enumerate(feature_indices): + ax = plt.subplot(n_rows, n_cols, k + 1) + ax.hist(X2[:, j], bins=bins, density=True, alpha=0.5, label="class 2") + ax.hist(X5[:, j], bins=bins, density=True, alpha=0.5, label="class 5") + ax.set_title(f"Feature {j}") + ax.grid(True, alpha=0.3) + + if k == 0: + ax.legend(framealpha=0.9) + + plt.suptitle(title) + plt.tight_layout() + plt.savefig( + "figures/feature_distributions_overlap_2_vs_5.png", + dpi=150, + bbox_inches="tight", + ) + plt.show(block=False) + plt.pause(0.001) + plt.close() + + + +def tune_one_config( + X_raw: np.ndarray, + y: np.ndarray, + preprocess_spec: Dict[str, Any], + preprocess_name: str, + model_key: str, + param_grid: Dict[str, List[Any]], + cv: int = 5, + seed: int = 0, +) -> Tuple[pd.DataFrame, Dict[str, Any]]: + """ + Brute-force CV tuning for a single (preprocess, model) configuration. 
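+
+    Example call (sketch mirroring the "scale + svm" entry of TUNING_SPECS
+    below; the grid values here are only illustrative):
+
+        df_cfg, best = tune_one_config(
+            X_raw=X_train_raw, y=y_train,
+            preprocess_spec=PREPROCESS_SPECS["scale"], preprocess_name="scale",
+            model_key="svm",
+            param_grid={"C": [3, 5, 10], "gamma": ["scale", "auto"]},
+            cv=5,
+        )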
+ + Returns + ------- + df : DataFrame with columns: + preprocess, model, params, mean_acc, std_acc + best : dict with keys: + best_params, best_mean_acc, best_std_acc + """ + combos = expand_param_grid(param_grid) + folds = stratified_kfold_indices(y, n_splits=cv, seed=seed) + + rows = [] + best = { + "best_params": None, + "best_mean_acc": -1.0, + "best_std_acc": None, + } + + for params in combos: + accs = [] + + for tr_idx, va_idx in folds: + X_tr_raw, X_va_raw = X_raw[tr_idx], X_raw[va_idx] + y_tr, y_va = y[tr_idx], y[va_idx] + + # 1) Fit preprocess on train fold, transform train+val + prep = build_preprocessor(preprocess_spec) + X_tr = prep.fit_transform(X_tr_raw) + X_va = prep.transform(X_va_raw) + + # 2) Build model with params and train + base_model_spec = MODEL_SPECS[model_key] + model_spec = { + "type": base_model_spec["type"], + "params": {**base_model_spec.get("params", {}), **params}, + } + model = build_model(model_spec) + model.fit(X_tr, y_tr) + + # 3) Evaluate fold accuracy + y_hat = model.predict(X_va) + acc = float(np.mean(y_hat == y_va)) + accs.append(acc) + + # --- progress print --- + combo_i = combos.index(params) + 1 # 1..len(combos) + fold_i = accs.__len__() + running_mean = float(np.mean(accs)) + if fold_i == 5: + print( + f"[{preprocess_name} | {model_key}] " + f"combo {combo_i:>3}/{len(combos)} " + f"mean={running_mean:.4f} " + f"params={params}", + flush=True, + ) + + mean_acc = float(np.mean(accs)) + std_acc = float(np.std(accs)) + + rows.append({ + "preprocess": preprocess_name, + "model": model_key, + "params": params, + "mean_acc": mean_acc, + "std_acc": std_acc, + }) + + if mean_acc > best["best_mean_acc"]: + best["best_mean_acc"] = mean_acc + best["best_std_acc"] = std_acc + best["best_params"] = params + + df = pd.DataFrame(rows).sort_values("mean_acc", ascending=False).reset_index(drop=True) + return df, best + + + +# -------------------------------------------------- +# Final training + prediction (using chosen/best config) +# -------------------------------------------------- + +def final_training_for_all_best_configs( + X_train_raw: np.ndarray, + y_train: np.ndarray, + X_test_raw: np.ndarray, + best_per_config: Dict[str, Any], + seed: int = 0, +) -> Dict[str, np.ndarray]: + """ + For each tuned config (best_per_config), train on a train/val split + to plot a confusion matrix, then train on full training set and + predict test labels. + + NOTE: + This version matches your current schema: + entry = { + 'name': ..., + 'preprocess_spec': {...}, + 'preprocess_name': 'scale' / ..., + 'model': 'svm' / ..., + 'params': {...}, + 'mean_acc': ..., + 'std_acc': ... + } + + Returns + ------- + preds_per_config : dict + Mapping config_name -> predicted labels for X_test_raw + """ + preds_per_config: Dict[str, np.ndarray] = {} + + # One fixed split for comparable confusion matrices X_tr, X_val, y_tr, y_val = train_test_split( X_train_raw, y_train, test_size=0.2, - random_state=0, + random_state=seed, stratify=y_train, ) - # 3. Preprocess features (scaling, optional PCA) - X_tr_proc, X_val_proc, scaler = preprocess_features(X_tr, X_val) + for name, entry in best_per_config.items(): + preprocess_name = entry["preprocess_name"] + preprocess_spec = entry["preprocess_spec"] + model_key = entry["model"] + params = entry["params"] - # 4. Train classifier - model = train_classifier(X_tr_proc, y_tr) + # --- (1) confusion matrix on the same validation split --- + X_tr_p, X_val_p, _ = preprocess_features(X_tr, X_val, preprocess_spec) - # 5. 
Evaluate on validation set - val_acc = evaluate_classifier(model, X_val_proc, y_val) - print(f"Validation accuracy: {val_acc:.4f}") + base_model_spec = MODEL_SPECS[model_key] + model_spec = { + "type": base_model_spec["type"], + "params": {**base_model_spec.get("params", {}), **params}, + } - # 6. Retrain on full training set (optional but συνήθως καλό) - X_full_proc, X_test_proc, _ = preprocess_features(X_train_raw, X_test_raw) - final_model = train_classifier(X_full_proc, y_train) + model = train_classifier(X_tr_p, y_tr, model_spec) + y_val_pred = model.predict(X_val_p).astype(int) - # 7. Predict labels for official test set - labels = predict_labels(final_model, X_test_proc) + plot_confusion( + y_val, + y_val_pred, + title=f"Confusion matrix (tuned) – {preprocess_name} + {model_key}", + ) - # 8. Save labels to labelsX.npy - save_labels(labels, filename="labelsX.npy") + # --- (2) train on full training set, predict test, save separate .npy --- + X_train_p, X_test_p, _ = preprocess_features(X_train_raw, X_test_raw, preprocess_spec) + + model_full = train_classifier(X_train_p, y_train, model_spec) + y_test_pred = predict_labels(model_full, X_test_p) + + safe_name = f"{preprocess_name}_{model_key}".replace(" ", "_").replace("/", "_") + out_path = f"labelsX_{safe_name}.npy" + np.save(out_path, y_test_pred) + print(f"[FINAL] {safe_name}: saved {out_path} shape={y_test_pred.shape}") + + preds_per_config[name] = y_test_pred + + return preds_per_config + + + + +def train_final_and_predict( + X_train_raw: np.ndarray, + y_train: np.ndarray, + X_test_raw: np.ndarray, + best_overall: Dict[str, Any], + labels_path: str = "labelsX.npy", +) -> np.ndarray: + """ + Trains best model on full training set and predicts labels for test set. + Saves labels to .npy. + """ + preprocess_spec = best_overall["preprocess_spec"] + model_key = best_overall["model"] + params = best_overall["params"] + + prep = build_preprocessor(preprocess_spec) + X_train = prep.fit_transform(X_train_raw) + X_test = prep.transform(X_test_raw) + + base_model_spec = MODEL_SPECS[model_key] + model_spec = { + "type": base_model_spec["type"], + "params": {**base_model_spec.get("params", {}), **params}, + } + + model = build_model(model_spec) + model.fit(X_train, y_train) + + y_test_pred = model.predict(X_test).astype(int) + np.save(labels_path, y_test_pred) + print(f"Saved labels to {labels_path} with shape {y_test_pred.shape}") + + return y_test_pred + + + + + +# -------------------------------------------------- +# Helpers +# -------------------------------------------------- +def effect_size_per_feature(X2: np.ndarray, X5: np.ndarray, eps: float = 1e-12) -> np.ndarray: + """ + Computes a simple per-feature separability score between two classes. + + Score (Cohen-like d): + d_j = |mu2 - mu5| / sqrt( (var2 + var5)/2 ) + + Larger d => better separation (less overlap). + Smaller d => stronger overlap. + + Returns + ------- + d : ndarray, shape (D,) + Per-feature separability scores. + """ + mu2 = np.mean(X2, axis=0) + mu5 = np.mean(X5, axis=0) + + var2 = np.var(X2, axis=0) + var5 = np.var(X5, axis=0) + + pooled = np.sqrt(0.5 * (var2 + var5) + eps) + d = np.abs(mu2 - mu5) / pooled + return d + + +def expand_param_grid(param_grid: Dict[str, List[Any]]) -> List[Dict[str, Any]]: + """ + Converts {"C":[1,10], "gamma":[0.1,0.01]} to a list of dict combinations. 
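+
+    Worked example (combination order follows itertools.product):
+
+        expand_param_grid({"C": [1, 10], "gamma": [0.1, 0.01]})
+        # -> [{"C": 1, "gamma": 0.1}, {"C": 1, "gamma": 0.01},
+        #     {"C": 10, "gamma": 0.1}, {"C": 10, "gamma": 0.01}]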
+ """ + keys = list(param_grid.keys()) + values = [param_grid[k] for k in keys] + combos = [] + for vals in product(*values): + combos.append({k: v for k, v in zip(keys, vals)}) + return combos + + +from sklearn.model_selection import StratifiedKFold + +def stratified_kfold_indices(y: np.ndarray, n_splits: int, seed: int = 0): + skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed) + return list(skf.split(np.zeros_like(y), y)) + + + +# -------------------------------------------------- +# Investigation phase +# -------------------------------------------------- +def investigation_phase( + X_train_raw: np.ndarray, + y_train: np.ndarray, + preprocess_keys: Sequence[str], + model_keys: Sequence[str], + seed: int = 0, + top_k: int = 3, +) -> Tuple[List[ExperimentResult], pd.DataFrame, List[str]]: + """ + Runs a coarse benchmark to identify the best-performing models. + + Returns + ------- + results : list of ExperimentResult + Raw experimental results for all configurations. + df_results : DataFrame + Validation accuracy for all (preprocess, model) combinations. + shortlisted_models : list of str + Top-k model keys based on best observed validation accuracy. + """ + results = run_benchmark( + X_train_raw=X_train_raw, + y_train=y_train, + preprocess_keys=preprocess_keys, + model_keys=model_keys, + seed=seed, + ) + + df = results_to_dataframe(results) + + best_per_model = ( + df.groupby("model")["val_accuracy"] + .max() + .sort_values(ascending=False) + ) + + shortlisted_models = list(best_per_model.head(top_k).index) + + print("\n=== Investigation summary ===") + print(best_per_model.to_string()) + print(f"\nSelected top-{top_k} models for further analysis: {shortlisted_models}") + + return results, df, shortlisted_models + + + +# -------------------------------------------------- +# Visualization phase +# -------------------------------------------------- +def visualization_phase( + results: List[ExperimentResult], + df: pd.DataFrame, +): + """ + Visualizes the key findings of the investigation phase. 
+ + Currently includes: + - Grouped bar plot of validation accuracies + - Confusion matrix of the best-performing configuration + """ + # 1) Accuracy comparison plot + plot_accuracy_bars( + df, + title="Part D – Validation accuracy across models and feature representations", + ) + + # 2) Identify best configuration overall + best_row = df.iloc[df["val_accuracy"].argmax()] + best_preprocess = str(best_row["preprocess"]) + best_model = str(best_row["model"]) + best_acc = float(best_row["val_accuracy"]) + + print( + f"\nBest configuration overall:" + f" preprocess={best_preprocess}, model={best_model}, val_acc={best_acc:.4f}" + ) + + # 3) Confusion matrix for that configuration + best_result = None + for r in results: + if ( + r.preprocess_key == best_preprocess + and r.model_key == best_model + and abs(r.val_accuracy - best_acc) < 1e-12 + ): + best_result = r + break + + if best_result is None: + raise RuntimeError("Best result not found in experiment results.") + + plot_confusion( + best_result.y_val, + best_result.y_val_pred, + title=f"Confusion matrix – {best_preprocess} + {best_model}", + ) + + print("\nClassification report (best config):") + print(classification_report(best_result.y_val, best_result.y_val_pred)) + + +# -------------------------------------------------- +# Phase: Demonstrate the problem (overlap 2 vs 5) +# -------------------------------------------------- +def problem_demonstration_phase( + X_train_raw: np.ndarray, + y_train: np.ndarray, + class_a: int = 2, + class_b: int = 5, + top_k: int = 9, +) -> None: + """ + Demonstrates the core difficulty of the dataset by showing class overlap + between two specific classes (default: 2 and 5). + + Outputs: + - PCA 2D scatter (all classes) + - PCA 2D scatter (only class_a vs class_b) + - Per-feature separability bar plots (best/worst) + - Distribution grid for selected features + """ + # 1) PCA scatter - all classes + plot_pca_scatter_2d( + X_train_raw, + y_train, + title="PCA (2D) projection – all classes", + classes_to_show=None, + ) + + # 2) PCA scatter - only the problematic pair + plot_pca_scatter_2d( + X_train_raw, + y_train, + title=f"PCA (2D) projection – class {class_a} vs class {class_b}", + classes_to_show=[class_a, class_b], + ) + + # 3) Compute separability scores per feature (only for the pair) + X_a = X_train_raw[y_train == class_a] + X_b = X_train_raw[y_train == class_b] + + d_scores = effect_size_per_feature(X_a, X_b) + + best_idx, worst_idx = plot_feature_separability( + d_scores, + top_k=top_k, + title=f"Separability between classes {class_a} and {class_b}", + ) + + # 4) Show distributions for worst features (strong overlap) + plot_feature_distributions_grid( + X_a, X_b, + feature_indices=worst_idx, + title=f"Most overlapping features – classes {class_a} vs {class_b}", + ) + + # (Optional) show also best features + plot_feature_distributions_grid( + X_a, X_b, + feature_indices=best_idx, + title=f"Most separating features – classes {class_a} vs {class_b}", + ) + +def tuning_phase( + X_train_raw: np.ndarray, + y_train: np.ndarray, + tuning_specs: List[Dict[str, Any]], + seed: int = 0, +) -> Tuple[pd.DataFrame, Dict[str, Any], Dict[str, Any]]: + """ + Tunes multiple configurations defined in tuning_specs. + + Each tuning spec must contain: + - name: str + - preprocess: dict (same format as values of PREPROCESS_SPECS) + - preprocess_name: str (for logs/df, e.g. 
"scale" or "scale_pca_75") + - model: str (key in MODEL_SPECS) + - param_grid: dict + - cv: int (optional) + + Returns + ------- + df_all : DataFrame with all tried combinations for all configs + best_per_config : dict: spec_name -> best dict (+ preprocess/model) + best_overall : dict with keys: + preprocess_spec, preprocess_name, model, params, mean_acc, std_acc, name + """ + all_dfs = [] + best_per_config: Dict[str, Any] = {} + + best_overall = { + "name": None, + "preprocess_spec": None, + "preprocess_name": None, + "model": None, + "params": None, + "mean_acc": -1.0, + "std_acc": None, + } + + for spec in tuning_specs: + name = spec["name"] + preprocess_spec = spec["preprocess"] # <-- dict + preprocess_name = spec["preprocess_name"] # <-- string label for df/logs + model_key = spec["model"] + grid = spec["param_grid"] + cv = int(spec.get("cv", 5)) + + print(f"\n[TUNING] {name} (cv={cv}) ...") + df_cfg, best_cfg = tune_one_config( + X_raw=X_train_raw, + y=y_train, + preprocess_spec=preprocess_spec, + preprocess_name=preprocess_name, + model_key=model_key, + param_grid=grid, + cv=cv, + seed=seed, + ) + + df_cfg.insert(0, "config", name) + all_dfs.append(df_cfg) + + best_entry = { + "name": name, + "preprocess_spec": preprocess_spec, + "preprocess_name": preprocess_name, + "model": model_key, + "params": best_cfg["best_params"], + "mean_acc": best_cfg["best_mean_acc"], + "std_acc": best_cfg["best_std_acc"], + } + best_per_config[name] = best_entry + + print( + f" best mean_acc={best_entry['mean_acc']:.4f} " + f"(std={best_entry['std_acc']:.4f}) params={best_entry['params']}" + ) + + if best_entry["mean_acc"] > best_overall["mean_acc"]: + best_overall = dict(best_entry) + + df_all = ( + pd.concat(all_dfs, ignore_index=True) + .sort_values("mean_acc", ascending=False) + .reset_index(drop=True) + ) + + print("\n=== Tuning summary (best overall) ===") + print(best_overall) + + return df_all, best_per_config, best_overall + + + +# -------------------------------------------------- +# Experiment specifications (poor-man dependency injection) +# -------------------------------------------------- + +# Preprocessing specs: +# Each spec is a dict that describes a Pipeline of steps. +PREPROCESS_SPECS: Dict[str, Dict[str, Any]] = { + "scale": { + "type": "pipeline", + "steps": [ + {"type": "scaler", "params": {}}, + ], + }, + # Keep variance ratio as in your colleagues' exploration (low retained variance can still help) + "scale_pca_66": { + "type": "pipeline", + "steps": [ + {"type": "scaler", "params": {}}, + {"type": "pca", "params": {"n_components": 0.66}}, + ], + }, + "scale_pca_75": { + "type": "pipeline", + "steps": [ + {"type": "scaler", "params": {}}, + {"type": "pca", "params": {"n_components": 0.75}}, + ], + }, + "scale_pca_85": { + "type": "pipeline", + "steps": [ + {"type": "scaler", "params": {}}, + {"type": "pca", "params": {"n_components": 0.85}}, + ], + }, +} + +# Model specs: +# Each spec is a dict with "type" + estimator kwargs in "params". 
+MODEL_SPECS: Dict[str, Dict[str, Any]] = { + "gnb": { + "type": "gaussian_nb", + "params": {}, + }, + "rf": { + "type": "random_forest", + "params": { + "n_estimators": 400, "max_depth": None, "random_state": 0, "n_jobs": -1, + }, + }, + "logreg": { + "type": "logreg", + "params": { + "max_iter": 4000, + "C": 1.0, + "solver": "lbfgs", + #"multi_class": "auto", + }, + }, + "svm": { + "type": "svm", + "params": { + "kernel": "rbf", "C": 10.0, "gamma": "scale", + }, + }, + "linear_svm": { + "type": "linear_svm", + "params": { + "C": 1.0, "max_iter": 20000, + }, + }, + "mlp": { + "type": "mlp", + "params": { + "hidden_layer_sizes": (128, 64), + "activation": "relu", + "solver": "adam", + "max_iter": 2000, + "random_state": 0, + }, + }, + "knn": { + "type": "knn", + "params": { + "n_neighbors": 11, + "weights": "distance", + "p": 2, + }, + }, + "adaboost": { + "type": "adaboost", + "params": { + "n_estimators": 200, "learning_rate": 0.5, "random_state": 0, + }, + }, +} + +TUNING_SPECS = [ + # { + # "name": "scale + rf", + # "preprocess_name": "scale", + # "preprocess": PREPROCESS_SPECS["scale"], + # "model": "rf", + # "param_grid": { + # "n_estimators": [400, 800, 1200, 1400], #[200, 400, 800], + # "max_depth": [None], #[None, 20, 40, 80], + # "max_features": ["sqrt"], #["sqrt", "log2", 0.5], + # "min_samples_split": [2, 4, 8, 10],#[2, 5, 10], + # "min_samples_leaf": [1, 2, 4], #[1, 2, 4], + # }, + # "cv": 5, + # }, + { + "name": "scale + mlp", + "preprocess_name": "scale", + "preprocess": PREPROCESS_SPECS["scale"], + "model": "mlp", + "param_grid": { + "hidden_layer_sizes": [(128, ), (128, 64), (256, 128), (128, 64, 32)], + "alpha": [1e-5, 1e-4, 1e-3], + "learning_rate_init": [1e-3, 0.01, 0.02], + "activation": ["relu"], #["relu", "tanh"], + # "max_iter": [2000], + "solver": ["adam"], #["adam", "sgd"], + }, + "cv": 5, + }, + { + "name": "scale_pca_85 + knn", + "preprocess_name": "scale_pca_85", + "preprocess": PREPROCESS_SPECS["scale_pca_85"], + "model": "knn", + "param_grid": { + "n_neighbors": [7, 8, 9, 10, 11, 15, 31, 42], + "weights": ["uniform", "distance"], + "p": [1, 2], + }, + "cv": 5, + }, + { + "name": "scale + svm", + "preprocess_name": "scale", + "preprocess": PREPROCESS_SPECS["scale"], + "model": "svm", + "param_grid": { + "kernel": ["rbf", "poly"], + "C": [3, 4, 5, 5.5, 6, 10], + "degree": [2, 3, 5], + "gamma": ["scale", "auto"], + "class_weight": [None], + }, + "cv": 5, + }, +] + +# -------------------------------------------------- +# Main +# -------------------------------------------------- +if __name__ == "__main__": + os.makedirs("figures", exist_ok=True) + if len(sys.argv) > 1: + param = sys.argv[1] + else: + param = None + + # 1) Load data + X_train_raw, y_train = load_tv_training() + X_test_raw = load_tv_test() + + if param == "phase1" or param == "all": + # Phase 1: script + preprocess_keys = ["scale", "scale_pca_66", "scale_pca_75", "scale_pca_85"] + model_keys = ["gnb", "rf", "logreg", "linear_svm", "svm", "mlp", "knn", "adaboost"] + + # Phase 1.1: investigation + results, df, shortlisted_models = investigation_phase( + X_train_raw, y_train, preprocess_keys, model_keys, seed=0, top_k=3 + ) + + # Phase 1.2: visualization + visualization_phase(results, df) + + # Phase 1,3: problem demo + problem_demonstration_phase(X_train_raw, y_train, class_a=2, class_b=5, top_k=9) + + if param == "phase2" or param == "all": + + # Phase 2.1: Tuning + df_tune, best_per_cfg, best_overall = tuning_phase( + X_train_raw, y_train, TUNING_SPECS, seed=0 + ) + + # Optional: save tuning table 
for the report + df_tune.to_csv("figures/tuning_results.csv", index=False) + + # Phase 2.2: Final training for each tuned best config + confusion matrices + preds_per_cfg = final_training_for_all_best_configs( + X_train_raw=X_train_raw, + y_train=y_train, + X_test_raw=X_test_raw, + best_per_config=best_per_cfg, + seed=0, + ) + + # (Optional) also train/predict only for the best overall and save as the official submission file + y_test_pred = train_final_and_predict( + X_train_raw, y_train, X_test_raw, best_overall, labels_path="labelsX.npy" + )
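+
+        # Illustrative sanity check on the submission file written just above
+        # (assumes labelsX.npy exists at this point; convenience print only).
+        saved = np.load("labelsX.npy")
+        print(
+            f"[CHECK] labelsX.npy: shape={saved.shape}, "
+            f"classes={np.unique(saved)}, "
+            f"matches in-memory predictions: {np.array_equal(saved, y_test_pred)}"
+        )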