from abc import ABCMeta, abstractmethod
from typing import Optional, Set, Tuple, Union
import numpy as np
import pandas as pd
import sklearn.utils
from numpy.typing import ArrayLike
from dodiscover.typing import Column
from .monte_carlo import generate_knn_in_subspace, restricted_nbr_permutation
class BaseConditionalIndependenceTest(metaclass=ABCMeta):
"""Abstract class for any conditional independence test.
    All CI tests are used in constraint-based causal discovery algorithms. The
    interface is intentionally lightweight, so that any function for CI testing
    can be wrapped in a class exposing this common API.
    """
# default CI tests do not allow multivariate input
_allow_multivariate_input: bool = False
def _check_test_input(
self,
df: pd.DataFrame,
x_vars: Set[Column],
y_vars: Set[Column],
z_covariates: Optional[Set[Column]],
):
if any(col not in df.columns for col in x_vars):
raise ValueError(f"The x variables {x_vars} are not all in the DataFrame.")
if any(col not in df.columns for col in y_vars):
raise ValueError(
f"The y variables {y_vars} are not all in the DataFrame: {df.columns}."
)
if z_covariates is not None and any(col not in df.columns for col in z_covariates):
raise ValueError(
f"The z conditioning set variables {z_covariates} are not all in the "
f"DataFrame with {df.columns}."
)
if not self._allow_multivariate_input and (len(x_vars) > 1 or len(y_vars) > 1):
raise RuntimeError(f"{self.__class__} does not support multivariate input for X and Y.")
    @abstractmethod
def test(
self,
df: pd.DataFrame,
x_vars: Set[Column],
y_vars: Set[Column],
z_covariates: Optional[Set[Column]] = None,
) -> Tuple[float, float]:
"""Abstract method for all conditional independence tests.
Parameters
----------
df : pd.DataFrame
The dataframe containing the dataset.
        x_vars : Set of column
            A set of columns in ``df``.
        y_vars : Set of column
            A set of columns in ``df``.
        z_covariates : Set, optional
            A set of columns in ``df``, by default None. If None, then
            the test should run a standard (unconditional) independence test.
Returns
-------
Tuple[float, float]
Test statistic and pvalue.
"""
pass
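# A minimal sketch of the subclassing contract (illustrative only, not part of
# the library): wrapping scipy's Pearson correlation test in this API. The
# class name and the restriction to empty conditioning sets are assumptions
# made for the example.
#
# from scipy.stats import pearsonr
#
# class PearsonCITest(BaseConditionalIndependenceTest):
#     def test(self, df, x_vars, y_vars, z_covariates=None):
#         self._check_test_input(df, x_vars, y_vars, z_covariates)
#         if z_covariates:
#             raise NotImplementedError("This example only handles marginal tests.")
#         (x_var,), (y_var,) = x_vars, y_vars
#         stat, pvalue = pearsonr(df[x_var], df[y_var])
#         return stat, pvalue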
class ClassifierCIMixin:
random_state: np.random.Generator
test_size: Union[float, int]
def generate_train_test_data(
self,
df: pd.DataFrame,
x_vars: Set[Column],
y_vars: Set[Column],
z_covariates: Optional[Set[Column]] = None,
k: int = 1,
) -> Tuple:
"""Generate a training and testing dataset for CCIT.
This takes a conditional independence problem given a dataset
and converts it to a binary classification problem.
Parameters
----------
df : pd.DataFrame
The dataframe containing the dataset.
        x_vars : Set of column
            A set of columns in ``df``.
        y_vars : Set of column
            A set of columns in ``df``.
        z_covariates : Set, optional
            A set of columns in ``df``, by default None. If None, then
            the test should run a standard (unconditional) independence test.
        k : int
            The number of nearest neighbors in the Z subspace used by the
            conditional permutation step to generate the conditionally
            independent distribution. By default, 1.
Returns
-------
        X_train, Y_train, X_test, Y_test : Tuple[ArrayLike, ArrayLike, ArrayLike, ArrayLike]
            The ``X_train``, ``Y_train``, ``X_test`` and ``Y_test`` arrays to
            be used in binary classification, where each dataset comprises
            samples from both the joint and the (conditionally) independent
            distributions. ``Y_train`` and ``Y_test`` contain only 1's and
            0's: a value of 1 indicates a sample from the original joint
            distribution, and a value of 0 indicates a sample from the
            shuffled distribution.
"""
if z_covariates is None:
z_covariates = set()
x_arr = df[list(x_vars)].to_numpy()
y_arr = df[list(y_vars)].to_numpy()
n_samples_x, _ = x_arr.shape
        test_size = self.test_size
        # a ``test_size`` of at most 1.0 is interpreted as a fraction of the samples
        if test_size <= 1.0:
            test_size = int(test_size * n_samples_x)
        train_size = int(n_samples_x - test_size)
# now slice the arrays to produce a training and testing dataset
x_arr_train = x_arr[:train_size, :]
y_arr_train = y_arr[:train_size, :]
x_arr_test = x_arr[train_size:, :]
y_arr_test = y_arr[train_size:, :]
if len(z_covariates) == 0:
n_samples_y, _ = y_arr.shape
if n_samples_y != n_samples_x:
raise ValueError(
f"There is unequal number of samples in x and y array: "
f"{n_samples_x} and {n_samples_y}."
)
# generate the training dataset
X_ind, y_ind, X_joint, y_joint = self._unconditional_shuffle(x_arr_train, y_arr_train)
X_train = np.vstack((X_ind, X_joint))
Y_train = np.vstack((y_ind, y_joint))
# generate the testing dataset
X_ind, y_ind, X_joint, y_joint = self._unconditional_shuffle(x_arr_test, y_arr_test)
X_test = np.vstack((X_ind, X_joint))
Y_test = np.vstack((y_ind, y_joint))
else:
z_arr = df[list(z_covariates)].to_numpy()
z_arr_train = z_arr[:train_size, :]
z_arr_test = z_arr[train_size:, :]
n_samples_x, _ = x_arr.shape
n_samples_y, _ = y_arr.shape
n_samples_z, _ = z_arr.shape
if n_samples_y != n_samples_x or n_samples_z != n_samples_x:
raise ValueError(
f"There is unequal number of samples in x, y and z array: "
f"{n_samples_x}, {n_samples_y}, {n_samples_z}."
)
# generate the training dataset
X_ind, y_ind, X_joint, y_joint = self._conditional_shuffle(
x_arr_train, y_arr_train, z_arr_train, k=k
)
X_train = np.vstack((X_ind, X_joint))
Y_train = np.vstack((y_ind, y_joint))
# generate the testing dataset
X_ind, y_ind, X_joint, y_joint = self._conditional_shuffle(
x_arr_test, y_arr_test, z_arr_test, k=k
)
X_test = np.vstack((X_ind, X_joint))
Y_test = np.vstack((y_ind, y_joint))
return X_train, Y_train, X_test, Y_test
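    # Illustrative usage sketch (an assumption, not library API): CCIT fits a
    # binary classifier to separate joint samples (label 1) from shuffled
    # samples (label 0); held-out accuracy near 0.5 supports the null of
    # (conditional) independence. ``tester`` is a hypothetical instance of a
    # class mixing this in, and any sklearn-style classifier would do here.
    #
    # from sklearn.ensemble import RandomForestClassifier
    #
    # X_train, Y_train, X_test, Y_test = tester.generate_train_test_data(
    #     df, x_vars={"x"}, y_vars={"y"}, z_covariates={"z"}
    # )
    # clf = RandomForestClassifier(random_state=0).fit(X_train, Y_train.ravel())
    # accuracy = clf.score(X_test, Y_test.ravel())  # ~0.5 under independence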
def _unconditional_shuffle(
self, x_arr: ArrayLike, y_arr: ArrayLike
) -> Tuple[ArrayLike, ArrayLike, ArrayLike, ArrayLike]:
r"""Generate samples to emulate X independent of Y.
        Based on input data ``(X, Y)``, partitions the dataset into halves, where
        one half represents the joint distribution ``P(X, Y)`` and the other
        represents the independent distribution ``P(X) P(Y)``.
Parameters
----------
x_arr : ArrayLike of shape (n_samples, n_dims_x)
The input X variable data.
y_arr : ArrayLike of shape (n_samples, n_dims_y)
The input Y variable data.
Returns
-------
X_ind, y_ind, X_joint, y_joint : Tuple[ArrayLike, ArrayLike, ArrayLike, ArrayLike]
The X_ind, y_ind, X_joint, y_joint to be used in
binary classification, where ``X_ind`` is the data features
that are from the generated independent data and ``X_joint``
is the data features from the original jointly distributed data.
``y_ind`` and ``y_joint`` correspond to the class labels 0 and 1
respectively.
Notes
-----
        Shuffles the Y samples so that any dependence between X and Y is
        broken, and the resulting samples emulate those drawn from a
        distribution with :math:`X \perp Y`. This essentially takes the input
        data and generates a null distribution.
"""
n_samples_x, _ = x_arr.shape
# number of samples we will generate from the independent distribution
n_ind_samples = n_samples_x // 2
n_joint_samples = n_samples_x - n_ind_samples
        # draw half of the indices at random to form the independent dataset
rand_indices = self.random_state.choice(n_samples_x, size=n_ind_samples, replace=False)
mask = np.ones((n_samples_x,), dtype=bool)
mask[rand_indices] = False
        # split the data into one half that maintains the joint distribution
        # and another half that we will permute to create independence
x_arr_ind = x_arr[rand_indices, :]
x_arr_joint = x_arr[mask, :]
y_arr_ind = y_arr[rand_indices, :]
y_arr_joint = y_arr[mask, :]
# now create independence in half of the data, while keeping
# the joint distribution in the other half of the data
indices = np.arange(n_ind_samples).astype(int)
self.random_state.shuffle(indices)
y_arr_ind = y_arr_ind[indices, :]
# joint distributed data and independent data with their
# corresponding class labels
X_ind = np.hstack([x_arr_ind, y_arr_ind])
y_ind = np.zeros((n_ind_samples, 1))
X_joint = np.hstack([x_arr_joint, y_arr_joint])
y_joint = np.ones((n_joint_samples, 1))
return X_ind, y_ind, X_joint, y_joint
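    # Quick sanity check of the construction (hypothetical snippet, not part
    # of the library): on strongly correlated data, the shuffled half loses
    # the X-Y correlation while the joint half keeps it. ``tester`` is assumed
    # to be an instance of a class mixing this in with ``random_state`` set.
    #
    # rng = np.random.default_rng(0)
    # x = rng.normal(size=(1000, 1))
    # y = x + 0.1 * rng.normal(size=(1000, 1))
    # X_ind, _, X_joint, _ = tester._unconditional_shuffle(x, y)
    # np.corrcoef(X_ind.T)[0, 1]    # close to 0 after shuffling
    # np.corrcoef(X_joint.T)[0, 1]  # close to 1 in the untouched half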
def _conditional_shuffle(
self,
x_arr: ArrayLike,
y_arr: ArrayLike,
z_arr: ArrayLike,
method="knn",
k: int = 1,
) -> Tuple[ArrayLike, ArrayLike, ArrayLike, ArrayLike]:
"""Generate dataset for CI testing as binary classification problem.
Based on input data ``(X, Y, Z)``, partitions the dataset into halves, where
one-half represents the joint distribution of ``P(X, Y, Z)`` and the other
represents the conditionally independent distribution of ``P(X | Z)P(Y)``.
This is done by a nearest-neighbor bootstrap approach.
Parameters
----------
x_arr : ArrayLike of shape (n_samples, n_dims_x)
The input X variable data.
y_arr : ArrayLike of shape (n_samples, n_dims_y)
The input Y variable data.
        z_arr : ArrayLike of shape (n_samples, n_dims_z)
            The input Z variable data.
        method : str, optional
            The nearest-neighbor method to use, by default 'knn'. One of
            ('knn', 'kdtree').
        k : int, optional
            The number of nearest neighbors to swap, by default 1.
Returns
-------
X_ind, y_ind, X_joint, y_joint : Tuple[ArrayLike, ArrayLike, ArrayLike, ArrayLike]
The X_ind, y_ind, X_joint, y_joint to be used in
binary classification, where ``X_ind`` is the data features
that are from the generated independent data and ``X_joint``
is the data features from the original jointly distributed data.
``y_ind`` and ``y_joint`` correspond to the class labels 0 and 1
respectively.
Notes
-----
        This algorithm implements a nearest-neighbor bootstrap approach for
        generating samples under the null hypothesis :math:`X \\perp Y | Z`.
"""
# use a method to generate k-nearest-neighbors in subspace of Z
indices = generate_knn_in_subspace(z_arr, method=method, k=k)
        # column k of ``indices`` holds, for each sample, the index of its kth
        # nearest neighbor in the Z subspace; swapping Y with those samples
        # breaks the X-Y dependence while approximately preserving p(y | z)
        y_prime = y_arr[indices[:, k], :]
# construct the joint and conditionally independent distributions
# and their corresponding class labels
X_joint = np.hstack([x_arr, y_arr, z_arr])
X_ind = np.hstack([x_arr, y_prime, z_arr])
y_joint = np.ones((len(X_joint), 1))
y_ind = np.zeros((len(X_ind), 1))
return X_ind, y_ind, X_joint, y_joint
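    # The neighbor swap above assumes ``generate_knn_in_subspace`` returns an
    # (n_samples, k + 1) index array whose column 0 is each point itself, so
    # ``indices[:, k]`` selects the kth-nearest neighbor. A rough equivalent
    # with sklearn (an illustration of that assumption, not the library code):
    #
    # from sklearn.neighbors import NearestNeighbors
    #
    # nn = NearestNeighbors(n_neighbors=k + 1).fit(z_arr)
    # _, indices = nn.kneighbors(z_arr)  # column 0 is the point itself
    # y_prime = y_arr[indices[:, k], :]  # swap Y with its kth neighbor in Z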
class CMIMixin:
random_state: np.random.Generator
random_seed: Optional[int]
n_jobs: Optional[int]
def _estimate_null_dist(
self,
data: pd.DataFrame,
x_vars: Set[Column],
y_vars: Set[Column],
z_covariates: Set,
n_shuffle_nbrs: int,
n_shuffle: int,
    ) -> ArrayLike:
"""Compute pvalue by performing a nearest-neighbor shuffle test.
XXX: improve with parallelization with joblib
Parameters
----------
data : pd.DataFrame
The dataset.
x_vars : Set[Column]
The X variable column(s).
        y_vars : Set[Column]
The Y variable column(s).
z_covariates : Set[Column]
The Z variable column(s).
n_shuffle_nbrs : int
The number of nearest-neighbors in feature space to shuffle.
n_shuffle : int
The number of times to generate the shuffled distribution.
        Returns
        -------
        null_dist : ArrayLike of shape (n_shuffle,)
            The null distribution of the test statistic.
        """
n_samples, _ = data.shape
x_cols = list(x_vars)
if len(z_covariates) > 0 and n_shuffle_nbrs < n_samples:
null_dist = np.zeros(n_shuffle)
# create a copy of the data to store the shuffled array
data_copy = data.copy()
# Get nearest neighbors around each sample point in Z subspace
z_array = data[list(z_covariates)].to_numpy()
nbrs = generate_knn_in_subspace(
z_array, method="kdtree", k=n_shuffle_nbrs, n_jobs=self.n_jobs
)
# we will now compute a shuffled distribution, where X_i is replaced
# by X_j only if the corresponding Z_i and Z_j are "nearby", computed
# using the spatial tree query
for idx in range(n_shuffle):
# Shuffle neighbor indices for each sample index
for i in range(len(nbrs)):
self.random_state.shuffle(nbrs[i])
                # Select a series of neighbor indices that contains as few
                # duplicates as possible
restricted_permutation = restricted_nbr_permutation(
nbrs, random_seed=self.random_seed
)
# update the X variable column
data_copy.loc[:, x_cols] = data.loc[restricted_permutation, x_cols].to_numpy()
# compute the CMI on the shuffled array
null_dist[idx] = self._compute_cmi(data_copy, x_vars, y_vars, z_covariates)
else:
null_dist = self._compute_shuffle_dist(
data, x_vars, y_vars, z_covariates, n_shuffle=n_shuffle
)
return null_dist
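    # Converting the null distribution into a p-value would typically use the
    # standard permutation-test estimate with the +1 correction (a sketch,
    # assuming ``obs_stat`` is the statistic computed on the original data and
    # ``tester`` is an instance of a class mixing this in):
    #
    # null_dist = tester._estimate_null_dist(
    #     data, x_vars, y_vars, z_covariates, n_shuffle_nbrs=5, n_shuffle=100
    # )
    # pvalue = (1 + np.sum(null_dist >= obs_stat)) / (1 + len(null_dist))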
def _compute_shuffle_dist(
self,
data: pd.DataFrame,
x_vars: Set[Column],
y_vars: Set[Column],
z_covariates: Set,
n_shuffle: int,
) -> ArrayLike:
"""Compute a shuffled distribution of the test statistic."""
data_copy = data.copy()
x_cols = list(x_vars)
# initialize the shuffle distribution
shuffle_dist = np.zeros((n_shuffle,))
        for idx in range(n_shuffle):
            # draw a fresh seed for each shuffle; re-using ``self.random_seed``
            # directly would produce the identical permutation on every iteration
            seed = int(self.random_state.integers(np.iinfo(np.int32).max))
            # compute a shuffled version of the X columns
            x_data = data[x_cols]
            shuffled_x = sklearn.utils.shuffle(x_data, random_state=seed)
            # now compute the test statistic on the shuffled data
            data_copy[x_cols] = shuffled_x.values
            shuffle_dist[idx] = self._compute_cmi(data_copy, x_vars, y_vars, z_covariates)
return shuffle_dist
def _compute_cmi(
self, df: pd.DataFrame, x_vars: Set[Column], y_vars: Set[Column], z_covariates: Set[Column]
):
raise NotImplementedError("All CMI methods must implement a _compute_cmi function.")
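# A minimal sketch of a ``_compute_cmi`` implementation for a CMIMixin
# subclass, using a plug-in estimate over discrete (or pre-binned) columns.
# This is an illustration under that discreteness assumption and a non-empty
# conditioning set, not the estimator the library ships.
#
# class DiscreteCMITest(CMIMixin):
#     def _compute_cmi(self, df, x_vars, y_vars, z_covariates):
#         x, y, z = list(x_vars), list(y_vars), list(z_covariates)
#         n = len(df)
#         # per-row empirical cell probabilities via group sizes
#         p_xyz = df.groupby(x + y + z)[x[0]].transform("size") / n
#         p_xz = df.groupby(x + z)[x[0]].transform("size") / n
#         p_yz = df.groupby(y + z)[x[0]].transform("size") / n
#         p_z = df.groupby(z)[x[0]].transform("size") / n
#         # I(X; Y | Z) = E[ log( p(z) p(x,y,z) / (p(x,z) p(y,z)) ) ]
#         return float(np.mean(np.log(p_z * p_xyz / (p_xz * p_yz))))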