# -*- coding: utf-8 -*-
"""Classes that help performing cross-validation.
Our splitters attempt to reduce any potential for data leakage by using grouping by default--
and prioritizing grouping over stratficiation or exactly matching the requested train test ratio.
See also the `sklearn docs
<https://scikit-learn.org/stable/modules/cross_validation.html#cross-validation-iterators-for-grouped-data>`_.
.. warning::
Due to the grouping operations, the train/test ratios the methods produce will not exactly
match the ones you requested.
For this reason, please check the lengths of the train/test/valid indices the methods return.
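Example:
    A minimal sketch of the intended workflow. Here, :code:`ds` stands for any
    :code:`AbstractStructureDataset` instance and :code:`DensitySplitter` is one of the
    splitters defined in this module:

    .. code-block:: python

        splitter = DensitySplitter(ds, shuffle=True, random_state=42)
        train_idx, test_idx = splitter.train_test_split(frac_train=0.7)
        # due to the grouping, the realized ratio can deviate from 0.7/0.3,
        # so check the lengths of the returned index arrays
        print(len(train_idx), len(test_idx))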
"""
from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Union
import numpy as np
from loguru import logger
from sklearn.model_selection import GroupKFold, KFold, StratifiedGroupKFold, StratifiedKFold
from .utils import (
check_fraction,
downsample_splits,
grouped_stratified_train_test_partition,
grouped_train_valid_test_partition,
is_categorical,
kennard_stone_sampling,
no_group_warn,
pca_kmeans,
quantile_binning,
sort_arrays_by_len,
stratified_train_test_partition,
)
from ..datasets.dataset import AbstractStructureDataset
__all__ = (
"DensitySplitter",
"HashSplitter",
"TimeSplitter",
"BaseSplitter",
"KennardStoneSplitter",
"ClusterSplitter",
"LOCOCV",
"ClusterStratifiedSplitter",
)
class BaseSplitter:
"""A :code:`BaseSplitter` implements the basic logic for dataset partition as well as k-fold cross-validation.
Methods that inherit from this class typically implement the
* :code: `_get_stratification_col`: Should return an ArrayLike object of floats, categories, or ints.
If it is categorical data, the :code:`BaseSplitter` will handle the discretization.
* :code: `_get_groups`: Should return an ArrayLike object of categories (integers or strings)
methods.
Internally, the :code:`BaseSplitter` uses those to group and/or stratify the splits.
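Example:
    A minimal sketch of a custom splitter. The column names :code:`"year"` and
    :code:`"density"` are hypothetical and assume the dataset exposes them via its
    internal dataframe:

    .. code-block:: python

        class MyGroupedSplitter(BaseSplitter):
            def _get_groups(self):
                # categorical group labels, e.g. the publication year
                return self._ds._df["year"].values

            def _get_stratification_col(self):
                # continuous values; BaseSplitter discretizes them via quantile binning
                return self._ds._df["density"].values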
"""
# Those variables are needed to automatically set the number of groups
# if the user does not set them (default behavior).
_grouping_q = None
_set_grouping = False
def __init__(
self,
ds: AbstractStructureDataset,
shuffle: bool = True,
random_state: Optional[Union[int, np.random.RandomState]] = None,
sample_frac: Optional[float] = 1.0,
stratification_col: Optional[Union[str, np.typing.ArrayLike]] = None,
center: callable = np.median,
q: Collection[float] = (0, 0.25, 0.5, 0.75, 1),
sort_by_len: bool = True,
):
"""Initialize a BaseSplitter.
Args:
ds (AbstractStructureDataset): A structure dataset.
The :code:`BaseSplitter` only requires the length magic method to be implemented.
However, other splitters might require additional methods.
shuffle (bool): If True, perform a shuffled split.
Defaults to True.
random_state (Optional[Union[int, np.random.RandomState]], optional):
Random state for the shuffling. Defaults to None.
sample_frac (Optional[float], optional):
This can be used for downsampling. It will randomly select a subset of
indices from all indices *before* splitting. For instance :code:`sample_frac=0.8`
will randomly select 80% of the indices before splitting.
Defaults to 1.0.
stratification_col (Union[str, np.typing.ArrayLike], optional): Data used for stratification.
If it is categorical (see :py:meth:`mofdscribe.splitters.utils.is_categorical`)
then we directly use it for stratification. Otherwise, we use quantile binning.
Defaults to None.
center (callable): Aggregation function to compute a measure of centrality
of all the points in a group such that this can then be used for stratification.
This is only used for continuous inputs. For categorical inputs, we always use
the mode. Defaults to np.median.
q (Collection[float], optional): List of quantiles used for quantile binning.
Defaults to (0, 0.25, 0.5, 0.75, 1).
sort_by_len (bool): If True, sort the splits by length.
(Applies to the train/test/valid and train/test splits). Defaults to True.
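Example:
    A sketch of how the options interact; :code:`ds` is assumed to be an
    :code:`AbstractStructureDataset` and :code:`"density"` a column of its internal dataframe:

    .. code-block:: python

        splitter = BaseSplitter(
            ds,
            stratification_col="density",  # continuous, so quantile binning is applied
            sample_frac=0.8,  # randomly keep 80% of the indices before splitting
            random_state=42,
        )
        train_idx, valid_idx, test_idx = splitter.train_valid_test_split(
            frac_train=0.7, frac_valid=0.1
        )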
"""
self._ds = ds
self._shuffle = shuffle
self._random_state = random_state
self._len = len(ds)
self._sample_frac = sample_frac
self._stratification_col = stratification_col
self._center = center
self._q = q
self._sort_by_len = sort_by_len
logger.debug(
f"Splitter settings | shuffle {self._shuffle}, "
f"random state {self._random_state}, sample frac {self._sample_frac}, q {self._q}"
)
def _get_idxs(self):
"""Return an array of indices. Length equals to the length of the dataset."""
idx = np.arange(self._len)
return idx
def train_test_split(self, frac_train: float = 0.7) -> Tuple[Collection[int], Collection[int]]:
"""Perform a train/test partition.
Args:
frac_train (float): Fraction of the data to use for the training set.
Defaults to 0.7.
Returns:
Tuple[Collection[int], Collection[int]]: Train indices, test indices
"""
if self._grouping_q is None or self._set_grouping:
self._set_grouping = True
self._grouping_q = np.linspace(0, 1, 3)
check_fraction(train_fraction=frac_train, valid_fraction=0, test_fraction=1 - frac_train)
groups = self._get_groups()
stratification_col = self._get_stratification_col()
idx = self._get_idxs()
no_group_warn(groups)
if groups is not None:
if stratification_col is not None:
logger.debug("Using grouped stratified partition")
train_idx, _, test_index = grouped_stratified_train_test_partition(
stratification_col[idx],
groups[idx],
frac_train,
0,
1 - frac_train,
shuffle=self._shuffle,
random_state=self._random_state,
center=self._center,
q=self._q,
)
else:
logger.debug("Using grouped partition")
train_idx, _, test_index = grouped_train_valid_test_partition(
groups[idx],
frac_train,
0,
1 - frac_train,
shuffle=self._shuffle,
random_state=self._random_state,
)
else:
stratification_col = stratification_col[idx] if stratification_col is not None else None
logger.debug("Using stratified partition")
train_idx, _, test_index = stratified_train_test_partition(
self._get_idxs(),
stratification_col,
train_size=frac_train,
valid_size=0,
test_size=1 - frac_train,
shuffle=self._shuffle,
random_state=self._random_state,
q=self._q,
)
if self._sample_frac < 1:
return sort_arrays_by_len(
downsample_splits([train_idx, test_index], self._sample_frac), self._sort_by_len
)
return sort_arrays_by_len([train_idx, test_index], self._sort_by_len)
def train_valid_test_split(
self, frac_train: float = 0.7, frac_valid: float = 0.1
) -> Tuple[Collection[int], Collection[int], Collection[int]]:
"""Perform a train/valid/test partition.
Args:
frac_train (float): Fraction of data to use for the training set.
Defaults to 0.7.
frac_valid (float): Fraction of data to use for the validation set.
Defaults to 0.1.
Returns:
Tuple[Collection[int], Collection[int], Collection[int]]: Training, validation, test set.
"""
if self._grouping_q is None or self._set_grouping:
self._set_grouping = True
self._grouping_q = np.linspace(0, 1, 4)
check_fraction(
train_fraction=frac_train,
valid_fraction=frac_valid,
test_fraction=1 - frac_train - frac_valid,
)
groups = self._get_groups()
stratification_col = self._get_stratification_col()
idx = self._get_idxs()
no_group_warn(groups)
if groups is not None:
if stratification_col is not None:
logger.debug("Using grouped stratified partition")
train_idx, valid_idx, test_index = grouped_stratified_train_test_partition(
stratification_col[idx],
groups[idx],
frac_train,
frac_valid,
1 - frac_train - frac_valid,
shuffle=self._shuffle,
random_state=self._random_state,
center=self._center,
q=self._q,
)
else:
logger.debug("Using grouped partition")
train_idx, valid_idx, test_index = grouped_train_valid_test_partition(
groups[idx],
frac_train,
frac_valid,
1 - frac_train - frac_valid,
shuffle=self._shuffle,
random_state=self._random_state,
)
else:
logger.debug("Using stratified partition")
stratification_col = stratification_col[idx] if stratification_col is not None else None
train_idx, valid_idx, test_index = stratified_train_test_partition(
self._get_idxs(),
stratification_col,
train_size=frac_train,
valid_size=frac_valid,
test_size=1 - frac_train - frac_valid,
shuffle=self._shuffle,
random_state=self._random_state,
q=self._q,
)
if self._sample_frac < 1:
return sort_arrays_by_len(
downsample_splits([train_idx, valid_idx, test_index], self._sample_frac),
self._sort_by_len,
)
return sort_arrays_by_len([train_idx, valid_idx, test_index], self._sort_by_len)
def k_fold(self, k: int = 5) -> Tuple[Collection[int], Collection[int]]:
"""Peform k-fold crossvalidation.
Args:
k (int): Number of folds. Defaults to 5.
Yields:
Tuple[Collection[int], Collection[int]]: Train indices, test indices.
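Example:
    A sketch of iterating over the generator; :code:`splitter` is assumed to be any
    :code:`BaseSplitter` (sub)class instance:

    .. code-block:: python

        for fold, (train_idx, test_idx) in enumerate(splitter.k_fold(k=5)):
            # train_idx/test_idx are arrays of dataset indices for this fold
            print(fold, len(train_idx), len(test_idx))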
"""
if self._grouping_q is None or self._set_grouping:
self._set_grouping = True
self._grouping_q = np.linspace(0, 1, k + 1)
groups = self._get_groups()
stratification_col = self._get_stratification_col()
no_group_warn(groups)
idx = self._get_idxs()
groups = groups[idx] if groups is not None else None
stratification_col = stratification_col[idx] if stratification_col is not None else None
if stratification_col is not None:
if not is_categorical(stratification_col):
stratification_col = quantile_binning(stratification_col, self._q)
if groups is not None:
kfold = StratifiedGroupKFold(
n_splits=k, shuffle=self._shuffle, random_state=self._random_state
)
else:
kfold = StratifiedKFold(
n_splits=k, shuffle=self._shuffle, random_state=self._random_state
)
else:
# GroupKFold and plain KFold are used without shuffling here;
# if requested, the resulting indices are shuffled below.
if groups is not None:
kfold = GroupKFold(n_splits=k)
else:
kfold = KFold(n_splits=k)
if groups is not None:
for train_index, test_index in kfold.split(idx, y=stratification_col, groups=groups):
if self._shuffle:
np.random.shuffle(train_index)
np.random.shuffle(test_index)
if self._sample_frac < 1:
yield downsample_splits([train_index, test_index], self._sample_frac)
else:
yield train_index, test_index
else:
for train_index, test_index in kfold.split(idx, y=stratification_col):
if self._shuffle:
np.random.shuffle(train_index)
np.random.shuffle(test_index)
if self._sample_frac < 1:
yield downsample_splits([train_index, test_index], self._sample_frac)
else:
yield train_index, test_index
def _get_groups(self) -> Collection[Union[int, str]]:
return None
def _get_stratification_col(self) -> Collection[Union[int, float]]:
if isinstance(self._stratification_col, str):
return self._ds._df[self._stratification_col].values
else:
return self._stratification_col
class HashSplitter(BaseSplitter):
"""Splitter that uses Weisfeiller-Lehman graph hashes [WL]_ to split the data in more stringent ways.
Note that the hashes we use do not allow for a meaningful measure of
similarity. That is, there is no way to measure the distance between two strings.
The only meaningful measure is if they are identical or not.
.. note::
Weisfeiller-Lehman graph hashes do not give a guarantee for graph-isomorphism.
That is, there might be identical hashes that do not correspond to isomorphic graphs.
.. note::
There are certain graphs that a Weisfeiller-Lehman test cannot distinguish [Bouritsas]_.
.. note::
We speak about Weisfeiller-Lehman hashes as they are the defaults for the mofdscribe datasets.
However, you can also overwrite this method with a custom hashing function.
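Example:
    A minimal sketch; :code:`ds` is assumed to be a dataset that implements the hash getters:

    .. code-block:: python

        splitter = HashSplitter(ds, hash_type="decorated_graph_hash")
        train_idx, test_idx = splitter.train_test_split(frac_train=0.8)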
"""
def __init__(
self,
ds: AbstractStructureDataset,
hash_type: str = "undecorated_scaffold_hash",
shuffle: bool = True,
random_state: Optional[Union[int, np.random.RandomState]] = None,
sample_frac: Optional[float] = 1.0,
stratification_col: Optional[Union[str, np.typing.ArrayLike]] = None,
center=np.median,
q: Collection[float] = (0, 0.25, 0.5, 0.75, 1),
sort_by_len: bool = True,
) -> None:
"""Initialize a HashSplitter.
Args:
ds (AbstractStructureDataset): A structure dataset.
The :code:`BaseSplitter` only requires the length magic method to be implemented.
However, other splitters might require additional methods.
hash_type (str): Hash type to use. Must be one of the
following:
* undecorated_scaffold_hash
* decorated_graph_hash
* decorated_scaffold_hash
* undecorated_graph_hash
Defaults to "undecorated_scaffold_hash".
shuffle (bool): If True, perform a shuffled split.
Defaults to True.
random_state (Union[int, np.random.RandomState], optional):
Random state for the shuffling. Defaults to None.
sample_frac (float, optional):
This can be used for downsampling. It will randomly select a subset of
indices from all indices *before* splitting. For instance :code:`sample_frac=0.8`
will randomly select 80% of the indices before splitting.
Defaults to 1.0.
stratification_col (Union[str, np.typing.ArrayLike], optional): Data used for stratification.
If it is categorical (see :py:meth:`mofdscribe.splitters.utils.is_categorical`)
then we directly use it for stratification. Otherwise, we use quantile binning.
Defaults to None.
center (callable, optional): Aggregation function to compute a measure of centrality
of all the points in a group such that this can then be used for stratification.
This is only used for continuous inputs. For categorical inputs, we always use
the mode. Defaults to np.median.
q (Collection[float], optional): List of quantiles used for quantile binning.
Defaults to (0, 0.25, 0.5, 0.75, 1).
sort_by_len (bool): If True, sort the splits by length.
(Applies to the train/test/valid and train/test splits). Defaults to True.
"""
self.hash_type = hash_type
super().__init__(
ds,
shuffle=shuffle,
random_state=random_state,
sample_frac=sample_frac,
stratification_col=stratification_col,
center=center,
q=q,
sort_by_len=sort_by_len,
)
def _get_hashes(self) -> Collection[str]:
"""Retrieve the list of hashes from the dataset
Raises:
ValueError: If the hash type is not one of the following:
* undecorated_scaffold_hash
* decorated_graph_hash
* decorated_scaffold_hash
* undecorated_graph_hash
Returns:
Collection[str]: list of hashes
"""
number_of_points = len(self._ds)
if self.hash_type == "undecorated_scaffold_hash":
hashes = self._ds.get_undecorated_scaffold_hashes(range(number_of_points))
elif self.hash_type == "decorated_graph_hash":
hashes = self._ds.get_decorated_graph_hashes(range(number_of_points))
elif self.hash_type == "decorated_scaffold_hash":
hashes = self._ds.get_decorated_scaffold_hashes(range(number_of_points))
elif self.hash_type == "undecorated_graph_hash":
hashes = self._ds.get_undecorated_graph_hashes(range(number_of_points))
else:
raise ValueError(f"Unknown hash type: {self.hash_type}")
return hashes
def _get_groups(self) -> Collection[int]:
return self._get_hashes()
class DensitySplitter(BaseSplitter):
"""Splitter that uses the density of the structures to split the data.
For this, we sort structures according to their density and then group them based on their density.
You can modify the number of groups using the :attr:`density_q` parameter; its values indicate
the quantiles we use for the grouping.
This ensures that the validation is quite stringent as the different folds will have different densities.
The motivations for doing this are:
* density is often one of the most important descriptors for gas uptake properties.
* there is often a very large difference in density distribution
between hypothetical and experimental databases.
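Example:
    A minimal sketch; :code:`ds` is assumed to implement :code:`get_densities`:

    .. code-block:: python

        # four quantile edges give three density bins, matching a train/valid/test partition
        splitter = DensitySplitter(ds, density_q=(0, 1 / 3, 2 / 3, 1))
        train_idx, valid_idx, test_idx = splitter.train_valid_test_split(
            frac_train=0.7, frac_valid=0.1
        )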
"""
def __init__(
self,
ds: AbstractStructureDataset,
density_q: Optional[Collection[float]] = None,
shuffle: bool = True,
random_state: Optional[Union[int, np.random.RandomState]] = None,
sample_frac: Optional[float] = 1.0,
stratification_col: Optional[Union[str, np.typing.ArrayLike]] = None,
center: callable = np.median,
q: Collection[float] = (0, 0.25, 0.5, 0.75, 1),
sort_by_len: bool = True,
) -> None:
"""Initialize the DensitySplitter class.
Args:
ds (AbstractStructureDataset): A structure dataset.
The :code:`BaseSplitter` only requires the length magic method to be implemented.
However, other splitters might require additional methods.
density_q (Collection[float], optional): List of quantiles used for quantile binning for the density.
Defaults to None. If None, then we use two bins for test/train split, three for
validation/train/test split and k for k-fold.
shuffle (bool): If True, perform a shuffled split.
Defaults to True.
random_state (Union[int, np.random.RandomState], optional):
Random state for the shuffling. Defaults to None.
sample_frac (float, optional):
This can be used for downsampling. It will randomly select a subset of
indices from all indices *before* splitting. For instance :code:`sample_frac=0.8`
will randomly select 80% of the indices before splitting.
Defaults to 1.0.
stratification_col (Union[str, np.typing.ArrayLike], optional): Data used for stratification.
If it is categorical (see :py:meth:`mofdscribe.splitters.utils.is_categorical`)
then we directly use it for stratification. Otherwise, we use quantile binning.
Defaults to None.
center (callable): Aggregation function to compute a measure of centrality
of all the points in a group such that this can then be used for stratification.
This is only used for continuous inputs. For categorical inputs, we always use
the mode. Defaults to np.median.
q (Collection[float], optional): List of quantiles used for quantile binning.
Defaults to (0, 0.25, 0.5, 0.75, 1).
sort_by_len (bool): If True, sort the splits by length.
(Applies to the train/test/valid and train/test splits). Defaults to True.
"""
self._grouping_q = density_q
super().__init__(
ds=ds,
shuffle=shuffle,
random_state=random_state,
sample_frac=sample_frac,
stratification_col=stratification_col,
center=center,
q=q,
sort_by_len=sort_by_len,
)
def _get_groups(self) -> Collection[int]:
return quantile_binning(self._ds.get_densities(range(len(self._ds))), self._grouping_q)
class TimeSplitter(BaseSplitter):
"""This splitter sorts structures according to their publication date.
That is, the training set will contain structures that are "older" (have
been discovered earlier) than the ones in the test set.
This can mimic real-life model development conditions [MoleculeNet]_.
It has, for instance, also been used with ICSD data in [Palizhati]_
and been the focus of [Sheridan]_.
.. seealso::
* The `mp-time-split <https://github.com/sparks-baird/mp-time-split>`_ package
provides similar functionality for data from the Materials Project.
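Example:
    A minimal sketch; :code:`ds` is assumed to implement :code:`get_years`:

    .. code-block:: python

        splitter = TimeSplitter(ds)
        # "older" structures end up in the training set, "newer" ones in the test set
        train_idx, test_idx = splitter.train_test_split(frac_train=0.7)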
"""
def __init__(
self,
ds: AbstractStructureDataset,
year_q: Optional[Collection[float]] = None,
shuffle: bool = True,
random_state: Optional[Union[int, np.random.RandomState]] = None,
sample_frac: Optional[float] = 1.0,
stratification_col: Optional[Union[str, np.typing.ArrayLike]] = None,
center: callable = np.median,
q: Collection[float] = (0, 0.25, 0.5, 0.75, 1),
sort_by_len: bool = True,
) -> None:
"""Initialize the TimeSplitter class.
Args:
ds (AbstractStructureDataset): A structure dataset.
The :code:`BaseSplitter` only requires the length magic method to be implemented.
However, other splitters might require additional methods.
year_q (Collection[float]): List of quantiles used for quantile binning on the years.
Defaults to None. If None, then we use two bins for test/train split, three for
validation/train/test split and k for k-fold.
shuffle (bool): If True, perform a shuffled split.
Defaults to True.
random_state (Union[int, np.random.RandomState], optional):
Random state for the shuffling. Defaults to None.
sample_frac (float, optional):
This can be used for downsampling. It will randomly select a subset of
indices from all indices *before* splitting. For instance :code:`sample_frac=0.8`
will randomly select 80% of the indices before splitting.
Defaults to 1.0.
stratification_col (Union[str, np.typing.ArrayLike], optional): Data used for stratification.
If it is categorical (see :py:meth:`mofdscribe.splitters.utils.is_categorical`)
then we directly use it for stratification. Otherwise, we use quantile binning.
Defaults to None.
center (callable): Aggregation function to compute a measure of centrality
of all the points in a group such that this can then be used for stratification.
This is only used for continuous inputs. For categorical inputs, we always use
the mode. Defaults to np.median.
q (Collection[float], optional): List of quantiles used for quantile binning.
Defaults to (0, 0.25, 0.5, 0.75, 1).
sort_by_len (bool): If True, sort the splits by length.
(Applies to the train/test/valid and train/test splits). Defaults to True.
"""
self._grouping_q = year_q
super().__init__(
ds=ds,
shuffle=shuffle,
random_state=random_state,
sample_frac=sample_frac,
stratification_col=stratification_col,
center=center,
q=q,
sort_by_len=sort_by_len,
)
def _get_groups(self) -> Collection[int]:
return quantile_binning(self._ds.get_years(range(len(self._ds))), self._grouping_q)
class KennardStoneSplitter(BaseSplitter):
"""Run the Kennard-Stone sampling algorithm [KennardStone]_.
The algorithm selects samples with uniform coverage of the dataset.
The initial samples are biased towards the boundaries of the dataset,
hence the selection might be biased by outliers.
It is also known as the CADEX algorithm and was later refined
in the DUPLEX algorithm [Snee]_.
.. warning::
This splitter can be slow for large datasets as
it requires computing distance matrices N times for a dataset
with N structures.
.. warning::
Stratification is not supported for this splitter.
.. warning::
I could not find a good reference for the k-fold version of
this algorithm.
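Example:
    A minimal sketch; the feature names are hypothetical placeholders for columns of :code:`ds`:

    .. code-block:: python

        splitter = KennardStoneSplitter(
            ds,
            feature_names=["feature_a", "feature_b"],
            centrality_measure="mean",
            metric="euclidean",
        )
        train_idx, test_idx = splitter.train_test_split(frac_train=0.8)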
"""
def __init__(
self,
ds: AbstractStructureDataset,
feature_names: List[str],
shuffle: bool = True,
random_state: Optional[Union[int, np.random.RandomState]] = None,
sample_frac: Optional[float] = 1.0,
scale: bool = True,
centrality_measure: str = "mean",
metric: Union[Callable, str] = "euclidean",
ascending: bool = False,
) -> None:
"""Construct a KennardStoneSplitter.
Args:
ds (AbstractStructureDataset): A structure dataset.
The :code:`BaseSplitter` only requires the length magic method to be implemented.
However, other splitters might require additional methods.
feature_names (List[str]): Names of features to consider.
shuffle (bool): If True, perform a shuffled split.
Defaults to True.
random_state (Union[int, np.random.RandomState], optional):
Random state for the shuffling. Defaults to None.
sample_frac (float, optional):
This can be used for downsampling. It will randomly select a subset of
indices from all indices *before* splitting. For instance :code:`sample_frac=0.8`
will randomly select 80% of the indices before splitting.
Defaults to 1.0.
scale (bool): If True, apply z-score normalization
prior to running the sampling. Defaults to True.
centrality_measure (str): The first sample is selected to be
maximally distant from this value. It can be one of "mean", "median",
"random". In the case of "random" we simply select a random point.
In the case of "mean" and "median" the initial point is maximally distant
from the mean and median of the feature matrix, respectively.
Defaults to "mean".
metric (Union[Callable, str]): The distance metric to use.
If a string, the distance function can be 'braycurtis', 'canberra', 'chebyshev',
'cityblock', 'correlation', 'cosine', 'dice', 'euclidean', 'hamming', 'jaccard',
'jensenshannon', 'kulsinski', 'kulczynski1', 'mahalanobis', 'matching', 'minkowski',
'rogerstanimoto', 'russellrao', 'seuclidean', 'sokalmichener', 'sokalsneath', 'sqeuclidean',
'yule'. Defaults to "euclidean".
ascending (bool): If True, sort samples in ascending distance to the center.
That is, the first samples (maximally distant to center) would be sampled last.
Defaults to False.
"""
self.feature_names = feature_names
self.scale = scale
self.centrality_measure = centrality_measure
self.metric = metric
self.ascending = ascending
self._sorted_indices = None
super().__init__(
ds=ds,
shuffle=shuffle,
random_state=random_state,
sample_frac=sample_frac,
center=None,
stratification_col=None,
q=None,
)
def get_sorted_indices(self, ds: AbstractStructureDataset) -> Collection[int]:
"""Return a list of indices, sorted by similarity using the Kennard-Stone algorithm.
The first sample will be maximally distant from the center.
Args:
ds (AbstractStructureDataset): A mofdscribe AbstractStructureDataset
Returns:
Collection[int]: Sorted indices.
"""
if self._sorted_indices is None:
feats = ds._df[self.feature_names].values
indices = kennard_stone_sampling(
feats,
scale=self.scale,
centrality_measure=self.centrality_measure,
metric=self.metric,
)
if self.ascending:
indices = indices[::-1]
self._sorted_indices = indices
return self._sorted_indices
def train_test_split(self, frac_train: float = 0.7) -> Tuple[Collection[int], Collection[int]]:
num_train_points = int(frac_train * len(self._ds))
if self._shuffle:
return (
np.random.permutation(self.get_sorted_indices(self._ds)[:num_train_points]),
np.random.permutation(self.get_sorted_indices(self._ds)[num_train_points:]),
)
return (
self.get_sorted_indices(self._ds)[:num_train_points],
self.get_sorted_indices(self._ds)[num_train_points:],
)
def train_valid_test_split(
self, frac_train: float = 0.7, frac_valid: float = 0.1
) -> Tuple[Collection[int], Collection[int], Collection[int]]:
num_train_points = int(frac_train * len(self._ds))
num_valid_points = int(frac_valid * len(self._ds))
if self._shuffle:
return (
np.random.permutation(self.get_sorted_indices(self._ds)[:num_train_points]),
np.random.permutation(
self.get_sorted_indices(self._ds)[
num_train_points : num_train_points + num_valid_points
]
),
np.random.permutation(
self.get_sorted_indices(self._ds)[num_train_points + num_valid_points :]
),
)
return (
self.get_sorted_indices(self._ds)[:num_train_points],
self.get_sorted_indices(self._ds)[
num_train_points : num_train_points + num_valid_points
],
self.get_sorted_indices(self._ds)[num_train_points + num_valid_points :],
)
def k_fold(self, k=5) -> Tuple[Collection[int], Collection[int]]:
# scikit-learn only accepts random_state together with shuffle=True
kf = KFold(n_splits=k, shuffle=False)
for train_index, test_index in kf.split(self.get_sorted_indices(self._ds)):
if self._shuffle:
train_index = np.random.permutation(train_index)
test_index = np.random.permutation(test_index)
yield train_index, test_index
class ClusterSplitter(BaseSplitter):
"""Split the data into clusters and use the clusters as groups.
The approach has been proposed on
`Kaggle <https://www.kaggle.com/code/lucamassaron/are-you-doing-cross-validation-the-best-way/notebook>`_.
In principle, we perform the following steps:
1. Scale the data (optional).
2. Perform PCA for de-correlation.
3. Perform k-means clustering.
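Example:
    A minimal sketch; the feature names are hypothetical placeholders for columns of :code:`ds`:

    .. code-block:: python

        splitter = ClusterSplitter(
            ds,
            feature_names=["feature_a", "feature_b"],
            n_clusters=4,
            random_state=42,
        )
        # the k-means clusters act as groups, i.e. a cluster is never split across train and test
        train_idx, test_idx = splitter.train_test_split(frac_train=0.7)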
"""
def __init__(
self,
ds: AbstractStructureDataset,
feature_names: List[str],
shuffle: bool = True,
random_state: Optional[Union[int, np.random.RandomState]] = None,
sample_frac: Optional[float] = 1.0,
stratification_col: Optional[Union[str, np.typing.ArrayLike]] = None,
center: callable = np.median,
q: Collection[float] = (0, 0.25, 0.5, 0.75, 1),
sort_by_len: bool = False,
scaled: bool = True,
n_pca_components: Optional[Union[int, str]] = "mle",
n_clusters: int = 4,
pca_kwargs: Optional[Dict[str, Any]] = None,
kmeans_kwargs: Optional[Dict[str, Any]] = None,
):
"""Construct a ClusterSplitter.
Args:
ds (AbstractStructureDataset): A structure dataset.
The :code:`BaseSplitter` only requires the length magic method to be implemented.
However, other splitters might require additional methods.
feature_names (List[str]): Names of features to consider.
shuffle (bool): If True, perform a shuffled split.
Defaults to True.
random_state (Union[int, np.random.RandomState], optional):
Random state for the shuffling. Defaults to None.
sample_frac (float, optional):
This can be used for downsampling. It will randomly select a subset of
indices from all indices *before* splitting. For instance :code:`sample_frac=0.8`
will randomly select 80% of the indices before splitting.
Defaults to 1.0.
stratification_col (Union[str, np.typing.ArrayLike], optional): Data used for stratification.
If it is categorical (see :py:meth:`mofdscribe.splitters.utils.is_categorical`)
then we directly use it for stratification. Otherwise, we use quantile binning.
Defaults to None.
center (callable): Aggregation function to compute a measure of centrality
of all the points in a group such that this can then be used for stratification.
This is only used for continuous inputs. For categorical inputs, we always use
the mode. Defaults to np.median.
q (Collection[float], optional): List of quantiles used for quantile binning.
Defaults to (0, 0.25, 0.5, 0.75, 1).
sort_by_len (bool): If True, sort the splits by length.
(Applies to the train/test/valid and train/test splits). Defaults to False.
scaled (bool): If True, scale the data before clustering.
Defaults to True.
n_pca_components (Union[int, str]): Number of components to use for PCA.
If "mle", use the number of components that maximizes the variance.
Defaults to "mle".
n_clusters (int): Number of clusters to use.
Defaults to 4.
pca_kwargs (Dict[str, Any]): Keyword arguments to pass to PCA.
Defaults to None.
kmeans_kwargs (Dict[str, Any]): Keyword arguments to pass to k-means.
Defaults to None.
"""
self.feature_names = feature_names
self.scaled = scaled
self.n_pca_components = n_pca_components
self.n_clusters = n_clusters
self._random_state = random_state
self._sorted_indices = None
self.ascending = False
self._pca_kwargs = pca_kwargs
self._kmeans_kwargs = kmeans_kwargs
super().__init__(
ds=ds,
shuffle=shuffle,
random_state=random_state,
sample_frac=sample_frac,
stratification_col=stratification_col,
center=center,
q=q,
sort_by_len=sort_by_len,
)
def _get_sorted_indices(
self, ds: AbstractStructureDataset, shuffle: bool = True
) -> Collection[int]:
if self._sorted_indices is None:
feats = ds._df[self.feature_names].values
clusters = pca_kmeans(
feats,
n_clusters=self.n_clusters,
n_pca_components=self.n_pca_components,
random_state=self._random_state,
scaled=self.scaled,
pca_kwargs=self._pca_kwargs,
kmeans_kwargs=self._kmeans_kwargs,
)
random_numbers = np.arange(len(clusters))
if shuffle:
np.random.shuffle(random_numbers)
t = [(v, i, random_numbers[i]) for i, v in enumerate(clusters)]
t.sort(reverse=not self.ascending, key=lambda x: (x[0], x[2]))
indices = [i for _, i, _ in t]
self._sorted_indices = indices
return self._sorted_indices
def _get_groups(self) -> Collection[Union[int, str]]:
si = self._get_sorted_indices(self._ds, self._shuffle)
return np.array(si)
class ClusterStratifiedSplitter(BaseSplitter):
"""Split the data into clusters and stratify on those clusters.
The approach has been proposed on
`Kaggle <https://www.kaggle.com/code/lucamassaron/are-you-doing-cross-validation-the-best-way/notebook>`_.
In principle, we perform the following steps:
1. Scale the data (optional).
2. Perform PCA for de-correlation.
3. Perform k-means clustering.
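Example:
    A minimal sketch; the feature names are hypothetical placeholders for columns of :code:`ds`:

    .. code-block:: python

        splitter = ClusterStratifiedSplitter(
            ds, feature_names=["feature_a", "feature_b"], n_clusters=4
        )
        # here the clusters are used for stratification, not as groups
        train_idx, test_idx = splitter.train_test_split(frac_train=0.7)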
"""
def __init__(
self,
ds: AbstractStructureDataset,
feature_names: List[str],
shuffle: bool = True,
random_state: Optional[Union[int, np.random.RandomState]] = None,
sample_frac: Optional[float] = 1.0,
scaled: bool = True,
n_pca_components: Optional[Union[int, str]] = "mle",
n_clusters: int = 4,
pca_kwargs: Optional[Dict[str, Any]] = None,
kmeans_kwargs: Optional[Dict[str, Any]] = None,
):
"""Construct a ClusterStratifiedSplitter.
Args:
ds (AbstractStructureDataset): A structure dataset.
The :code:`BaseSplitter` only requires the length magic method to be implemented.
However, other splitters might require additional methods.
feature_names (List[str]): Names of features to consider.
shuffle (bool): If True, perform a shuffled split.
Defaults to True.
random_state (Union[int, np.random.RandomState], optional):
Random state for the shuffling. Defaults to None.
sample_frac (float, optional):
This can be used for downsampling. It will randomly select a subset of
indices from all indices *before* splitting. For instance :code:`sample_frac=0.8`
will randomly select 80% of the indices before splitting.
Defaults to 1.0.
scaled (bool): If True, scale the data before clustering.
Defaults to True.
n_pca_components (Union[int, str]): Number of components to use for PCA.
If "mle", the number of components is estimated with Minka's MLE (as in scikit-learn's PCA).
Defaults to "mle".
n_clusters (int): Number of clusters to use.
Defaults to 4.
pca_kwargs (Dict[str, Any]): Keyword arguments to pass to PCA.
Defaults to None.
kmeans_kwargs (Dict[str, Any]): Keyword arguments to pass to k-means.
Defaults to None.
"""
self.feature_names = feature_names
self.scaled = scaled
self.n_pca_components = n_pca_components
self.n_clusters = n_clusters
self._random_state = random_state
self._stratification_groups = None
self.ascending = False
self._pca_kwargs = pca_kwargs
self._kmeans_kwargs = kmeans_kwargs
super().__init__(
ds=ds,
shuffle=shuffle,
random_state=random_state,
sample_frac=sample_frac,
stratification_col=None,
center=None,
q=None,
)
def _get_stratification_col(self) -> Collection[int]:
if self._stratification_groups is None:
feats = self._ds._df[self.feature_names].values
clusters = pca_kmeans(
feats,
n_clusters=self.n_clusters,
n_pca_components=self.n_pca_components,
random_state=self._random_state,
scaled=self.scaled,
pca_kwargs=self._pca_kwargs,
kmeans_kwargs=self._kmeans_kwargs,
)
self._stratification_groups = clusters
return self._stratification_groups
class LOCOCV(BaseSplitter):
"""
Leave-one-cluster-out cross-validation.
The general idea has been discussed before, e.g. in [Kramer]_.
Perhaps more widely used in the materials community is [Meredig]_.
Here, we perform PCA, followed by k-means clustering.
* Where k = 2 for a train/test split
* Where k = 3 for a train/valid/test split
* Where k = k for k-fold cross-validation
By default, we will sort outputs such that the cluster sizes are
train >= test >= valid.
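Example:
    A minimal sketch; the feature names are hypothetical placeholders for columns of :code:`ds`.
    The partition methods take no fraction arguments because the split sizes are
    determined by the cluster sizes:

    .. code-block:: python

        splitter = LOCOCV(ds, feature_names=["feature_a", "feature_b"], random_state=42)
        train_idx, test_idx = splitter.train_test_split()  # k-means with k=2
        for train_idx, test_idx in splitter.k_fold(k=5):  # k-means with k=5
            print(len(train_idx), len(test_idx))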
"""
def __init__(
self,
ds: AbstractStructureDataset,
feature_names: List[str],
shuffle: bool = True,
random_state: Optional[Union[int, np.random.RandomState]] = None,
sample_frac: Optional[float] = 1.0,
scaled: bool = True,
n_pca_components: Optional[Union[int, str]] = "mle",
pca_kwargs: Optional[Dict[str, Any]] = None,
kmeans_kwargs: Optional[Dict[str, Any]] = None,
):
"""Construct a LOCOCV.
Args:
ds (AbstractStructureDataset): A structure dataset.
The :code:`BaseSplitter` only requires the length magic method to be implemented.
However, other splitters might require additional methods.
feature_names (List[str]): Names of features to consider.
shuffle (bool): If True, perform a shuffled split.
Defaults to True.
random_state (Union[int, np.random.RandomState], optional):
Random state for the shuffling. Defaults to None.
sample_frac (float, optional):
This can be used for downsampling. It will randomly select a subset of
indices from all indices *before* splitting. For instance :code:`sample_frac=0.8`
will randomly select 80% of the indices before splitting.
Defaults to 1.0.
scaled (bool): If True, scale the data before clustering.
Defaults to True.
n_pca_components (Union[int, str]): Number of components to use for PCA.
If "mle", the number of components is estimated with Minka's MLE (as in scikit-learn's PCA).
Defaults to "mle".
pca_kwargs (Dict[str, Any], optional): Additional keyword arguments for
sklearn's :py:class:`sklearn.decomposition.PCA`. Defaults to None.
kmeans_kwargs (Dict[str, Any], optional): Additional keyword arguments for
sklearn's :py:class:`sklearn.cluster.KMeans`. Defaults to None.
"""
self.scaled = scaled
self.n_pca_components = n_pca_components
self._random_state = random_state
self._pca_kwargs = pca_kwargs
self._kmeans_kwargs = kmeans_kwargs
self._stratification_groups = None
self.ascending = False
self.feature_names = feature_names
super().__init__(
ds=ds,
shuffle=shuffle,
random_state=random_state,
sample_frac=sample_frac,
center=None,
stratification_col=None,
q=None,
)
def train_test_split(
self,
) -> Tuple[Collection[int], Collection[int]]:
"""Perform a train/test partition.
Returns:
Tuple[Collection[int], Collection[int]]: Train indices, test indices
"""
groups = pca_kmeans(
self._ds._df[self.feature_names].values,
scaled=self.scaled,
n_pca_components=self.n_pca_components,
n_clusters=2,
random_state=self._random_state,
pca_kwargs=self._pca_kwargs,
kmeans_kwargs=self._kmeans_kwargs,
)
first_group = np.where(groups == 0)[0]
second_group = np.where(groups == 1)[0]
if self._shuffle:
np.random.shuffle(first_group)
np.random.shuffle(second_group)
# potential downsampling after shuffle
first_group = first_group[: int(self._sample_frac * len(first_group))]
second_group = second_group[: int(self._sample_frac * len(second_group))]
if len(first_group) > len(second_group):
return first_group, second_group
return second_group, first_group
def train_valid_test_split(
self,
) -> Tuple[Collection[int], Collection[int], Collection[int]]:
"""Perform a train/valid/test partition.
Returns:
Tuple[Collection[int], Collection[int], Collection[int]]: Training, validation, test set.
"""
groups = pca_kmeans(
self._ds._df[self.feature_names].values,
scaled=self.scaled,
n_pca_components=self.n_pca_components,
n_clusters=3,
random_state=self._random_state,
pca_kwargs=self._pca_kwargs,
kmeans_kwargs=self._kmeans_kwargs,
)
first_group = np.where(groups == 0)[0]
second_group = np.where(groups == 1)[0]
third_group = np.where(groups == 2)[0]
if self._shuffle:
np.random.shuffle(first_group)
np.random.shuffle(second_group)
np.random.shuffle(third_group)
# potential downsampling after shuffle
first_group = first_group[: int(self._sample_frac * len(first_group))]
second_group = second_group[: int(self._sample_frac * len(second_group))]
third_group = third_group[: int(self._sample_frac * len(third_group))]
groups_sorted_by_len = sorted(
[first_group, second_group, third_group], key=len, reverse=True
)
return groups_sorted_by_len[0], groups_sorted_by_len[2], groups_sorted_by_len[1]
def k_fold(self, k: int) -> Tuple[Collection[int], Collection[int]]:
"""Peform k-fold crossvalidation.
Args:
k (int): Number of folds.
Yields:
Iterator[Tuple[Collection[int], Collection[int]]]: Train indices, test indices.
"""
groups = pca_kmeans(
self._ds._df[self.feature_names].values,
scaled=self.scaled,
n_pca_components=self.n_pca_components,
n_clusters=k,
random_state=self._random_state,
pca_kwargs=self._pca_kwargs,
kmeans_kwargs=self._kmeans_kwargs,
)
for group in range(k):
train = np.where(groups != group)[0]
test = np.where(groups == group)[0]
if self._shuffle:
np.random.shuffle(train)
np.random.shuffle(test)
# potential downsampling after shuffle
train = train[: int(self._sample_frac * len(train))]
test = test[: int(self._sample_frac * len(test))]
if len(train) > len(test):
yield train, test
else:
yield test, train