Source code for pywhy_graphs.functional.utils

import itertools
from typing import Optional

import networkx as nx
import numpy as np


def to_pgmpy_bayesian_network(G):
    """Convert a discrete graph to a pgmpy Bayesian network.

    Parameters
    ----------
    G : nx.DiGraph
        The discrete graph.

    Returns
    -------
    model : pgmpy.models.BayesianModel
        The Bayesian network.

    Notes
    -----
    This function allows us to use the pgmpy library for inference and sampling.
    For example, one can forward sample from the Bayesian network as follows:

    >>> from pgmpy.sampling import BayesianModelSampling
    >>> inference = BayesianModelSampling(model)
    >>> df = inference.forward_sample(size=5000)
    """
    if G.graph.get("functional") != "discrete":
        raise ValueError("Model is not a functional model.")

    from pgmpy.models import BayesianNetwork

    model = BayesianNetwork(G)
    for node in G.nodes:
        model.add_cpds(G.nodes[node]["cpd"])
    return model


def pre_compute_reduce_maps(G, variable):
    """
    Get probability array-maps for a node as function of conditional dependencies

    Parameters
    ----------
    G : nx.DiGraph
        The discrete graph.
    variable: Bayesian Model Node
        node of the Bayesian network

    Returns
    -------
    state_index : dictionary
        With probability array-index for node as function of conditional dependency values.
    index_to_weight : dictionary
        With mapping of probability array-index to probability array.
    """
    variable_cpd = get_cpd(G, variable)
    variable_evid = variable_cpd.variables[:0:-1]

    # compute all possible state combinations for the parent variables
    state_combinations = [
        tuple(sc)
        for sc in itertools.product(*[range(get_cardinality(G, var)) for var in variable_evid])
    ]

    # compute weights for all possible state combinations
    weights_list = np.array(
        [
            variable_cpd.reduce(
                list(zip(variable_evid, sc)), inplace=False, show_warnings=False
            ).values
            for sc in state_combinations
        ]
    )

    unique_weights, weights_indices = np.unique(weights_list, axis=0, return_inverse=True)

    # convert weights to index; make mapping of state to index
    state_to_index = dict(zip(state_combinations, weights_indices))

    # make mapping of index to weights
    index_to_weight = dict(enumerate(unique_weights))

    # return mappings of state to index, and index to weight
    return state_to_index, index_to_weight


def get_cpd(G, node):
    """Get the CPD associated with a node.

    Parameters
    ----------
    node : Node
        The node for which the CPD is to be returned.

    Returns
    -------
    cpd : TabularCPD
        The CPD associated with the node.
    """
    # Check if the node is present in the graph
    if node not in G:
        raise ValueError(f"Node {node} not present in the graph.")

    # Check if the graph is functional
    if not G.graph.get("functional", False) == "discrete":
        raise ValueError("Model is not a functional model.")

    cpd = G.nodes[node].get("cpd", None)
    if cpd is None:
        raise RuntimeError(f"No CPD associated with {node}.")
    return cpd


def get_cardinality(G, node):
    from pgmpy.factors.discrete import TabularCPD

    cpd: TabularCPD = get_cpd(G, node)
    return cpd.cardinality[0]


def check_discrete_model(G):
    """
    Check the model for various errors. This method checks for the following
    errors.

    * Checks if the sum of the probabilities for each state is equal to 1 (tol=0.01).
    * Checks if the CPDs associated with nodes are consistent with their parents.

    Parameters
    ----------
    G : DiGraph
        The graph to be checked.

    Returns
    -------
    check : boolean
        True if all the checks pass otherwise should throw an error.
    """
    from pgmpy.factors.continuous import ContinuousFactor
    from pgmpy.factors.discrete import TabularCPD

    if not G.graph.get("functional", False) == "discrete":
        raise ValueError("Model is not a functional model.")

    for node in G.nodes:
        cpd = get_cpd(G, node=node)

        # Check if a CPD is associated with every node.
        if cpd is None:
            raise ValueError(f"No CPD associated with {node}")

        # Check if the CPD is an instance of either TabularCPD or ContinuousFactor.
        elif isinstance(cpd, (TabularCPD, ContinuousFactor)):
            evidence = cpd.get_evidence()
            parents = G.predecessors(node)

            # Check if the evidence set of the CPD is same as its parents.
            if set(evidence) != set(parents):
                raise ValueError(
                    f"CPD associated with {node} doesn't have proper parents associated with it."
                )

            # Check if the values of the CPD sum to 1.
            if not cpd.is_valid_cpd():
                raise ValueError(
                    f"Sum or integral of conditional probabilities for node {node} "
                    f"is not equal to 1."
                )

            if len(set(cpd.variables) - set(cpd.state_names.keys())) > 0:
                raise ValueError(
                    f"CPD for {node} doesn't have state names defined for all the variables."
                )

        for index, node in enumerate(cpd.variables[1:]):
            parent_cpd = get_cpd(G, node)
            # Check if the evidence cardinality specified is same as parent's cardinality
            if parent_cpd.cardinality[0] != cpd.cardinality[1 + index]:
                raise ValueError(f"The cardinality of {node} doesn't match in it's child nodes.")
            # Check if the state_names are the same in parent and child CPDs.
            if parent_cpd.state_names[node] != cpd.state_names[node]:
                raise ValueError(f"The state names of {node} doesn't match in it's child nodes.")

    return True


[docs] def set_node_attributes_with_G(G1, G2, node): """Set node attributes in G1 using G2. Parameters ---------- G1 : Graph The target graph that is modified. G2 : Graph The source graph that is used to set the node attributes. node : Node The specific node to set. """ # get the node attributes of node in G2 src_node_attrs = G2.nodes(data=True)[node] # get the target node attrs target_node_attrs = G1.nodes(data=True)[node] # update the node attributes with respect target_node_attrs.update(src_node_attrs) nx.set_node_attributes(G1, {node: target_node_attrs}) return G1
def _preprocess_parameter_inputs( node_mean_lims, node_std_lims, edge_functions, edge_weight_lims, multi_domain: bool = False, n_domains: Optional[int] = None, ): """Helper function to preprocess common parameter inputs for sampling functional graphs. Nodes' exogenous variables are sampled from a Gaussian distribution. Edges are sampled, such that an additive linear model is assumed. Note the edge functions may be nonlinear, but how they are combined for each node as a function of its parents is linear. """ if node_mean_lims is None: node_mean_lims = [0, 0] if node_std_lims is None: node_std_lims = [1, 1] if edge_functions is None: edge_functions = [lambda x: x] if edge_weight_lims is None: edge_weight_lims = [1, 1] if not multi_domain: for param in [node_mean_lims, node_std_lims, edge_weight_lims]: if len(param) != 2: raise ValueError(f"{param} must be a list of length 2.") elif multi_domain: for param in [node_mean_lims, node_std_lims, edge_weight_lims]: if len(param) != n_domains: raise ValueError(f"{param} must be a list of length 2 or {n_domains} domains.") # if the parameters are not a list of length n_domains, then they must be a # list of length n_domains if len(node_mean_lims) != n_domains: node_mean_lims = [node_mean_lims] * n_domains # type: ignore if len(node_std_lims) != n_domains: node_std_lims = [node_std_lims] * n_domains # type: ignore if len(edge_weight_lims) != n_domains: edge_weight_lims = [edge_weight_lims] * n_domains # type: ignore return node_mean_lims, node_std_lims, edge_functions, edge_weight_lims def _preprocess_md_parameter_inputs( node_mean_lims, node_std_lims, edge_functions, edge_weight_lims, n_domains: int ): """Helper function to preprocess common parameter inputs for sampling functional graphs. Nodes' exogenous variables are sampled from a Gaussian distribution. Edges are sampled, such that an additive linear model is assumed. Note the edge functions may be nonlinear, but how they are combined for each node as a function of its parents is linear. Parameters are encoded as a 2D array with rows as the domains and columns as the lower and upper bound. Parameters ---------- node_mean_lims : ArrayLike of shape (n_domains, 2) A 2D array with rows as the domains and columns as the lower and upper bound of the node's exogenous variable mean. If None, then initialized to [0, 1] for every domain. node_std_lims : ArrayLike of shape (n_domains, 2) A 2D array with rows as the domains and columns as the lower and upper initialized of the node's exogenous variable standard deviation. If None, then initialized to [0.1, 1.0] for every domain. edge_functions : ArrayLike of shape (n_functions,) A set of different lambda functions. edge_weight_lims : ArrayLike of shape (n_domains, 2) A 2D array with rows as the domains and columns as the lower and upper bound of the edge weights that are used to combine the parents' values linearly. If None, then initialized to [-1, 1] for every domain. n_domains : int Number of domains. """ # initialize to default values if node_mean_lims is None: node_mean_lims = np.zeros((n_domains, 2)) node_mean_lims[:, 1] = 1 if node_std_lims is None: node_std_lims = np.zeros((n_domains, 2)) node_std_lims[:, 1] = 1 node_std_lims[:, 0] = 0.1 if edge_functions is None: edge_functions = [lambda x: x] if edge_weight_lims is None: edge_weight_lims = np.zeros((n_domains, 2)) edge_weight_lims[:, 0] = -1 edge_weight_lims[:, 1] = 1 for param in [node_mean_lims, node_std_lims, edge_weight_lims]: if len(param) != n_domains: raise ValueError(f"{param} must be a list of length 2 or {n_domains} domains.") return node_mean_lims, node_std_lims, edge_functions, edge_weight_lims