from typing import Callable, List, Optional
import networkx as nx
import numpy as np
import scipy.stats
import pywhy_graphs.networkx as pywhy_nx
from pywhy_graphs.classes import StationaryTimeSeriesDiGraph
from pywhy_graphs.classes.timeseries import numpy_to_tsgraph, tsgraph_to_numpy
from pywhy_graphs.typing import Node
def simulate_random_er_dag(
n_nodes: int, p: float = 0.5, seed: Optional[int] = None, ensure_acyclic: bool = False
):
"""Simulate a random Erdos-Renyi graph.
Parameters
----------
n_nodes : int
The number of nodes.
p : float, optional
The probability of an edge, by default 0.5.
seed : int, optional
Random seed, by default None.
ensure_acyclic : bool
Whether or not to ensure the resulting graph is acyclic (default is False).
        If True, only edges ``(u, v)`` with ``u < v`` are kept from the simulated
        ER graph (which may contain cycles), so the integer node labels form a
        topological order and the result is guaranteed to be a DAG.
Returns
-------
graph : nx.DiGraph
The resulting graph.
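    Examples
    --------
    A minimal usage sketch; the exact edge set depends on ``seed``.
    >>> import networkx as nx
    >>> G = simulate_random_er_dag(n_nodes=5, p=0.4, seed=1234, ensure_acyclic=True)
    >>> sorted(G.nodes)
    [0, 1, 2, 3, 4]
    >>> nx.is_directed_acyclic_graph(G)
    True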
"""
# Generate graph using networkx package
G = nx.gnp_random_graph(n=n_nodes, p=p, seed=seed, directed=True)
    # Optionally convert the generated graph to a DAG by keeping only forward edges (u < v)
if ensure_acyclic:
graph = nx.DiGraph()
graph.add_nodes_from(G)
graph.add_edges_from([(u, v, {}) for (u, v) in G.edges() if u < v])
if not nx.is_directed_acyclic_graph(graph):
raise RuntimeError("The resulting random graph should be a DAG...")
else:
graph = G
return graph
def simulate_random_tsgraph(
    n_variables: int,
    max_lag: int,
    p_time_self: float = 0.5,
    p_time_vars: float = 0.5,
    p_contemporaneous: float = 0.5,
    random_state: Optional[int] = None,
):
"""Simulate a random time-series graph.
Parameters
----------
n_variables : int
The number of variables.
max_lag : int
The maximum lag.
    p_time_self : float, optional
        Probability of a lagged edge from a variable to itself (auto-dependence), by default 0.5.
    p_time_vars : float, optional
        Probability of a lagged edge between two different variables, by default 0.5.
    p_contemporaneous : float, optional
        Probability of a contemporaneous (same time point) edge between variables, by default 0.5.
random_state : int, optional
Random seed, by default None.
Returns
-------
G : StationaryTimeSeriesDiGraph
The stationary time series graph.
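    Examples
    --------
    A minimal usage sketch; which edges are present depends on ``random_state``.
    >>> G = simulate_random_tsgraph(n_variables=3, max_lag=2, random_state=12345)
    >>> sorted(G.variables)
    [0, 1, 2]
    >>> G.max_lag
    2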
"""
rng = np.random.default_rng(random_state)
# first we sample the time-series graph
node_names = list(range(n_variables))
G = StationaryTimeSeriesDiGraph(max_lag=max_lag)
G.add_variables_from(node_names)
# loop through all possible edge combinations from (x, 0) to (x', -lag)
# for lag up to max_lag
for non_lag_node in G.nodes_at(t=0):
for lag in range(max_lag + 1):
for lag_node in G.nodes_at(t=lag):
                    # same time index (both at t=0): candidate contemporaneous edge
if non_lag_node[1] == lag_node[1] and p_contemporaneous > 0:
if rng.binomial(n=1, p=p_contemporaneous, size=None) == 1:
G.add_edge(lag_node, non_lag_node)
# check that the addition of this edge does not result in a cyclic
# causal relationship
if not nx.is_directed_acyclic_graph(G.subgraph(G.nodes_at(t=0))):
G.remove_edge(lag_node, non_lag_node)
elif non_lag_node[0] == lag_node[0] and p_time_self > 0:
                        # same variable at an earlier lag: sample an auto-dependence
                        # edge with probability ``p_time_self``
if rng.binomial(n=1, p=p_time_self, size=None) == 1:
G.add_edge(lag_node, non_lag_node)
                elif p_time_vars > 0:
                    # different variables at an earlier lag: sample a cross-variable
                    # lagged edge with probability ``p_time_vars``
if rng.binomial(n=1, p=p_time_vars, size=None) == 1:
G.add_edge(lag_node, non_lag_node)
return G
def simulate_data_from_var(
var_arr: np.ndarray,
n_times: int = 1000,
n_realizations: int = 1,
var_names: Optional[List[Node]] = None,
random_state: Optional[int] = None,
):
"""Simulate data from an already set VAR process.
Parameters
----------
var_arr : ArrayLike of shape (n_variables, n_variables, max_lag + 1)
        The stationary time-series vector auto-regressive process. The 3rd dimension
        indexes the lag, and we assume the lag points are ``(t=0, ..., t=-max_lag)``.
n_times : int, optional
        Number of observations (time points) to simulate; this includes the initial
        observations used to start the autoregressive process. By default 1000.
n_realizations : int, optional
The number of independent realizations to generate the VAR process, by default 1.
    var_names : list of nodes, optional
        The variable names. If passed in, must have length ``n_variables``, and the
        output is converted to a pandas DataFrame with ``var_names`` as the columns.
        By default None.
random_state : int, optional
The random state, by default None.
Returns
-------
x : ArrayLike of shape (n_variables, n_times * n_realizations), or
pandas.DataFrame of shape (n_times * n_realizations, n_variables)
        The simulated data. If ``var_names`` is passed in, then the output is
        converted to a pandas DataFrame.
Notes
-----
The simulated ``x`` array consists of multiple "instances" of the underlying stationary
VAR process. For example, if ``n_times`` is 1000, and ``max_lag = 2``, then technically you
have 500 realizations of the time-series graph occurring over this multivariate time-series.
However, each realization is dependent on the previous realizations in this case.
    In order to start from completely independent initial conditions, one can modify the
    ``n_realizations`` parameter instead. To generate 500 independent realizations in the
    above example, one would set ``n_realizations=500`` and ``n_times=2``.
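    Examples
    --------
    A minimal sketch simulating a bivariate VAR process; the nonzero coefficient below is
    purely illustrative and the lag ordering follows the convention described above.
    >>> import numpy as np
    >>> var_arr = np.zeros((2, 2, 2))
    >>> var_arr[0, 1, 0] = 0.5  # illustrative cross-variable coefficient
    >>> x = simulate_data_from_var(var_arr, n_times=100, random_state=1234)
    >>> x.shape
    (2, 100)
    To generate two independent realizations of the same process:
    >>> simulate_data_from_var(var_arr, n_times=100, n_realizations=2, random_state=1234).shape
    (2, 200)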
"""
import pandas as pd
if var_arr.shape[0] != var_arr.shape[1]:
raise ValueError(
f"The shape of the VAR array should be "
f"(n_variables, n_variables, max_lag). Your array dimensions are {var_arr.shape}."
)
n_vars, _, max_lag = var_arr.shape
    # the 3rd dimension holds ``max_lag + 1`` lag points (t=0, ..., t=-max_lag)
    max_lag = max_lag - 1
if var_names is not None and len(var_names) != n_vars:
raise ValueError(f"The passed in node names {var_names} should have {n_vars} variables.")
rng = np.random.default_rng(random_state)
# initialize the final data array for speed
x = np.zeros((n_vars, n_realizations * n_times))
    for idx in range(n_realizations):
        # each realization occupies its own block of ``n_times`` columns
        offset = idx * n_times
        # sample the initial conditions
        x0 = rng.standard_normal(size=(n_vars, max_lag))
        x[:, offset : offset + max_lag] = x0
        # simulate forward in time
        for tdx in range(max_lag, n_times):
            # column of the current sample within this realization's block;
            # for a single realization, this is just 'tdx'
            starting_point = offset + tdx
            ygen = x[:, starting_point]
            # loop over the lags to generate the next sample as a linear
            # combination of previous lag data points
            for pdx in range(max_lag):
                ygen += np.dot(var_arr[..., pdx], x[:, starting_point - pdx - 1].T).T
# convert to a DataFrame, if node names are explicitly passed in
if var_names is not None:
x = pd.DataFrame(x.T, columns=var_names)
return x
def simulate_linear_var_process(
n_variables: int = 5,
p_time_self: float = 0.5,
p_time_vars: float = 0.5,
p_contemporaneous: float = 0.5,
max_lag: int = 1,
n_times: int = 1000,
n_realizations: int = 1,
weight_dist: Callable = scipy.stats.norm,
random_state: Optional[int] = None,
):
"""Simulate a linear VAR process of a "stationary" causal graph.
Parameters
----------
n_variables : int, optional
        The number of variables to include in the final simulation, by default 5.
p_time_self : float, optional
The probability for an edge across time for the same variable, by default 0.5.
p_time_vars : float, optional
The probability for an edge across time for different variables, by default 0.5.
p_contemporaneous : float, optional
The probability for a contemporaneous edge among different variables, by default 0.5.
Can be set to 0 to specify that the underlying causal process has no instantaneous
effects.
max_lag : int, optional
The maximum lag allowed in the resulting process, by default 1.
n_times : int, optional
The number of time points to generate per realization, by default 1000. See
`simulate_data_from_var` for more information.
n_realizations : int, optional
The number of independent realizations, by default 1. See `simulate_data_from_var` for
more information.
    weight_dist : Callable, optional
        The distribution from which edge weights are sampled, by default the standard
        normal distribution, ``scipy.stats.norm``.
random_state : int, optional
The random state, by default None.
Returns
-------
    data : pandas.DataFrame of shape (n_times * n_realizations, n_variables)
        The simulated data, with one column per variable.
    G : StationaryTimeSeriesDiGraph
        The resulting time-series causal graph.
Notes
-----
    In stationary time-series graphs without latent confounders, there is never a worry
    about acyclicity among "lagged nodes" in the graph. However, if we model the situation
    where there are instantaneous, or contemporaneous, effects, then those edges may form
    a cycle when simulating the graph. Therefore, if ``p_contemporaneous > 0``, an
    additional acyclicity check is performed on the subgraph of contemporaneous (t=0) edges.
    To simulate a VAR process with latent confounders (i.e. unobserved variable time-series),
    one can simulate the full VAR process and then drop the simulated time-series data of
    the latent variables.
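    Examples
    --------
    A minimal sketch; setting ``p_contemporaneous=0`` disables instantaneous effects.
    >>> data, G = simulate_linear_var_process(
    ...     n_variables=3, max_lag=1, p_contemporaneous=0.0, n_times=100, random_state=1234
    ... )
    >>> data.shape
    (100, 3)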
"""
# first we sample the time-series graph
G = simulate_random_tsgraph(
n_variables,
max_lag=max_lag,
p_time_self=p_time_self,
p_time_vars=p_time_vars,
p_contemporaneous=p_contemporaneous,
random_state=random_state,
)
# then we convert this into an array of 1's and 0's
# we maintain a lagged-order of the nodes, so that way
# reshaping into a 3D array works properly
var_order = list(G.variables)
ts_graph_arr = tsgraph_to_numpy(G, var_order=var_order)
    # find the non-zero entries (i.e. the edges) of the time-series graph array
nnz_index = np.nonzero(ts_graph_arr)
nnz = np.count_nonzero(ts_graph_arr)
# we sample weights from our weight distribution to fill
# in every non-zero index of our VAR array
weights = weight_dist.rvs(size=nnz) # type: ignore
ts_graph_arr[nnz_index] = weights
    # the resulting VAR array defines the linear process from which we simulate
    # data, starting from random Gaussian initial conditions
x = simulate_data_from_var(
var_arr=ts_graph_arr,
n_times=n_times,
n_realizations=n_realizations,
var_names=var_order,
random_state=random_state,
)
return x, G
def simulate_var_process_from_summary_graph(
G: pywhy_nx.MixedEdgeGraph, max_lag=1, n_times=1000, random_state: Optional[int] = None
):
"""Simulate a VAR(max_lag) process starting from a summary graph.
Parameters
----------
    G : pywhy_nx.MixedEdgeGraph
A time-series summary graph.
max_lag : int, optional
The maximum time-lag to consider, by default 1, which corresponds
to a VAR(1) process.
n_times : int
        Number of observations (time points) to simulate; this includes the initial
        observations used to start the autoregressive process. By default 1000.
random_state : int, optional
The random seed, by default None.
Returns
-------
    x_df : pandas.DataFrame of shape (n_times, n_nodes)
        The sampled dataset.
    G : StationaryTimeSeriesDiGraph
        The stationary time-series graph induced by the summary graph.
Notes
-----
Right now, it is assumed that the summary graph is just a DAG.
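    Examples
    --------
    A minimal sketch, assuming the summary graph is built through the
    ``MixedEdgeGraph.add_edge_type`` API with a single directed edge.
    >>> import networkx as nx
    >>> summary = pywhy_nx.MixedEdgeGraph()
    >>> summary.add_edge_type(nx.DiGraph([("x", "y")]), "directed")
    >>> x_df, ts_graph = simulate_var_process_from_summary_graph(
    ...     summary, max_lag=1, n_times=50, random_state=1234
    ... )
    >>> x_df.shape
    (50, 2)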
"""
import pandas as pd
rng = np.random.default_rng(random_state)
n_nodes = G.number_of_nodes()
var_arr = np.zeros((n_nodes, n_nodes, max_lag + 1))
    # simulate weights for the edges of the summary graph's adjacency matrix
summary_arr = np.zeros((n_nodes, n_nodes))
nodelist = list(G.nodes)
for _, graph in G.get_graphs().items():
# get the graph array
graph_arr = nx.to_numpy_array(graph, nodelist=nodelist, weight="weight")
non_zero_index = np.nonzero(graph_arr)
weights = rng.normal(size=(len(non_zero_index[0]),))
# set the weights in the summary graph
summary_arr[non_zero_index] = weights
    # Now replicate the summary weights across time-points: every lag slice of the
    # VAR array is set to the summary adjacency, i.e. every summary edge is assumed
    # to be present at each time lag.
for i in range(max_lag + 1):
var_arr[..., i] = summary_arr
# convert VAR array to stationary time-series graph
G = numpy_to_tsgraph(var_arr, var_order=nodelist, create_using=StationaryTimeSeriesDiGraph)
# simulate initial conditions data
x0 = rng.standard_normal(size=(n_nodes, max_lag))
x = np.zeros((n_nodes, n_times))
x[:, :max_lag] = x0
# simulate forward in time
for tdx in range(max_lag, n_times):
ygen = x[:, tdx]
for pdx in range(max_lag):
ygen += np.dot(var_arr[..., pdx], x[:, tdx - pdx - 1].T).T
# convert to a DataFrame
x_df = pd.DataFrame(x.T, columns=nodelist)
return x_df, G