
# -*- coding: utf-8 -*-
"""Helper functions for the splitters.

Some of these methods might also be useful for constructing nested cross-validation loops.
"""
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd
from loguru import logger
from numpy.typing import ArrayLike
from pandas.api.types import infer_dtype
from scipy.spatial.distance import cdist
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm


# TODO: can we speed this up with numba?
# For numba, we would need some rewrite, as numba does not support numpy.delete.
def kennard_stone_sampling(
    X: ArrayLike,  # noqa: N803
    scale: bool = True,
    centrality_measure: str = "mean",
    metric: Union[Callable, str] = "euclidean",
) -> List[int]:
    """Run the Kennard-Stone sampling algorithm [KennardStone].

    The algorithm selects samples that cover the feature space uniformly.
    The initial samples are biased towards the boundaries of the dataset.

    .. note::

        You might also find this algorithm useful for creating a "diverse"
        sample of points, e.g., to initialize an active learning loop.

    .. warning::

        This algorithm has a high computational complexity.
        It is not recommended for large datasets.

    Args:
        X (ArrayLike): Input feature matrix.
        scale (bool): If True, apply z-score normalization
            prior to running the sampling. Defaults to True.
        centrality_measure (str): The first sample is selected to be maximally
            distant from this value. It can be one of "mean", "median", "random".
            In the case of "random", we simply select a random point.
            In the cases of "mean" and "median", the initial point is maximally
            distant from the mean and median of the feature matrix, respectively.
            Defaults to "mean".
        metric (Union[Callable, str]): The distance metric to use.
            If a string, the distance function can be 'braycurtis', 'canberra',
            'chebyshev', 'cityblock', 'correlation', 'cosine', 'dice', 'euclidean',
            'hamming', 'jaccard', 'jensenshannon', 'kulsinski', 'kulczynski1',
            'mahalanobis', 'matching', 'minkowski', 'rogerstanimoto', 'russellrao',
            'seuclidean', 'sokalmichener', 'sokalsneath', 'sqeuclidean', 'yule'.
            See :py:func:`scipy.spatial.distance.cdist`. Defaults to "euclidean".

    Raises:
        ValueError: If a non-implemented centrality measure is used.

    Returns:
        List[int]: Indices sorted by their max-min distance.
    """
    if scale:
        scaler = StandardScaler()
        X = scaler.fit_transform(X)  # noqa: N806

    original_X = X.copy()  # noqa: N806

    if centrality_measure == "mean":
        distance_to_center = cdist(X, X.mean(axis=0).reshape(1, -1), metric=metric)
    elif centrality_measure == "median":
        distance_to_center = cdist(X, np.median(X, axis=0).reshape(1, -1), metric=metric)
    elif centrality_measure == "random":
        distance_to_center = cdist(X, X[np.random.choice(X.shape[0])].reshape(1, -1), metric=metric)
    else:
        raise ValueError(f"Unknown centrality measure: {centrality_measure}")

    index_farthest = np.argmax(distance_to_center)

    index_selected = [index_farthest]
    index_remaining = np.arange(len(X))

    X = np.delete(X, index_selected, axis=0)  # noqa: N806
    index_remaining = np.delete(index_remaining, index_selected, axis=0)

    # We will sometimes need to do quite a few steps.
    # For all practical datasets this would exceed Python's recursion limit,
    # so we use a while loop instead of recursion.
    with tqdm(total=len(X)) as pbar:  # one update per remaining point
        while len(index_remaining):
            samples_selected = original_X[index_selected]
            # For every remaining point, compute the distance to its closest
            # already-selected point (min over axis 0, the selected samples) ...
            min_distance_to_samples_selected = np.min(
                cdist(samples_selected, X, metric=metric), axis=0
            )
            # ... and pick the remaining point that is farthest from the selection.
            index_farthest = np.argmax(min_distance_to_samples_selected)

            index_selected.append(index_remaining[index_farthest])
            X = np.delete(X, index_farthest, axis=0)  # noqa: N806
            index_remaining = np.delete(index_remaining, index_farthest, 0)
            pbar.update(1)

    return index_selected

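# Illustrative usage sketch (not part of the original module; variable names
# are hypothetical): pick a diverse subset of 10 points from a random feature
# matrix using the Kennard-Stone ordering.
def _example_kennard_stone() -> None:
    rng = np.random.default_rng(42)
    features = rng.normal(size=(100, 5))
    # The returned indices are ordered by max-min distance, so the first
    # k entries form a diverse subset of size k.
    order = kennard_stone_sampling(features, scale=True, centrality_measure="mean")
    diverse_subset = features[order[:10]]
    print(diverse_subset.shape)  # (10, 5)
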
def pca_kmeans(
    X: np.ndarray,  # noqa: N803
    scaled: bool,
    n_pca_components: Optional[Union[int, str]],
    n_clusters: int,
    random_state: Optional[Union[int, np.random.RandomState]] = None,
    pca_kwargs: Optional[Dict[str, Any]] = None,
    kmeans_kwargs: Optional[Dict[str, Any]] = None,
) -> np.ndarray:
    """Run principal component analysis (PCA) followed by k-means clustering on the data.

    Uses sklearn's implementations of PCA and k-means.

    Args:
        X (np.ndarray): Input data.
        scaled (bool): If True, apply standard scaling before clustering.
        n_pca_components (Optional[Union[int, str]]): Number of principal
            components to keep. If None, PCA is skipped.
        n_clusters (int): Number of clusters.
        random_state (Optional[Union[int, np.random.RandomState]], optional):
            Random state for sklearn. Defaults to None.
        pca_kwargs (Dict[str, Any], optional): Additional keyword arguments for
            sklearn's :py:class:`sklearn.decomposition.PCA`. Defaults to None.
        kmeans_kwargs (Dict[str, Any], optional): Additional keyword arguments for
            sklearn's :py:class:`sklearn.cluster.KMeans`. Defaults to None.

    Returns:
        np.ndarray: Cluster indices.
    """
    if scaled:
        X = StandardScaler().fit_transform(X)  # noqa: N806
    if n_pca_components is not None:
        pca = PCA(n_components=n_pca_components, **(pca_kwargs or {}))
        X_pca = pca.fit_transform(X)  # noqa: N806
    else:
        X_pca = X  # noqa: N806
    kmeans = KMeans(n_clusters=n_clusters, random_state=random_state, **(kmeans_kwargs or {}))
    return kmeans.fit_predict(X_pca)

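# Illustrative usage sketch (not part of the original module): cluster random
# data into 3 groups after reducing it to 2 principal components.
def _example_pca_kmeans() -> None:
    rng = np.random.default_rng(0)
    features = rng.normal(size=(60, 10))
    labels = pca_kmeans(
        features,
        scaled=True,
        n_pca_components=2,
        n_clusters=3,
        random_state=0,
    )
    print(labels.shape)  # (60,), one cluster index per row
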
def is_categorical(x: Union[float, int, ArrayLike]) -> bool:
    """Return True if x is categorical or composed of integers."""
    return infer_dtype(x) in ("category", "integer")

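# Illustrative sketch (not part of the original module): integer-valued data
# counts as categorical, while float-valued data does not.
def _example_is_categorical() -> None:
    print(is_categorical(np.array([0, 1, 1, 2])))  # True (inferred as "integer")
    print(is_categorical(np.array([0.1, 0.5, 0.9])))  # False (inferred as "floating")
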
def stratified_train_test_partition(
    idxs: Sequence[int],
    stratification_col: ArrayLike,
    train_size: float,
    valid_size: float,
    test_size: float,
    shuffle: bool = True,
    random_state: Optional[Union[int, np.random.RandomState]] = None,
    q: Sequence[float] = (0, 0.25, 0.5, 0.75, 1),
) -> Tuple[np.array, np.array, np.array]:
    """Perform a stratified train/valid/test split.

    .. seealso::

        * :py:meth:`mofdscribe.splitters.utils.grouped_stratified_train_test_partition`:
          performs a grouped stratified train/test split
        * :py:meth:`mofdscribe.splitters.utils.grouped_train_valid_test_partition`:
          performs a grouped un-stratified train/test split

    Args:
        idxs (Sequence[int]): Indices of points to split.
        stratification_col (ArrayLike): Data used for stratification.
            If it is categorical (see :py:meth:`mofdscribe.splitters.utils.is_categorical`),
            we use it directly for stratification. Otherwise, we use quantile binning.
        train_size (float): Size of the training set as a fraction.
        valid_size (float): Size of the validation set as a fraction.
        test_size (float): Size of the test set as a fraction.
        shuffle (bool): If True, perform a shuffled split. Defaults to True.
        random_state (Union[int, np.random.RandomState], optional): Random state
            for the shuffler. Defaults to None.
        q (Sequence[float], optional): Quantiles used for quantile binning.
            Defaults to (0, 0.25, 0.5, 0.75, 1).

    Returns:
        Tuple[np.array, np.array, np.array]: Train, validation, test indices.
    """
    if stratification_col is not None and not is_categorical(stratification_col):
        logger.warning(
            "Stratifying on non-categorical data. "
            "Note that there is still discussion on the usefulness of this method."
        )
        stratification_col = quantile_binning(stratification_col, q=q)

    train_size, valid_size, test_size = get_train_valid_test_sizes(
        len(stratification_col), train_size, valid_size, test_size
    )

    train_idx, test_idx, train_strat, _ = train_test_split(
        idxs,
        stratification_col,
        train_size=train_size + valid_size,
        test_size=test_size,
        shuffle=shuffle,
        random_state=random_state,
        stratify=stratification_col,
    )
    if valid_size > 0:
        train_idx, valid_idx = train_test_split(
            train_idx,
            train_size=train_size,
            shuffle=shuffle,
            random_state=random_state,
            stratify=train_strat,
        )
    else:
        valid_idx = []

    return np.array(train_idx), np.array(valid_idx), np.array(test_idx)

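# Illustrative usage sketch (not part of the original module): a 70/10/20
# split stratified on a continuous target, which is quantile-binned internally.
def _example_stratified_partition() -> None:
    rng = np.random.default_rng(0)
    target = rng.random(100)
    train_idx, valid_idx, test_idx = stratified_train_test_partition(
        np.arange(100),
        stratification_col=target,
        train_size=0.7,
        valid_size=0.1,
        test_size=0.2,
        random_state=0,
    )
    print(len(train_idx), len(valid_idx), len(test_idx))  # 70 10 20
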
def grouped_stratified_train_test_partition(
    stratification_col: ArrayLike,
    group_col: ArrayLike,
    train_size: float,
    valid_size: float,
    test_size: float,
    shuffle: bool = True,
    random_state: Optional[Union[int, np.random.RandomState]] = None,
    q: Sequence[float] = (0, 0.25, 0.5, 0.75, 1),
    center: Callable = np.median,
) -> Tuple[np.array, np.array, np.array]:
    """Return a grouped stratified train/test partition.

    First, we compute the most common stratification category / centrality
    measure of the stratification column for every group.
    Then, we perform a stratified train/test partition on the groups.
    We then "expand" by concatenating the indices belonging to each group.

    .. warning::

        Note that this won't work well if the number of groups and datapoints
        is small. It will also cause issues if the number of datapoints in the
        groups is very imbalanced.

    .. seealso::

        * :py:meth:`mofdscribe.splitters.utils.stratified_train_test_partition`:
          performs an un-grouped stratified train/test split
        * :py:meth:`mofdscribe.splitters.utils.grouped_train_valid_test_partition`:
          performs a grouped un-stratified train/test split

    Args:
        stratification_col (ArrayLike): Data used for stratification.
            If it is categorical (see :py:meth:`mofdscribe.splitters.utils.is_categorical`),
            we use it directly for stratification. Otherwise, we use quantile binning.
        group_col (ArrayLike): Data used for grouping.
        train_size (float): Size of the training set as a fraction.
        valid_size (float): Size of the validation set as a fraction.
        test_size (float): Size of the test set as a fraction.
        shuffle (bool): If True, perform a shuffled split. Defaults to True.
        random_state (Union[int, np.random.RandomState], optional): Random state
            for the shuffler. Defaults to None.
        q (Sequence[float], optional): Quantiles used for quantile binning.
            Defaults to (0, 0.25, 0.5, 0.75, 1).
        center (Callable): Aggregation function that computes a measure of
            centrality of all the points in a group, which is then used for
            stratification. This is only used for continuous inputs.
            For categorical inputs, we always use the mode.

    Returns:
        Tuple[np.array, np.array, np.array]: Train, validation, test indices.
    """
    groups = np.unique(group_col)
    categorical = is_categorical(stratification_col)

    category_for_group = []
    if categorical:
        # For each group, use the most common category (the mode).
        # NumPy has no mode function, so we use pandas here.
        for group in groups:
            category_for_group.append(
                pd.Series(stratification_col[group_col == group]).mode()[0]
            )
    else:
        # For each group, use the centrality measure (e.g., the median).
        for group in groups:
            category_for_group.append(center(stratification_col[group_col == group]))

    # If we do not have categories, we now need to discretize the stratification column.
    if not categorical:
        category_for_group = quantile_binning(category_for_group, q=q)

    train_size, valid_size, test_size = get_train_valid_test_sizes(
        len(category_for_group), train_size, valid_size, test_size
    )

    # Now we can do the split on the groups.
    train_groups, test_groups, train_cat, _ = train_test_split(
        groups,
        category_for_group,
        train_size=train_size + valid_size,
        test_size=test_size,
        stratify=category_for_group,
        shuffle=shuffle,
        random_state=random_state,
    )

    if valid_size > 0:
        train_groups, valid_groups = train_test_split(
            train_groups,
            train_size=train_size,
            stratify=train_cat,
            shuffle=shuffle,
            random_state=random_state,
        )

    # Now, expand back to the original indices.
    train_indices = np.where(np.isin(group_col, train_groups))[0]
    if valid_size > 0:
        valid_indices = np.where(np.isin(group_col, valid_groups))[0]
    else:
        valid_indices = []
    test_indices = np.where(np.isin(group_col, test_groups))[0]

    return train_indices, valid_indices, test_indices

def get_train_valid_test_sizes(
    size: int, train_size: float, valid_size: float, test_size: float
) -> Tuple[int, int, int]:
    """Compute the number of points in every split."""
    train_size = int(np.floor(train_size * size))
    valid_size = int(np.ceil(valid_size * size))
    test_size = int(size - train_size - valid_size)
    return train_size, valid_size, test_size

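# Worked example (not part of the original module): the floor/ceil rounding
# and the remainder guarantee that the three counts sum to the dataset size.
def _example_sizes() -> None:
    print(get_train_valid_test_sizes(103, 0.7, 0.1, 0.2))
    # (72, 11, 20): floor(72.1), ceil(10.3), and the remainder 103 - 72 - 11
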
def grouped_train_valid_test_partition(
    groups: ArrayLike,
    train_size: float,
    valid_size: float,
    test_size: float,
    shuffle: bool = True,
    random_state: Optional[Union[int, np.random.RandomState]] = None,
) -> Tuple[np.array, Optional[np.array], np.array]:
    """Perform a grouped train/test split without stratification.

    .. seealso::

        * :py:meth:`mofdscribe.splitters.utils.stratified_train_test_partition`:
          performs an un-grouped stratified train/test split
        * :py:meth:`mofdscribe.splitters.utils.grouped_stratified_train_test_partition`:
          performs a grouped stratified train/test split

    Args:
        groups (ArrayLike): Data used for grouping.
        train_size (float): Size of the training set as a fraction.
        valid_size (float): Size of the validation set as a fraction.
        test_size (float): Size of the test set as a fraction.
        shuffle (bool): If True, perform a shuffled split. Defaults to True.
        random_state (Union[int, np.random.RandomState], optional): Random state
            for the shuffler. Defaults to None.

    Returns:
        Tuple[np.array, Optional[np.array], np.array]: Train, validation, test indices.
    """
    unique_groups = np.unique(groups)
    train_size, valid_size, test_size = get_train_valid_test_sizes(
        len(unique_groups), train_size, valid_size, test_size
    )

    train_groups, test_groups = train_test_split(
        unique_groups,
        train_size=train_size + valid_size,
        test_size=test_size,
        shuffle=shuffle,
        random_state=random_state,
    )
    if valid_size > 0:
        train_groups, valid_groups = train_test_split(
            train_groups,
            train_size=train_size,
            shuffle=shuffle,
            random_state=random_state,
        )

    # Now, map the groups back to the original indices.
    train_indices = np.where(np.isin(groups, train_groups))[0]
    if valid_size > 0:
        valid_indices: Optional[np.ndarray] = np.where(np.isin(groups, valid_groups))[0]
    else:
        valid_indices = None
    test_indices = np.where(np.isin(groups, test_groups))[0]

    return train_indices, valid_indices, test_indices

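# Illustrative usage sketch (not part of the original module): all points that
# share a group label land in the same split.
def _example_grouped_partition() -> None:
    groups = np.array(["a", "a", "b", "b", "c", "c", "d", "d", "e", "e"])
    train_idx, valid_idx, test_idx = grouped_train_valid_test_partition(
        groups, train_size=0.6, valid_size=0.2, test_size=0.2, random_state=0
    )
    print(groups[train_idx], groups[test_idx])  # e.g. ['a' 'a' 'c' 'c' 'e' 'e'] ['d' 'd']
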
def quantile_binning(values: ArrayLike, q: Sequence[float]) -> np.array:
    """Use :py:func:`pandas.qcut` to bin the values based on quantiles."""
    values = pd.qcut(values, q, labels=np.arange(len(q) - 1)).astype(int)
    return values

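# Illustrative sketch (not part of the original module): quartile binning maps
# continuous values onto the integer labels 0-3.
def _example_quantile_binning() -> None:
    values = np.arange(8)  # 0 .. 7
    print(quantile_binning(values, q=(0, 0.25, 0.5, 0.75, 1)))
    # [0 0 1 1 2 2 3 3]
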
def check_fraction(train_fraction: float, valid_fraction: float, test_fraction: float) -> None:
    """Check that the fractions are all between 0 and 1 and that they sum to 1."""
    for name, fraction in [
        ("train fraction", train_fraction),
        ("valid fraction", valid_fraction),
        ("test fraction", test_fraction),
    ]:
        if not 0 <= fraction <= 1:
            raise ValueError(
                f"{name} is {fraction}. However, train/valid/test fractions must be between 0 and 1."
            )
    logger.debug(
        f"Using fractions: train: {train_fraction}, valid: {valid_fraction}, test: {test_fraction}"
    )
    # Use np.isclose instead of an exact comparison: fractions such as
    # 0.7 + 0.2 + 0.1 do not sum to exactly 1.0 in floating point.
    if not np.isclose(train_fraction + valid_fraction + test_fraction, 1):
        raise ValueError("Train, valid, test fractions must sum to 1.")

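# Illustrative sketch (not part of the original module): the tolerant sum check
# accepts fractions whose floating-point sum is not exactly 1.0.
def _example_check_fraction() -> None:
    check_fraction(0.7, 0.2, 0.1)  # passes even though 0.7 + 0.2 + 0.1 != 1.0 exactly
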
def no_group_warn(groups: Optional[ArrayLike]) -> None:
    """Log a warning if groups is None."""
    if groups is None:
        logger.warning(
            "You are not using a grouped split."
            " However, for reticular materials, grouping is typically a good idea to avoid data leakage."
        )

def downsample_splits(splits: Sequence[ArrayLike], sample_frac: float) -> Tuple[np.ndarray, ...]:
    """Randomly downsample every split to a fraction of its original size."""
    downsampled = []
    for split in splits:
        # Sample without replacement so that a split cannot contain duplicate indices.
        downsampled.append(np.random.choice(split, int(len(split) * sample_frac), replace=False))
    return tuple(downsampled)


def sort_arrays_by_len(arrays: Sequence[ArrayLike], sort: bool = True) -> Tuple[np.ndarray, ...]:
    """Sort the arrays by their length, longest first. If sort is False, return them unchanged."""
    if sort:
        arrays = [np.array(array) for array in arrays]
        arrays.sort(key=len, reverse=True)
    return tuple(arrays)

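# Illustrative sketch (not part of the original module): downsample each split
# to half its size, then order the splits from longest to shortest.
def _example_downsample_and_sort() -> None:
    splits = (np.arange(10), np.arange(50), np.arange(20))
    halved = downsample_splits(splits, sample_frac=0.5)
    print([len(s) for s in halved])  # [5, 25, 10]
    print([len(s) for s in sort_arrays_by_len(halved)])  # [25, 10, 5]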