Source code for dowhy.causal_refuters.evalue_sensitivity_analyzer

import copy
import logging

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm

from dowhy.causal_estimator import CausalEstimate, CausalEstimator
from dowhy.causal_estimators.econml import Econml
from dowhy.causal_estimators.generalized_linear_model_estimator import GeneralizedLinearModelEstimator
from dowhy.causal_estimators.linear_regression_estimator import LinearRegressionEstimator
from dowhy.causal_identifier import IdentifiedEstimand


[docs]class EValueSensitivityAnalyzer: """ This class computes Ding & VanderWeele's E-value for unmeasured confounding. The E-value is the minimum strength of association on the risk ratio scale that an unmeasured confounder would need to have with both the treatment and the outcome, conditional on the measured covariates, to fully explain away a specific treatment-outcome association. It benchmarks the E-value against measured confounders using McGowan and Greevy Jr.'s Observed Covariate E-value. This approach drops measured confounders and re-fits the estimator, measuring how much the limiting bound of the confidence interval changes on the E-value scale. This benchmarks hypothetical unmeasured confounding against each of the measured confounders. See: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4820664/, https://dash.harvard.edu/bitstream/handle/1/36874927/EValue_FinalSubmission.pdf, and https://arxiv.org/pdf/2011.07030.pdf. The implementation is based on the R packages https://github.com/cran/EValue and https://github.com/LucyMcGowan/tipr. :param estimate: CausalEstimate :param estimand: IdentifiedEstimand :param data: pd.DataFrame :param outcome_name: Outcome variable name :param no_effect_baseline: A number to which to shift the observed estimate to. Defaults to 1 for ratio measures (RR, OR, HR) and 0 for additive measures (OLS, MD). (Default = None) """ def __init__( self, estimate: CausalEstimate, estimand: IdentifiedEstimand, data: pd.DataFrame, treatment_name: str, outcome_name: str, no_effect_baseline=None, ): self.estimate = estimate self.estimand = estimand self.data = data self.treatment_name = treatment_name self.outcome_name = outcome_name self.no_effect_baseline = no_effect_baseline if self.no_effect_baseline is None: if isinstance(self.estimate.estimator, LinearRegressionEstimator): self.no_effect_baseline = 0 elif isinstance(self.estimate.estimator, GeneralizedLinearModelEstimator): self.no_effect_baseline = 1 self.logger = logging.getLogger(__name__) self.stats = None self.benchmarking_results = None self.sd_outcome = np.std(self.data[outcome_name])
[docs] def check_sensitivity(self, plot=True): """ Computes E-value for point estimate and confidence limits. Benchmarks E-values against measured confounders using Observed Covariate E-values. Plots E-values and Observed Covariate E-values. :param plot: plots E-value for point estimate and confidence limit. (Default = True) """ if isinstance(self.estimate.estimator, LinearRegressionEstimator): coef_est = self.estimate.value coef_se = self.estimate.get_standard_error()[0] elif isinstance(self.estimate.estimator, GeneralizedLinearModelEstimator): coef_est = self.estimate.estimator.model.params[1:2].to_numpy()[0] coef_se = self.estimate.estimator.model.bse[1:2].to_numpy()[0] self.stats = self.get_evalue(coef_est, coef_se) eval_lo = self.stats["evalue_lower_ci"] eval_hi = self.stats["evalue_upper_ci"] if (eval_lo is None and eval_hi == 1) or (eval_hi is None and eval_lo == 1): self.logger.info( "Not benchmarking with Observed Covariate E-values. Confidence interval is already tipped." ) else: self.benchmark() if plot: self.plot()
[docs] def get_evalue(self, coef_est, coef_se): """ Computes E-value for point estimate and confidence limits. The estimate and confidence limits are converted to the risk ratio scale before the E-value is calculated. :param coef_est: coefficient estimate :param coef_se: coefficient standard error """ if isinstance(self.estimate.estimator, LinearRegressionEstimator): return self._evalue_OLS(coef_est, coef_se, self.sd_outcome, self.no_effect_baseline) elif isinstance(self.estimate.estimator, GeneralizedLinearModelEstimator): est = np.exp(coef_est) lo = np.exp(coef_est - 1.96 * coef_se) hi = np.exp(coef_est + 1.96 * coef_se) family = self.estimate.estimator.family link = self.estimate.estimator.family.link if isinstance(family, sm.families.Binomial) and isinstance(link, sm.families.links.Logit): rare = self.data[self.outcome_name].mean() < 0.15 return self._evalue_OR(est, lo, hi, rare, self.no_effect_baseline) elif isinstance( family, (sm.families.Poisson, sm.families.NegativeBinomial, sm.families.Gamma), ) and isinstance(link, sm.families.links.Log): return self._evalue_RR(est, lo, hi, self.no_effect_baseline) else: raise NotImplementedError( "Currently, only four GLM families are supported: " "1) sm.families.Binomial(link=sm.families.links.logit), " "2) sm.families.Poisson(link=sm.families.links.log), " "3) sm.families.NegativeBinomial(link=sm.families.links.log), " "4) sm.families.Gamma(link=sm.families.links.log), " )
[docs] def plot( self, num_points_per_contour=200, plot_size=(6.4, 4.8), contour_colors=["blue", "red"], benchmarking_color="green", xy_limit=None, ): """ Plots contours showing the combinations of treatment-confounder and confounder-outcome risk ratios that would tip the point estimate and confidence limit. The X-axis shows the treatment-confounder risk ratio and the Y-axis shows the confounder-outcome risk ratio. :param num_points_per_contour: number of points to calculate and plot for each contour (Default = 200) :param plot_size: size of the plot (Default = (6.4,4.8)) :param contour_colors: colors for point estimate and confidence limit contour (Default = ["blue", "red"]) :param benchmarking_color: color for observed covariate E-values. (Default = "green") :param xy_limit: plot's maximum x and y value. Default is 2 x E-value. (Default = None) """ if self.stats is None: raise ValueError("Must call ``check_sensitivity'' before calling ``plot''") fig, ax = plt.subplots(1, 1, figsize=plot_size) eval_est = self.stats["evalue_estimate"] rr = self.stats["converted_estimate"] if rr < 1: rr = 1 / rr if xy_limit is None: xy_limit = eval_est * 2 self._plot_contour(ax, rr, eval_est, num_points_per_contour, contour_colors[0], xy_limit) eval_lo = self.stats["evalue_lower_ci"] eval_hi = self.stats["evalue_upper_ci"] if (eval_lo is None and eval_hi == 1) or (eval_hi is None and eval_lo == 1): self.logger.info("Plotting contour for point estimate only. Confidence interval is already tipped.") else: if eval_lo is None: rr_ci = self.stats["converted_upper_ci"] eval_ci = eval_hi else: rr_ci = self.stats["converted_lower_ci"] eval_ci = eval_lo if rr_ci < 1: rr_ci = 1 / rr_ci self._plot_contour( ax, rr_ci, eval_ci, num_points_per_contour, contour_colors[1], xy_limit, point_est=False, ) ax.scatter( self.benchmarking_results["observed_covariate_e_value"], self.benchmarking_results["observed_covariate_e_value"], label="Observed Covariate E-values", color=benchmarking_color, ) example_var = self.benchmarking_results.iloc[0] obs_evalue = example_var["observed_covariate_e_value"] ax.text(obs_evalue, obs_evalue, example_var.name) ax.set(xlabel="$RR_{treatment-confounder}$", ylabel="$RR_{confounder-outcome}$") plt.ylim(1, xy_limit) plt.xlim(1, xy_limit) plt.legend() plt.show()
def _plot_contour(self, ax, rr, evalue, n_pts, color, xy_limit, point_est=True): """ Plots a single contour line :param ax: matplotlib axis :param rr: observed point estimate/confidence limit on the risk ratio scale :param evalue: E-value corresponding to observed point estimate/confidence limit :param n_pts: number of points to calculate and plot for each contour :param color: color for contour :param xy_limit: plot's maximum x and y value :param point_est: whether this is the point estimate (Default = True) """ step = (xy_limit - rr) / n_pts x_est = np.linspace(rr + step, xy_limit, num=n_pts) y_est = rr * (rr - 1) / (x_est - rr) + rr est_string = "point estimate" if point_est else "confidence interval" ax.scatter( evalue, evalue, label=f"E-value for {est_string}: {evalue.round(2)}", color=color, ) ax.fill_between( x_est, y_est, xy_limit, color=color, alpha=0.2, label=f"Tips {est_string}", ) ax.plot(x_est, y_est, color=color)
[docs] def benchmark(self): """ Benchmarks E-values against the measured confounders using McGowan and Greevy Jr.'s Observed Covariate E-value. This approach drops measured confounders and re-fits the estimator, measuring how much the limiting bound of the confidence interval changes on the E-value scale. This benchmarks hypothetical unmeasured confounding against each of the measured confounders. See: https://arxiv.org/pdf/2011.07030.pdf and https://github.com/LucyMcGowan/tipr """ new_ests = [] new_lo = [] new_hi = [] observed_covariate_e_values = [] backdoor_vars = self.estimand.get_backdoor_variables() for drop_var in backdoor_vars: # new estimator new_backdoor_vars = [var for var in backdoor_vars if var != drop_var] new_estimand = copy.deepcopy(self.estimand) new_estimand.set_backdoor_variables(new_backdoor_vars) new_estimator = self.estimate.estimator.get_new_estimator_object(new_estimand) new_estimator.fit( self.data, new_estimand.treatment_variable, new_estimand.outcome_variable, self.estimate.estimator._effect_modifier_names, **new_estimator._econml_fit_params if isinstance(new_estimator, Econml) else {}, ) # new effect estimate new_effect = new_estimator.estimate_effect( control_value=self.estimate.control_value, treatment_value=self.estimate.treatment_value, target_units=self.estimate.estimator._target_units, ) if isinstance(self.estimate.estimator, LinearRegressionEstimator): coef_est = new_effect.value coef_se = new_effect.get_standard_error()[0] elif isinstance(self.estimate.estimator, GeneralizedLinearModelEstimator): coef_est = new_effect.estimator.model.params[1:2].to_numpy()[0] coef_se = new_effect.estimator.model.bse[1:2].to_numpy()[0] new_stats = self.get_evalue(coef_est, coef_se) # observed covariate E-value if self.stats["evalue_lower_ci"] is None: ci = self.stats["converted_upper_ci"] new_ci = new_stats["converted_upper_ci"] else: ci = self.stats["converted_lower_ci"] new_ci = new_stats["converted_lower_ci"] covariate_e_value = self._observed_covariate_e_value(ci, new_ci) new_ests.append(new_stats["converted_estimate"]) new_lo.append(new_stats["converted_lower_ci"]) new_hi.append(new_stats["converted_upper_ci"]) observed_covariate_e_values.append(covariate_e_value) self.benchmarking_results = pd.DataFrame( { "dropped_covariate": backdoor_vars, "converted_est": new_ests, "converted_lower_ci": new_lo, "converted_upper_ci": new_hi, "observed_covariate_e_value": observed_covariate_e_values, } ).sort_values(by="observed_covariate_e_value", ascending=False) self.benchmarking_results = self.benchmarking_results.set_index("dropped_covariate")
def _observed_covariate_e_value(self, ci, new_ci): """ Computes Observed Covariate E-value given effect estimate from new model without a specific measured confounder. Based on: https://github.com/LucyMcGowan/tipr/blob/master/R/observed_covariate_e_value.R :param ci: limiting confidence bound from original model on risk ratio scale :param new_ci: limiting confidence bound from new model on risk ratio scale """ if ci < 1: ci = 1 / ci new_ci = 1 / new_ci if ci < new_ci: ratio = new_ci / ci else: ratio = ci / new_ci return ratio + np.sqrt(ratio * (ratio - 1)) def _evalue_OLS(self, est, se, sd, no_effect_baseline=0): """ Computes E-value from OLS coefficient. :param est: coefficient from OLS :param se: standard error of point estimate :param sd: residual standard deviation :param no_effect_baseline: no_effect_baseline standardized difference to which to shift the observed estimate. (Default = 0) """ if se < 0: raise ValueError("Standard error cannot be negative") delta = abs(self.estimate.estimator._treatment_value - self.estimate.estimator._control_value) est = self._to_smd(est, sd, delta=delta) se = self._to_smd( se, sd, delta=1 ) # already multiplied by delta in LinearRegressionEstimator._estimate_std_error() return self._evalue_MD(est=est, se=se, no_effect_baseline=no_effect_baseline) def _evalue_MD(self, est, se=None, no_effect_baseline=0): """ Computes E-value for standardized difference and its confidence limits. :param est: point estimate as standardized difference (i.e., Cohen's d) :param se: standard error of the point estimate :param no_effect_baseline: no_effect_baseline standardized difference to which to shift the observed estimate. (Default = 0) """ if se < 0: raise ValueError("Standard error cannot be negative") if se is None: lo = None hi = None else: # see Table 2 and p.37 in https://dash.harvard.edu/bitstream/handle/1/36874927/EValue_FinalSubmission.pdf lo = np.exp(0.91 * est - 1.78 * se) hi = np.exp(0.91 * est + 1.78 * se) est = self._md_to_rr(est) no_effect_baseline = self._md_to_rr(no_effect_baseline) return self._evalue_RR(est=est, lo=lo, hi=hi, no_effect_baseline=no_effect_baseline) def _evalue_OR(self, est, lo=None, hi=None, rare=None, no_effect_baseline=1): """ Computes E-value for an odds ratio and its confidence limits. :param est: point estimate :param lo: lower limit of confidence interval :param hi: upper limit of confidence interval :param rare: if outcome is rare (<15%) :param no_effect_baseline: the no_effect_baseline OR to which to shift the observed estimate. (Default = 1) """ if est < 0: raise ValueError("Odds Ratio cannot be negative") est = self._or_to_rr(est, rare) if lo is not None: lo = self._or_to_rr(lo, rare) if hi is not None: hi = self._or_to_rr(hi, rare) no_effect_baseline = self._or_to_rr(no_effect_baseline, rare) return self._evalue_RR(est=est, lo=lo, hi=hi, no_effect_baseline=no_effect_baseline) def _evalue_RR(self, est, lo=None, hi=None, no_effect_baseline=1): """ Computes E-value for a risk ratio or rate ratio and its confidence limits. :param est: point estimate :param lo: lower limit of confidence interval :param hi: upper limit of confidence interval :param no_effect_baseline: the no_effect_baseline RR to which to shift the observed estimate. (Default = 1) """ if est < 0: raise ValueError("Risk/Rate Ratio cannot be negative") if no_effect_baseline < 0: raise ValueError("no_effect_baseline value is impossible") if no_effect_baseline != 1: self.logger.info( 'You are calculating a "non-null" E-value, i.e., an E-value for the minimum amount of unmeasured ' "confounding needed to move the estimate and confidence interval to your specified no_effect_baseline value " "rather than to the null value." ) if lo is not None and hi is not None: if lo > hi: raise ValueError("Lower confidence limit should be less than upper confidence limit") if lo is not None and est < lo: raise ValueError("Point estimate should be inside confidence interval") if hi is not None and est > hi: raise ValueError("Point estimate should be inside confidence interval") e_est = self._threshold(est, no_effect_baseline=no_effect_baseline) e_lo = self._threshold(lo, no_effect_baseline=no_effect_baseline) e_hi = self._threshold(hi, no_effect_baseline=no_effect_baseline) # if CI crosses null, set its E-value to 1 null_CI = False if est > no_effect_baseline and lo is not None: null_CI = lo < no_effect_baseline if est < no_effect_baseline and hi is not None: null_CI = hi > no_effect_baseline if null_CI: e_lo = np.float64(1) e_hi = np.float64(1) # only report E-value for CI limit closer to null if lo is not None or hi is not None: if est > no_effect_baseline: e_hi = None else: e_lo = None return { "converted_estimate": est, "converted_lower_ci": lo, "converted_upper_ci": hi, "evalue_estimate": e_est, "evalue_lower_ci": e_lo, "evalue_upper_ci": e_hi, } def _threshold(self, x, no_effect_baseline=1): """ Computes E-value for single value of risk ratio. :param x: risk ratio :param no_effect_baseline: the no_effect_baseline RR to which to shift the observed estimate """ if x is None: return None if x <= 1: x = 1 / x no_effect_baseline = 1 / no_effect_baseline if no_effect_baseline <= x: return (x + np.sqrt(x * (x - no_effect_baseline))) / no_effect_baseline else: ratio = no_effect_baseline / x return ratio + np.sqrt(ratio * (ratio - 1)) def _to_smd(self, est, sd, delta=1): """ Converts estimate to standardized mean difference. :param est: estimate :param sd: residual standard deviation or standard deviation of outcome :param delta: contrast of interest in the treatment/exposure """ return est * delta / sd def _md_to_rr(self, est): """ Converts standardized mean difference to approximate risk ratio. :param est: estimate """ # see Table 2 and p.37 in https://dash.harvard.edu/bitstream/handle/1/36874927/EValue_FinalSubmission.pdf return np.exp(0.91 * est) def _hr_to_rr(self, est, rare): """ Converts hazard ratio to approximate risk ratio. :param est: estimate :param rare: if outcome is rare (<15%) """ if rare is None: raise ValueError('Must specify whether the rare outcome assumption can be made. Use argument "rare" =') if rare: return est else: return (1 - 0.5 ** np.sqrt(est)) / ((1 - 0.5 ** np.sqrt(1 / est))) def _or_to_rr(self, est, rare): """ Converts odds ratio to approximate risk ratio. :param est: estimate :param rare: if outcome is rare (<15%) """ if rare: return est else: return np.sqrt(est) def __str__(self): s = "Sensitivity Analysis to Unobserved Confounding using the E-value\n\n" s += f"Unadjusted Estimates of Treatment: {self.treatment_name}\n" s += f"Estimate (converted to risk ratio scale): {self.stats['converted_estimate']}\n" s += f"Lower 95% CI (converted to risk ratio scale): {self.stats['converted_lower_ci']}\n" s += f"Upper 95% CI (converted to risk ratio scale): {self.stats['converted_upper_ci']}\n\n" s += "Sensitivity Statistics: \n" s += f"E-value for point estimate: {self.stats['evalue_estimate']}\n" s += f"E-value for lower 95% CI: {self.stats['evalue_lower_ci']}\n" s += f"E-value for upper 95% CI: {self.stats['evalue_upper_ci']}\n" largest_obs_evalue = self.benchmarking_results["observed_covariate_e_value"].iloc[0] largest_obs_cov = self.benchmarking_results.index[0] s += f"Largest Observed Covariate E-value: {largest_obs_evalue} ({largest_obs_cov})\n\n" s += "Interpretation of results:\n" if self.stats["evalue_lower_ci"] is None: ci = self.stats["evalue_upper_ci"] direction = "decrease" else: ci = self.stats["evalue_lower_ci"] direction = "increase" s += ( f"Unmeasured confounder(s) would have to be associated with a {self.stats['evalue_estimate'].round(2)}-fold {direction} " f"in the risk of {self.outcome_name}, and must be {self.stats['evalue_estimate'].round(2)} times more prevalent in " f"{self.treatment_name}, to explain away the observed point estimate.\n" ) s += ( f"Unmeasured confounder(s) would have to be associated with a {ci.round(2)}-fold {direction} " f"in the risk of {self.outcome_name}, and must be {ci.round(2)} times more prevalent in " f"{self.treatment_name}, to explain away the observed confidence interval." ) return s