import os

import pytest
import numpy as np

from sklearn.model_selection import check_cv
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from himalaya.backend import set_backend
from himalaya.kernel_ridge import KernelRidgeCV

from voxelwise_tutorials.delayer import Delayer
from voxelwise_tutorials.io import load_hdf5_array
from voxelwise_tutorials.io import get_data_home
from voxelwise_tutorials.io import download_datalad
from voxelwise_tutorials.utils import explainable_variance
from voxelwise_tutorials.utils import generate_leave_one_run_out

# use "cupy" or "torch_cuda" for faster computation with GPU
backend = set_backend("numpy", on_error="warn")
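# with on_error="warn", an unavailable backend (e.g. no GPU) only raises a
# warning and keeps the current backend, so the test still runs on CPU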

# Download the dataset
subject = "S01"
feature_spaces = ["motion_energy", "wordnet"]
directory = get_data_home(dataset="shortclips")
for file_name in [
    "features/motion_energy.hdf",
    "features/wordnet.hdf",
    "mappers/S01_mappers.hdf",
    "responses/S01_responses.hdf",
]:
    download_datalad(file_name, destination=directory,
                     source="https://gin.g-node.org/gallantlab/shortclips")
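# download_datalad fetches only the listed files through datalad/git-annex,
# so files already present locally should not be downloaded again on
# repeated test runs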


def run_model(X_train, X_test, Y_train, Y_test, run_onsets):
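    """Fit a voxelwise kernel ridge model and return per-voxel test scores."""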
    ##############
    # define model
    n_samples_train = Y_train.shape[0]
    cv = generate_leave_one_run_out(n_samples_train, run_onsets,
                                    random_state=0, n_runs_out=1)
    cv = check_cv(cv)
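    # generate_leave_one_run_out yields (train, validation) index splits in
    # which the validation set is one full fMRI run, so cross-validation
    # respects the temporal structure of the recording. check_cv wraps the
    # generator into a scikit-learn compatible, reusable CV splitter.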

    alphas = np.logspace(-4, 15, 20)
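    # 20 regularization strengths log-spaced from 1e-4 to 1e15;
    # KernelRidgeCV selects the best alpha independently for each voxel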

    model = make_pipeline(
        StandardScaler(with_mean=True, with_std=False),
        Delayer(delays=[1, 2, 3, 4]),
        KernelRidgeCV(
            kernel="linear", alphas=alphas, cv=cv,
            solver_params=dict(n_targets_batch=1000, n_alphas_batch=10)),
    )
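    # The pipeline centers each feature (with_std=False keeps the original
    # scale), then Delayer stacks copies of the features at lags of 1 to 4
    # samples to model the hemodynamic delay, turning an (n_samples,
    # n_features) matrix into (n_samples, 4 * n_features). With a linear
    # kernel, KernelRidgeCV is equivalent to ridge regression, and the
    # batching parameters bound memory use during the grid search.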

    ###########
    # run model
    model.fit(X_train, Y_train)
    test_scores = model.score(X_test, Y_test)
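    # model.score delegates to KernelRidgeCV.score, which returns one R^2
    # value per voxel rather than a single averaged score; backend.to_numpy
    # below converts it from the active backend's array type (e.g. a cupy
    # array or torch tensor) back to a numpy array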

    test_scores = backend.to_numpy(test_scores)
    # cv_scores = backend.to_numpy(model[-1].cv_scores_)

    return test_scores


@pytest.mark.parametrize('feature_space', feature_spaces)
def test_model_fitting(feature_space):
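    """Fit the full pipeline on one feature space and check per-voxel R^2."""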
    ###########################################
    # load the data

    # load X
    features_file = os.path.join(directory, 'features',
                                 feature_space + ".hdf")
    features = load_hdf5_array(features_file)
    X_train = features['X_train']
    X_test = features['X_test']

    # load Y
    responses_file = os.path.join(directory, 'responses',
                                  subject + "_responses.hdf")
    responses = load_hdf5_array(responses_file)
    Y_train = responses['Y_train']
    Y_test_repeats = responses['Y_test']
    run_onsets = responses['run_onsets']

    #############################################
    # select voxels based on explainable variance
    ev = explainable_variance(Y_test_repeats)
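    # Y_test_repeats has shape (n_repeats, n_samples, n_voxels). Explainable
    # variance estimates, per voxel, the fraction of response variance that
    # is consistent across repeated presentations of the same test stimulus,
    # i.e. an upper bound on how well any stimulus-driven model can do.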
    mask = ev > 0.4
    assert mask.sum() > 0
    Y_train = Y_train[:, mask]
    # average across repeats to increase the signal-to-noise ratio of the
    # test responses
    Y_test = Y_test_repeats[:, :, mask].mean(0)

    ###########################################
    # fit a ridge model and compute test scores
    test_scores = run_model(X_train, X_test, Y_train, Y_test, run_onsets)
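    # sanity checks on the distribution of per-voxel test R^2: the
    # best-predicted voxels should score well above chance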
    assert np.percentile(test_scores, 95) > 0.05
    assert np.percentile(test_scores, 99) > 0.15
    assert np.percentile(test_scores, 100) > 0.35