from functools import partial
from typing import Callable, Union
import numpy as np
from scipy.stats import entropy
from sklearn.model_selection import KFold
from sklearn.neighbors import NearestNeighbors
from dowhy.gcm.auto import AssignmentQuality, select_model
from dowhy.gcm.constant import EPS
from dowhy.gcm.ml.classification import ClassificationModel, create_logistic_regression_classifier
from dowhy.gcm.util.general import has_categorical, is_categorical, setdiff2d, shape_into_2d
def auto_estimate_kl_divergence(X: np.ndarray, Y: np.ndarray) -> float:
    """Dispatches to a suitable KL divergence estimator based on the type of the given data: categorical data,
    probability vectors, multivariate continuous data or univariate continuous data."""
    if is_categorical(X):
        return estimate_kl_divergence_categorical(X, Y)
    elif not has_categorical(X) and is_probability_matrix(X):
        return estimate_kl_divergence_of_probabilities(X, Y)
    else:
        if X.ndim == 2 and X.shape[1] > 1:
            return estimate_kl_divergence_continuous_clf(X, Y)
        else:
            return estimate_kl_divergence_continuous_knn(X, Y)
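
# Illustrative sketch (not part of the original module): how the dispatch above behaves for different
# input types. The sample sizes, distributions and category values below are arbitrary assumptions.
def _example_auto_dispatch() -> None:
    rng = np.random.default_rng(0)
    # Univariate continuous data is routed to the kNN-based estimator.
    kl_continuous = auto_estimate_kl_divergence(rng.normal(0.0, 1.0, 1000), rng.normal(1.0, 1.0, 1000))
    # Categorical data is routed to the frequency-based estimator.
    kl_categorical = auto_estimate_kl_divergence(np.array(["a", "a", "b"]), np.array(["a", "b", "b"]))
    print(kl_continuous, kl_categorical)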
def estimate_kl_divergence_continuous_knn(
    X: np.ndarray, Y: np.ndarray, k: int = 1, remove_common_elements: bool = True, n_jobs: int = 1
) -> float:
    """Estimates the KL divergence using k-nearest neighbors (Wang et al., 2009).

    While this handles multidimensional inputs in theory, consider using estimate_kl_divergence_continuous_clf
    for data with more than one dimension.

    Q. Wang, S. R. Kulkarni, and S. Verdú,
    "Divergence estimation for multidimensional densities via k-nearest-neighbor distances",
    IEEE Transactions on Information Theory, vol. 55, no. 5, pp. 2392-2405, May 2009.

    :param X: (N_1, D) Sample drawn from distribution P_X.
    :param Y: (N_2, D) Sample drawn from distribution P_Y.
    :param k: Number of neighbors to consider.
    :param remove_common_elements: If True, values that appear in both X and Y are removed from X. Such common
                                   values would otherwise lead to a nearest-neighbor distance of zero when k is
                                   set to 1, which would cause a division by zero.
    :param n_jobs: Number of parallel jobs used for the nearest neighbors model. -1 means all available cores are
                   used. Note that in most applications, parallelizing this introduces more overhead than it saves,
                   leading to a slower runtime.
    :return: Estimated value of D(P_X||P_Y).
    """
    X, Y = shape_into_2d(X, Y)

    if X.shape[1] != Y.shape[1]:
        raise RuntimeError(
            "Samples from X and Y need to have the same dimension, but X has dimension %d and Y has "
            "dimension %d." % (X.shape[1], Y.shape[1])
        )

    X = X.astype(np.float64)
    Y = Y.astype(np.float64)

    # Make sure that X and Y have no overlapping values, which would lead to a distance of 0 with k=1 and, thus,
    # to a division by zero.
    if remove_common_elements:
        X = setdiff2d(X, Y, assume_unique=True)
        if X.shape[0] < k + 1:
            # All elements are equal (or fewer than k samples are different).
            return 0

    n, m = X.shape[0], Y.shape[0]
    if n == 0:
        return 0

    d = float(X.shape[1])

    x_neighbourhood = NearestNeighbors(n_neighbors=k + 1, n_jobs=n_jobs).fit(X)
    y_neighbourhood = NearestNeighbors(n_neighbors=k, n_jobs=n_jobs).fit(Y)

    distances_x, _ = x_neighbourhood.kneighbors(X, n_neighbors=k + 1)
    distances_y, _ = y_neighbourhood.kneighbors(X, n_neighbors=k)

    rho = distances_x[:, -1]
    nu = distances_y[:, -1]

    result = np.sum((d / n) * np.log(nu / rho)) + np.log(m / (n - 1))

    if not np.isfinite(result):
        raise RuntimeError(
            "Got a non-finite KL divergence! This can happen if both data sets have overlapping "
            "elements. Since these are normally removed by this method, double check whether the arrays "
            "are numeric."
        )

    if result < 0:
        result = 0

    return result
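
# Illustrative sketch (not part of the original module): sanity check of the kNN estimator against the
# closed-form KL divergence between two univariate Gaussians, KL(N(0, 1) || N(1, 1)) = 0.5. The sample
# size and random seed are arbitrary assumptions.
def _example_kl_knn_vs_closed_form() -> None:
    rng = np.random.default_rng(0)
    x = rng.normal(loc=0.0, scale=1.0, size=5000)
    y = rng.normal(loc=1.0, scale=1.0, size=5000)
    print("kNN estimate:", estimate_kl_divergence_continuous_knn(x, y), "closed form:", 0.5)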
def estimate_kl_divergence_continuous_clf(
    samples_P: np.ndarray,
    samples_Q: np.ndarray,
    n_splits: int = 5,
    classifier_model: Union[AssignmentQuality, Callable[[], ClassificationModel]] = partial(
        create_logistic_regression_classifier, max_iter=10000
    ),
    epsilon: float = EPS,
) -> float:
    """Estimates the KL divergence based on the probabilities given by a classifier that is trained to
    distinguish samples from P and Q. Labeling samples from Q with Y = 1 and samples from P with Y = 0,
    the divergence is approximated via the log ratio of the class probabilities:

        D(P || Q) = \int p(x) log(p(x) / q(x)) dx ~= -1/N \sum_{x ~ P} log(p(Y = 1 | x) / (1 - p(Y = 1 | x)))

    :param samples_P: Samples drawn from P. Can have a different number of samples than Q.
    :param samples_Q: Samples drawn from Q. Can have a different number of samples than P.
    :param n_splits: Number of splits of the training and test data. The classifier is trained on the training
                     data and evaluated on the test data to obtain the probabilities.
    :param classifier_model: Used to estimate the probabilities for the log ratio. This can either be a
                             ClassificationModel or an AssignmentQuality. In the latter case, a model is
                             automatically selected based on the best performance on a training set.
    :param epsilon: If a probability is exactly 0 or 1, this value is used for clipping, i.e., 0 becomes epsilon
                    and 1 becomes 1 - epsilon.
    :return: Estimated value of the KL divergence D(P||Q).
    """
    samples_P, samples_Q = shape_into_2d(samples_P, samples_Q)

    if samples_P.shape[1] != samples_Q.shape[1]:
        raise ValueError("X and Y need to have the same number of features!")

    all_probs = []
    splits_p = list(KFold(n_splits=n_splits, shuffle=True).split(samples_P))
    splits_q = list(KFold(n_splits=n_splits, shuffle=True).split(samples_Q))

    if isinstance(classifier_model, AssignmentQuality):
        classifier_model = select_model(
            np.vstack([samples_P, samples_Q]),
            np.concatenate([np.zeros(samples_P.shape[0]), np.ones(samples_Q.shape[0])]).astype(str),
            classifier_model,
        )[0]
    else:
        classifier_model = classifier_model()

    for k in range(n_splits):
        # Balance the classes by training on the same number of samples from P and Q.
        num_samples = min(len(splits_p[k][0]), len(splits_q[k][0]))

        classifier_model.fit(
            np.vstack([samples_P[splits_p[k][0][:num_samples]], samples_Q[splits_q[k][0][:num_samples]]]),
            np.concatenate([np.zeros(num_samples), np.ones(num_samples)]).astype(str),
        )

        probs_P = classifier_model.predict_probabilities(samples_P[splits_p[k][1]])[:, 1]
        probs_P[probs_P == 0] = epsilon
        probs_P[probs_P == 1] = 1 - epsilon

        all_probs.append(probs_P)

    all_probs = np.concatenate(all_probs)

    kl_divergence = -np.mean(np.log(all_probs / (1 - all_probs)))

    if kl_divergence < 0:
        kl_divergence = 0

    return kl_divergence
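
# Illustrative sketch (not part of the original module): the classifier-based estimator on two-dimensional
# data, where it is typically preferable to the kNN estimator. The mean shift, sample sizes and random
# seed are arbitrary assumptions.
def _example_kl_clf_multivariate() -> None:
    rng = np.random.default_rng(0)
    samples_p = rng.normal(loc=0.0, scale=1.0, size=(2000, 2))
    samples_q = rng.normal(loc=0.5, scale=1.0, size=(2000, 2))
    # For two isotropic unit-variance Gaussians the closed form is 0.5 * ||mu_p - mu_q||^2 = 0.25.
    print(estimate_kl_divergence_continuous_clf(samples_p, samples_q))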
def estimate_kl_divergence_categorical(X: np.ndarray, Y: np.ndarray) -> float:
    """Estimates the KL divergence between the empirical distributions of the categorical samples X and Y,
    using EPS-smoothed relative frequencies over the union of observed categories."""
    X, Y = shape_into_2d(X, Y)

    if X.shape[1] != Y.shape[1]:
        raise RuntimeError(
            "Samples from X and Y need to have the same dimension, but X has dimension %d and Y has "
            "dimension %d." % (X.shape[1], Y.shape[1])
        )

    all_uniques = np.unique(np.vstack([X, Y]))

    p = np.array([(np.sum(X == i) + EPS) / (X.shape[0] + EPS) for i in all_uniques])
    q = np.array([(np.sum(Y == i) + EPS) / (Y.shape[0] + EPS) for i in all_uniques])

    return float(np.sum(p * np.log(p / q)))
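
# Illustrative sketch (not part of the original module): KL divergence between the empirical distributions
# of two categorical samples. The category frequencies below are arbitrary assumptions.
def _example_kl_categorical() -> None:
    x = np.array(["a"] * 70 + ["b"] * 30)
    y = np.array(["a"] * 50 + ["b"] * 50)
    # Roughly sum_i p_i * log(p_i / q_i) for p = (0.7, 0.3) and q = (0.5, 0.5), up to EPS smoothing.
    print(estimate_kl_divergence_categorical(x, y))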
def estimate_kl_divergence_of_probabilities(X: np.ndarray, Y: np.ndarray) -> float:
    """Estimates the Kullback-Leibler divergence between each pair of probability vectors (row-wise) in X and Y
    separately and returns the mean over all results."""
    X, Y = shape_into_2d(X, Y)

    if X.shape[1] != Y.shape[1]:
        raise RuntimeError(
            "Samples from X and Y need to have the same dimension, but X has dimension %d and Y has "
            "dimension %d." % (X.shape[1], Y.shape[1])
        )

    return float(np.mean(entropy(X + EPS, Y + EPS, axis=1)))
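
# Illustrative sketch (not part of the original module): mean row-wise KL divergence between two sets of
# probability vectors, e.g. predicted class probabilities from two models. The values are arbitrary assumptions.
def _example_kl_of_probabilities() -> None:
    probs_x = np.array([[0.9, 0.1], [0.6, 0.4]])
    probs_y = np.array([[0.8, 0.2], [0.5, 0.5]])
    # Averages KL([0.9, 0.1] || [0.8, 0.2]) and KL([0.6, 0.4] || [0.5, 0.5]).
    print(estimate_kl_divergence_of_probabilities(probs_x, probs_y))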
def is_probability_matrix(X: np.ndarray) -> bool:
    """Checks whether X looks like a matrix of probability vectors, i.e., whether the absolute values of its
    entries sum to one along each row (or over the whole vector for one-dimensional input)."""
    if X.ndim == 1:
        return np.all(np.isclose(np.sum(abs(X.astype(np.float64)), axis=0), 1))
    elif X.shape[1] == 1:
        return False
    else:
        return np.all(np.isclose(np.sum(abs(X.astype(np.float64)), axis=1), 1))
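
# Illustrative sketch (not part of the original module): is_probability_matrix returns True when each row
# sums to one and False otherwise. The example arrays below are arbitrary assumptions.
def _example_is_probability_matrix() -> None:
    print(is_probability_matrix(np.array([[0.2, 0.8], [0.5, 0.5]])))  # True: each row sums to 1.
    print(is_probability_matrix(np.array([[1.0, 2.0], [3.0, 4.0]])))  # False: rows do not sum to 1.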