"""This module provides functionality for shapley value estimation.
Classes and functions in this module should be considered experimental, meaning there might be breaking API changes in
the future.
"""
import itertools
import math
from enum import Enum
from typing import Callable, Dict, List, Optional, Set, Tuple, Union
import numpy as np
import scipy
from joblib import Parallel, delayed
from scipy.special import comb
from scipy.stats.qmc import Halton
from sklearn.linear_model import LinearRegression
from tqdm import tqdm
import dowhy.gcm.config as config
from dowhy.gcm.util.general import set_random_seed
class ShapleyApproximationMethods(Enum):
"""
    AUTO: Uses EXACT when the number of players is at most 5 and EARLY_STOPPING otherwise.
    EXACT: Generates all possible subsets and estimates Shapley values with the corresponding subset weights.
    EXACT_FAST: Generates all possible subsets and estimates Shapley values via weighted least squares regression. This
                can be faster, but, depending on the set function, numerically less stable.
    SUBSET_SAMPLING: Randomly samples subsets and estimates Shapley values via weighted least squares regression. Here,
                     only a certain number of randomly drawn subsets are used.
    EARLY_STOPPING: Estimates Shapley values based on a few randomly generated permutations. The estimation stops
                    when, on average, the Shapley values change only marginally between runs.
    PERMUTATION: Estimates Shapley values based on a fixed number of randomly generated permutations. By fine-tuning
                 the hyperparameters, this can potentially be faster than the early stopping approach due to better
                 utilization of the parallelization.
"""
AUTO = (0,)
EXACT = (1,)
EXACT_FAST = (2,)
EARLY_STOPPING = (3,)
PERMUTATION = (4,)
SUBSET_SAMPLING = (5,)
class ShapleyConfig:
def __init__(
self,
approximation_method: ShapleyApproximationMethods = ShapleyApproximationMethods.AUTO,
num_permutations: int = 2000,
num_subset_samples: int = 5000,
min_percentage_change_threshold: float = 0.05,
n_jobs: Optional[int] = None,
) -> None:
"""Config for estimating Shapley values.
:param approximation_method: Type of approximation methods (see :py:class:`ShapleyApproximationMethods <dowhy.gcm.shapley.ShapleyApproximationMethods>`).
:param num_permutations: Number of permutations used for approximating the Shapley values. This value is only
used for PERMUTATION and EARLY_STOPPING. In both cases, it indicates the maximum
number of permutations that are evaluated. Note that EARLY_STOPPING might stop before
                                 reaching the number of permutations if the changes in the Shapley values fall below
min_percentage_change_threshold.
:param num_subset_samples: Number of subsets used for the SUBSET_SAMPLING method. This value is not used
otherwise.
    :param min_percentage_change_threshold: This parameter is only relevant for EARLY_STOPPING and indicates the
                                            relative change of the Shapley values between two runs below which the
                                            estimation is considered converged. For instance, with a value of 0.01
                                            the estimation stops once all Shapley values change by less than 1%
                                            between runs. To mitigate the impact of randomness, the changes need to
                                            stay below the threshold for at least 2 consecutive runs.
:param n_jobs: Number of parallel jobs.
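        Example (a minimal sketch; the parameter values below are purely illustrative):

        >>> shapley_config = ShapleyConfig(
        ...     approximation_method=ShapleyApproximationMethods.EARLY_STOPPING,
        ...     num_permutations=500,
        ...     min_percentage_change_threshold=0.01,
        ... )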
"""
self.approximation_method = approximation_method
self.num_permutations = num_permutations
self.num_subset_samples = num_subset_samples
self.min_percentage_change_threshold = min_percentage_change_threshold
self.n_jobs = config.default_n_jobs if n_jobs is None else n_jobs
def estimate_shapley_values(
set_func: Callable[[np.ndarray], Union[float, np.ndarray]],
num_players: int,
shapley_config: Optional[ShapleyConfig] = None,
) -> np.ndarray:
"""Estimates the Shapley values based on the provided set function. A set function here is defined by taking a
(subset) of players and returning a certain utility value. This is in the context of attributing the
value of the i-th player to a subset of players S by evaluating v(S u {i}) - v(S), where v is the
set function and i is not in S. While we use the term 'player' here, this is often a certain feature/variable.
The input of the set function is a binary vector indicating which player is part of the set. For instance, given 4
players (1,2,3,4) and a subset only contains players 1,2,4, then this is indicated by the vector [1, 1, 0, 1]. The
function is expected to return a numeric value based on this input.
Note: The set function can be arbitrary and can resemble computationally complex operations. Keep in mind
that the estimation of Shapley values can become computationally expensive and requires a lot of memory. If the
runtime is too slow, consider changing the default config.
:param set_func: A set function that expects a binary vector as input which specifies which player is part of the
subset.
:param num_players: Total number of players.
:param shapley_config: A config object for indicating the approximation method and other parameters. If None is
given, a default config is used. For faster runtime or more accurate results, consider
creating a custom config.
:return: A numpy array representing the Shapley values for each player, i.e. there are as many Shapley values as
num_players. The i-th entry belongs to the i-th player. Here, the set function defines which index belongs
             to which player and is responsible for keeping this consistent.
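    Example (a minimal illustrative sketch; ``my_set_func`` below is a toy utility and not part of this module):

    >>> import numpy as np
    >>> def my_set_func(subset: np.ndarray) -> float:
    ...     # Toy utility: weighted sum of the players that are part of the subset.
    ...     return float(subset @ np.array([1.0, 2.0, 3.0]))
    >>> estimate_shapley_values(my_set_func, num_players=3)  # doctest: +SKIP
    array([1., 2., 3.])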
"""
if shapley_config is None:
shapley_config = ShapleyConfig()
approximation_method = shapley_config.approximation_method
if approximation_method == ShapleyApproximationMethods.AUTO:
if num_players <= 5:
approximation_method = ShapleyApproximationMethods.EXACT
else:
approximation_method = ShapleyApproximationMethods.EARLY_STOPPING
if approximation_method == ShapleyApproximationMethods.EXACT:
return _estimate_shapley_values_exact(set_func=set_func, num_players=num_players, n_jobs=shapley_config.n_jobs)
elif approximation_method == ShapleyApproximationMethods.PERMUTATION:
return _approximate_shapley_values_via_permutation_sampling(
set_func=set_func,
num_players=num_players,
num_permutations=shapley_config.num_permutations,
n_jobs=shapley_config.n_jobs,
)
elif approximation_method == ShapleyApproximationMethods.EARLY_STOPPING:
return _approximate_shapley_values_via_early_stopping(
set_func=set_func,
num_players=num_players,
max_num_permutations=shapley_config.num_permutations,
min_percentage_change_threshold=shapley_config.min_percentage_change_threshold,
n_jobs=shapley_config.n_jobs,
)
elif approximation_method == ShapleyApproximationMethods.SUBSET_SAMPLING:
return _approximate_shapley_values_via_least_squares_regression(
set_func=set_func,
num_players=num_players,
use_subset_approximation=True,
num_samples_for_approximation=shapley_config.num_subset_samples,
n_jobs=shapley_config.n_jobs,
)
elif approximation_method == ShapleyApproximationMethods.EXACT_FAST:
return _approximate_shapley_values_via_least_squares_regression(
set_func=set_func,
num_players=num_players,
use_subset_approximation=False,
num_samples_for_approximation=-1,
n_jobs=shapley_config.n_jobs,
)
else:
raise ValueError("Unknown method for Shapley approximation!")
def _estimate_shapley_values_exact(
set_func: Callable[[np.ndarray], Union[float, np.ndarray]], num_players: int, n_jobs: int
) -> np.ndarray:
"""Following Eq. (2) in
Janzing, D., Minorics, L., & Bloebaum, P. (2020).
Feature relevance quantification in explainable AI: A causal problem.
In International Conference on Artificial Intelligence and Statistics (pp. 2907-2916). PMLR."""
all_subsets = [tuple(subset) for subset in itertools.product([0, 1], repeat=num_players)]
with Parallel(n_jobs=n_jobs) as parallel:
subset_to_result_map = _evaluate_set_function(set_func, all_subsets, parallel)
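    # The exact Shapley value of player i is
    #   sum_{S subset of N \ {i}} |S|! * (num_players - |S| - 1)! / num_players! * (v(S u {i}) - v(S)),
    # where the coefficient simplifies to 1 / (num_players * C(num_players - 1, |S|)), i.e., the weight computed below.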
def compute_subset_weight(length: int) -> float:
return 1 / (num_players * comb(num_players - 1, length))
subset_weight_cache = {}
shapley_values = [None] * num_players
subsets_missing_one_player = np.array(list(itertools.product([0, 1], repeat=num_players - 1)))
for player_index in range(num_players):
subsets_with_player = [
tuple(subset) for subset in np.insert(subsets_missing_one_player, player_index, 1, axis=1)
]
subsets_without_player = [
tuple(subset) for subset in np.insert(subsets_missing_one_player, player_index, 0, axis=1)
]
for i in range(len(subsets_with_player)):
subset_length = int(np.sum(subsets_without_player[i]))
if subset_length not in subset_weight_cache:
subset_weight_cache[subset_length] = compute_subset_weight(subset_length)
weighted_diff = subset_weight_cache[subset_length] * (
subset_to_result_map[subsets_with_player[i]] - subset_to_result_map[subsets_without_player[i]]
)
            # The set function may return one value per sample (e.g., feature relevance for multiple samples), and the
            # number of samples is unknown beforehand. Hence, the accumulator is initialized lazily with the first result.
if shapley_values[player_index] is None:
shapley_values[player_index] = weighted_diff
else:
shapley_values[player_index] += weighted_diff
return np.array(shapley_values).T
def _approximate_shapley_values_via_least_squares_regression(
set_func: Callable[[np.ndarray], Union[float, np.ndarray]],
num_players: int,
use_subset_approximation: bool,
num_samples_for_approximation: int,
n_jobs: int,
full_and_empty_subset_weight: float = 10**20,
) -> np.ndarray:
"""For more details about this approximation, see Section 4.1.1 in
Janzing, D., Minorics, L., & Bloebaum, P. (2020).
Feature relevance quantification in explainable AI: A causal problem.
In International Conference on Artificial Intelligence and Statistics (pp. 2907-2916). PMLR."""
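    # The Shapley values are recovered as the coefficients of a weighted linear regression of the set function
    # results on the binary subset vectors. Each subset is weighted with the Shapley kernel weight, while the full
    # and empty subsets receive a very high weight to (approximately) enforce them as constraints.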
if not use_subset_approximation:
all_subsets, weights = _create_subsets_and_weights_exact(num_players, full_and_empty_subset_weight)
else:
all_subsets, weights = _create_subsets_and_weights_approximation(
num_players, full_and_empty_subset_weight, num_samples_for_approximation
)
def parallel_job(subset: np.ndarray, parallel_random_seed: int):
set_random_seed(parallel_random_seed)
return set_func(subset)
with Parallel(n_jobs=n_jobs) as parallel:
random_seeds = np.random.randint(np.iinfo(np.int32).max, size=len(all_subsets))
set_function_results = parallel(
delayed(parallel_job)(subset, int(random_seed))
for subset, random_seed in tqdm(
zip(all_subsets, random_seeds),
desc="Estimate shapley values as least squares solution",
position=0,
leave=True,
disable=not config.show_progress_bars,
)
)
return LinearRegression().fit(all_subsets, np.array(set_function_results), sample_weight=weights).coef_
def _approximate_shapley_values_via_permutation_sampling(
set_func: Callable[[np.ndarray], Union[float, np.ndarray]],
num_players: int,
num_permutations: int,
n_jobs: int,
use_halton_sequence: bool = True,
) -> np.ndarray:
"""For more details about this approximation, see
Strumbelj, E., Kononenko, I. (2014).
Explaining prediction models and individual predictions with feature contributions.
In Knowledge and information systems, 41(3):647–665
    When use_halton_sequence is true, a Halton sequence is used to fill the sample space more uniformly than randomly
    generated permutations would. This can improve convergence, seeing that the sampling aims at maximizing the
    information gain.
"""
    full_subset_result, empty_subset_result = _estimate_full_and_empty_subset_results(set_func, num_players)
total_num_permutations = math.factorial(num_players)
halton_generator = None
num_permutations = min(total_num_permutations, num_permutations)
if use_halton_sequence:
halton_generator = Halton(num_players, seed=int(np.random.randint(np.iinfo(np.int32).max)))
subsets_to_evaluate = set()
all_permutations = []
for i in range(num_permutations):
if halton_generator is None:
permutation = np.random.choice(num_players, num_players, replace=False)
else:
permutation = np.argsort(halton_generator.random(1))[0]
all_permutations.append(permutation)
subsets_to_evaluate.update(_create_index_order_and_subset_tuples(permutation))
with Parallel(n_jobs=n_jobs) as parallel:
evaluated_subsets = _evaluate_set_function(set_func, subsets_to_evaluate, parallel)
shapley_values = _estimate_shapley_values_of_permutation(
all_permutations[0], evaluated_subsets, full_subset_result, empty_subset_result
)
for i in range(1, len(all_permutations)):
shapley_values += _estimate_shapley_values_of_permutation(
all_permutations[i], evaluated_subsets, full_subset_result, empty_subset_result
)
return shapley_values / len(all_permutations)
def _approximate_shapley_values_via_early_stopping(
set_func: Callable[[np.ndarray], Union[float, np.ndarray]],
num_players: int,
max_num_permutations: int,
min_percentage_change_threshold: float,
n_jobs: int,
num_permutations_per_run: int = 5,
use_halton_sequence: bool = True,
num_consecutive_converged_runs: int = 2,
) -> np.ndarray:
"""Combines the approximation method described in
Strumbelj, E., Kononenko, I. (2014).
Explaining prediction models and individual predictions with feature contributions.
In Knowledge and information systems, 41(3):647–665
    with an early stopping criterion. That is, if the Shapley values change less than a certain threshold on average
    between two runs, the estimation stops.
When use_halton_sequence is true, a Halton sequence is used to fill the sample space more uniformly than randomly
generating a permutation. This can improve convergence seeing that the sampling aims at maximizing the information
gain.
"""
    full_subset_result, empty_subset_result = _estimate_full_and_empty_subset_results(set_func, num_players)
shapley_values = None
old_shap_proxy = np.zeros(num_players)
evaluated_subsets = {}
num_generated_permutations = 0
run_counter = 0
halton_generator = None
convergence_tracker = None
if use_halton_sequence:
halton_generator = Halton(num_players, seed=int(np.random.randint(np.iinfo(np.int32).max)))
if config.show_progress_bars:
pbar = tqdm(total=1)
with Parallel(n_jobs=n_jobs) as parallel:
        # The method stops if either the change between consecutive runs is below the given threshold or the
        # maximum number of evaluated permutations is reached.
while True:
run_counter += 1
subsets_to_evaluate = set()
# In each run, we create one random permutation of players. For instance, given 4 players, a permutation
# could be [3,1,4,2].
if halton_generator is None:
permutations = [
np.random.choice(num_players, num_players, replace=False) for _ in range(num_permutations_per_run)
]
else:
# Generate k random permutations by sorting the indices of the Halton sequence
permutations = np.argsort(halton_generator.random(num_permutations_per_run), axis=1)
for permutation in permutations:
num_generated_permutations += 1
                # Create all subsets belonging to the generated permutation. That is, for [3,1,4,2], the created
                # (proper prefix) subsets are [3], [3,1] and [3,1,4]; the empty and full sets are evaluated separately.
subsets_to_evaluate.update(
[
subset_tuple
for subset_tuple in _create_index_order_and_subset_tuples(permutation)
if subset_tuple not in evaluated_subsets
]
)
# The result for each subset is cached such that if a subset that has already been evaluated appears again,
# we can take this result directly.
evaluated_subsets.update(_evaluate_set_function(set_func, subsets_to_evaluate, parallel, False))
for permutation in permutations:
# To improve the runtime, multiple permutations are evaluated in each run.
if shapley_values is None:
shapley_values = _estimate_shapley_values_of_permutation(
permutation, evaluated_subsets, full_subset_result, empty_subset_result
)
else:
shapley_values += _estimate_shapley_values_of_permutation(
permutation, evaluated_subsets, full_subset_result, empty_subset_result
)
            if num_generated_permutations >= max_num_permutations:
break
new_shap_proxy = np.array(shapley_values)
# The current Shapley values are the average of the estimated values, i.e. we need to divide by the number
# of generated permutations here.
new_shap_proxy /= num_generated_permutations
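            # convergence_tracker counts, per row (i.e., per sample in case multiple samples are evaluated), how many
            # consecutive runs stayed below the change threshold.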
if convergence_tracker is None:
if new_shap_proxy.ndim == 2:
convergence_tracker = np.zeros(new_shap_proxy.shape[0])
else:
convergence_tracker = np.array([0])
if run_counter > 1:
mean_percentage_change = 0
                # In case Shapley values are estimated for multiple samples (e.g., in feature relevance), we have a
                # matrix of Shapley values instead of a vector.
if new_shap_proxy.ndim == 2:
for i in range(new_shap_proxy.shape[0]):
converging, tmp_mean_percentage_change = _check_convergence(
new_shap_proxy[i], old_shap_proxy[i], min_percentage_change_threshold
)
if np.all(converging):
convergence_tracker[i] += 1
elif convergence_tracker[i] < num_consecutive_converged_runs:
convergence_tracker[i] = 0
mean_percentage_change += tmp_mean_percentage_change
mean_percentage_change /= new_shap_proxy.shape[0]
else:
converging, tmp_mean_percentage_change = _check_convergence(
new_shap_proxy, old_shap_proxy, min_percentage_change_threshold
)
if np.all(converging):
convergence_tracker[0] += 1
elif convergence_tracker[0] < num_consecutive_converged_runs:
convergence_tracker[0] = 0
mean_percentage_change = tmp_mean_percentage_change
if config.show_progress_bars:
pbar.set_description(
f"Estimating Shapley Values. "
f"Average change of Shapley values in run {run_counter} "
f"({num_generated_permutations} evaluated permutations): "
f"{mean_percentage_change * 100}%"
)
if np.all(convergence_tracker >= num_consecutive_converged_runs):
# Here, the change between consecutive runs is below the minimum threshold, but to reduce the
# likelihood that this just happened by chance, we require that this happens at least for
# num_consecutive_converged_runs times in a row.
break
old_shap_proxy = new_shap_proxy
if config.show_progress_bars:
pbar.update(1)
pbar.close()
return shapley_values / num_generated_permutations
def _check_convergence(
new_shap_proxy: np.ndarray, old_shap_proxy: np.ndarray, min_percentage_change_threshold: float
) -> Tuple[np.ndarray, float]:
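    """Checks element-wise whether the relative change |1 - new / old| between two consecutive runs is below the
    given threshold. Entries that were exactly zero in the previous run are only considered converging if they remain
    unchanged. Returns the per-entry convergence flags and the mean relative change (1 if all previous values were
    zero)."""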
non_zero_indices = old_shap_proxy != 0
converging = np.array([True] * new_shap_proxy.shape[0])
percentages = abs(1 - new_shap_proxy[non_zero_indices] / old_shap_proxy[non_zero_indices])
# Check if change in percentage is below threshold
converging[non_zero_indices] = percentages < min_percentage_change_threshold
# Check for values that are exactly zero. If they don't change between two runs, we consider it as converging.
converging[~non_zero_indices] = new_shap_proxy[~non_zero_indices] == old_shap_proxy[~non_zero_indices]
if np.all(~non_zero_indices):
mean_percentage_change = 1
else:
mean_percentage_change = np.mean(percentages)
return converging, mean_percentage_change
def _create_subsets_and_weights_exact(num_players: int, high_weight: float) -> Tuple[np.ndarray, np.ndarray]:
"""Creates all subsets and the exact weights of each subset. See Section 4.1.1. in
Janzing, D., Minorics, L., & Bloebaum, P. (2020).
Feature relevance quantification in explainable AI: A causal problem.
In International Conference on Artificial Intelligence and Statistics (pp. 2907-2916). PMLR.
for more details on this.
:param num_players: Total number of players.
:param high_weight: A 'high' weight for computational purposes. This is used to resemble 'infinity', but needs to be
selected carefully to avoid numerical issues.
:return: A tuple, where the first entry is a numpy array with all subsets and the second entry is an array with the
corresponding weights to each subset.
"""
all_subsets = []
num_iterations = int(np.ceil(num_players / 2))
for i in range(num_iterations):
        # Create all (unique) subsets of size i and of size num_players - i.
all_subsets.extend(
np.array(
[np.bincount(combs, minlength=num_players) for combs in itertools.combinations(range(num_players), i)]
)
)
all_subsets.extend(
np.array(
[
np.bincount(combs, minlength=num_players)
for combs in itertools.combinations(range(num_players), num_players - i)
]
)
)
if i == num_iterations - 1 and num_players % 2 == 0:
all_subsets.extend(
np.array(
[
np.bincount(combs, minlength=num_players)
for combs in itertools.combinations(range(num_players), i + 1)
]
)
)
weights = np.zeros(len(all_subsets))
for i, subset in enumerate(all_subsets):
subset_size = np.sum(subset)
if subset_size == num_players or subset_size == 0:
# Assigning a 'high' weight, since this resembles "infinity".
weights[i] = high_weight
else:
# The weight for a subset with a specific length (see paper mentioned in the docstring for more
# information).
weights[i] = (num_players - 1) / (
scipy.special.binom(num_players, subset_size) * subset_size * (num_players - subset_size)
)
return np.array(all_subsets, dtype=np.int32), weights.astype(float)
def _create_subsets_and_weights_approximation(
num_players: int, high_weight: float, num_subset_samples: int
) -> Tuple[np.ndarray, np.ndarray]:
"""Randomly samples subsets and weights them based on the number of how often they appear.
:param num_players: Total number of players.
:param high_weight: A 'high' weight for computational purposes. This is used to resemble 'infinity', but needs to be
selected carefully to avoid numerical issues.
:param num_subset_samples: Number of subset samples.
:return: A tuple, where the first entry is a numpy array with the sampled subsets and the second entry is an array
with the corresponding weights to each subset.
"""
all_subsets = [np.zeros(num_players), np.ones(num_players)]
weights = {tuple(all_subsets[0]): high_weight, tuple(all_subsets[1]): high_weight}
probabilities_of_subset_length = np.zeros(num_players + 1)
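    # Subset sizes are drawn with probability proportional to the kernel weight
    # (num_players - 1) / (i * (num_players - i)), i.e., very small and very large subsets are sampled more often.
    # The empty and full subsets are excluded here, since they were already added above with a high weight.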
for i in range(1, num_players):
probabilities_of_subset_length[i] = (num_players - 1) / (i * (num_players - i))
probabilities_of_subset_length = probabilities_of_subset_length / np.sum(probabilities_of_subset_length)
for i in range(num_subset_samples):
subset_as_tuple = _convert_list_of_indices_to_binary_vector_as_tuple(
np.random.choice(
num_players, np.random.choice(num_players + 1, 1, p=probabilities_of_subset_length), replace=False
),
num_players,
)
if subset_as_tuple not in weights:
weights[subset_as_tuple] = 0
all_subsets.append(np.array(subset_as_tuple))
weights[subset_as_tuple] += 1
weights = np.array([weights[tuple(x)] for x in all_subsets])
return np.array(all_subsets, dtype=np.int32), weights.astype(float)
def _convert_list_of_indices_to_binary_vector_as_tuple(list_of_indices: List[int], num_players: int) -> Tuple[int, ...]:
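    """Converts player indices into a binary membership vector. For instance, indices [0, 2] with num_players=4
    become (1, 0, 1, 0)."""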
subset = np.zeros(num_players, dtype=np.int32)
subset[list_of_indices] = 1
return tuple(subset)
def _evaluate_set_function(
set_func: Callable[[np.ndarray], Union[float, np.ndarray]],
    evaluation_subsets: Union[Set[Tuple[int, ...]], List[Tuple[int, ...]]],
parallel_context: Parallel,
show_progressbar: bool = True,
) -> Dict[Tuple[int, ...], Union[float, np.ndarray]]:
    def parallel_job(input_subset: Tuple[int, ...], parallel_random_seed: int) -> Union[float, np.ndarray]:
set_random_seed(parallel_random_seed)
return set_func(np.array(input_subset))
random_seeds = np.random.randint(np.iinfo(np.int32).max, size=len(evaluation_subsets))
subset_results = parallel_context(
delayed(parallel_job)(subset_to_evaluate, int(random_seed))
for subset_to_evaluate, random_seed in tqdm(
zip(evaluation_subsets, random_seeds),
desc="Evaluate set function",
position=0,
leave=True,
disable=not config.show_progress_bars or not show_progressbar,
)
)
subset_to_result_map = {}
for (subset, result) in zip(evaluation_subsets, subset_results):
subset_to_result_map[subset] = result
return subset_to_result_map
def _estimate_full_and_empty_subset_results(
set_func: Callable[[np.ndarray], Union[float, np.ndarray]], num_players: int
) -> Tuple[Union[float, np.ndarray], Union[float, np.ndarray]]:
return set_func(np.ones(num_players, dtype=np.int32)), set_func(np.zeros(num_players, dtype=np.int32))
def _create_index_order_and_subset_tuples(permutation: List[int]) -> List[Tuple[int, ...]]:
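    """Creates the binary subset tuples for all proper prefixes of the given permutation. For instance, for the
    permutation [2, 0, 1], the returned tuples correspond to the subsets {2} and {2, 0}; the empty and full sets
    are handled separately by the callers."""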
indices = []
index_tuples = []
for var in range(len(permutation) - 1):
indices += [permutation[var]]
index_tuples.append(_convert_list_of_indices_to_binary_vector_as_tuple(indices, len(permutation)))
return index_tuples
def _estimate_shapley_values_of_permutation(
permutation: List[int],
    evaluated_subsets: Dict[Tuple[int, ...], Union[float, np.ndarray]],
full_subset_result: Union[float, np.ndarray],
empty_subset_result: Union[float, np.ndarray],
) -> np.ndarray:
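    """Computes the marginal contribution v(S u {i}) - v(S) of each player i for the given permutation, where S
    contains the players preceding i in the permutation. The results for the proper prefix subsets are looked up in
    evaluated_subsets; the empty and full subset results are passed in explicitly."""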
current_variable_set = []
shapley_values = [[]] * len(permutation)
previous_result = empty_subset_result
for n in range(len(permutation) - 1):
current_variable_set += [permutation[n]]
current_result = evaluated_subsets[
_convert_list_of_indices_to_binary_vector_as_tuple(current_variable_set, len(permutation))
]
shapley_values[permutation[n]] = current_result - previous_result
previous_result = current_result
shapley_values[permutation[-1]] = full_subset_result - previous_result
return np.array(shapley_values).T