Source code for cana.boolean_network

# -*- coding: utf-8 -*-
"""
Boolean Network
================

Main class for Boolean network objects.

"""
#   Copyright (C) 2021 by
#   Rion Brattig Correia <rionbr@gmail.com>
#   Alex Gates <ajgates@indiana.edu>
#   Thomas Parmer <tjparmer@indiana.edu>
#   All rights reserved.
#   MIT license.
from collections import defaultdict
try:
    import cStringIO.StringIO as StringIO
except ImportError:
    from io import StringIO
import numpy as np
import networkx as nx
import random
import itertools
from cana.boolean_node import BooleanNode
import cana.bns as bns
from cana.control import fvs, mds, sc
from cana.utils import *
import warnings
import re
import copy


[docs]class BooleanNetwork: """ """ def __init__(self, name='', Nnodes=0, logic=None, sg=None, stg=None, stg_r=None, _ef=None, attractors=None, constants={}, Nconstants=0, keep_constants=False, bin2num=binstate_to_statenum, num2bin=statenum_to_binstate, verbose=False, *args, **kwargs): self.name = name # Name of the Network self.Nnodes = Nnodes # Number of Nodes self.logic = logic # A dict that contains the network logic {<id>:{'name':<string>,'in':<list-input-node-id>,'out':<list-output-transitions>},..} self._sg = sg # Structure-Graph (SG) self._stg = stg # State-Transition-Graph (STG) self._stg_r = stg_r # State-Transition-Graph Reachability dict (STG-R) self._eg = None # Effective Graph, computed from the effective connectivity self._attractors = attractors # Network Attractors # self.keep_constants = keep_constants # Keep/Include constants in some of the computations self.constants = constants # A dict that contains of constant variables in the network self.Nconstants = len(constants) # Number of constant variables # self.Nstates = 2**Nnodes # Number of possible states in the network 2^N # self.verbose = verbose # Intanciate BooleanNodes self.name2int = {logic[i]['name']: i for i in range(Nnodes)} self.Nself_loops = sum([self.name2int[logic[i]['name']] in logic[i]['in'] for i in range(Nnodes)]) self.nodes = list() for i in range(Nnodes): name = logic[i]['name'] k = len(logic[i]['in']) inputs = [self.name2int[logic[j]['name']] for j in logic[i]['in']] outputs = logic[i]['out'] node = BooleanNode(id=i, name=name, k=k, inputs=inputs, outputs=outputs, network=self) self.nodes.append(node) self.input_nodes = [i for i in range(Nnodes) if (self.nodes[i].constant or ((self.nodes[i].k == 1) and (i in self.nodes[i].inputs)))] # self.bin2num = bin2num # Helper function. Converts binstate to statenum. It gets updated by `_update_trans_func` self.num2bin = num2bin # Helper function. Converts statenum to binstate. It gets updated by `_update_trans_func` self._update_trans_func() # Updates helper functions and other variables def __str__(self): node_names = [node.name for node in self.nodes] return "<BNetwork(name='{name:s}', N={number_of_nodes:d}, Nodes={nodes:})>".format(name=self.name, number_of_nodes=self.Nnodes, nodes=node_names) # # I/O Methods #
[docs] @classmethod def from_file(self, file, type='cnet', keep_constants=True, **kwargs): """ Load the Boolean Network from a file. Args: file (string) : The name of a file containing the Boolean Network. type (string) : The type of file, either 'cnet' (default) or 'logical' for Boolean logical rules. Returns: BooleanNetwork (object) : The boolean network object. See also: :func:`from_string` :func:`from_dict` """ with open(file, 'r') as infile: if type == 'cnet': return self.from_string_cnet(infile.read(), keep_constants=keep_constants, **kwargs) elif type == 'logical': return self.from_string_boolean(infile.read(), keep_constants=keep_constants, **kwargs)
[docs] @classmethod def from_string_cnet(self, string, keep_constants=True, **kwargs): """ Instanciates a Boolean Network from a string in cnet format. Args: string (string): A cnet format representation of a Boolean Network. Returns: (BooleanNetwork) Examples: String should be structured as follow: .. code-block:: text #.v = number of nodes .v 1 #.l = node label .l 1 node-a .l 2 node-b #.n = (node number) (in-degree) (input node 1) … (input node k) .n 1 2 4 5 01 1 # transition rule See also: :func:`from_file` :func:`from_dict` """ network_file = StringIO(string) logic = defaultdict(dict) line = network_file.readline() while line != "": if line[0] != '#' and line != '\n': # .v <#-nodes> if '.v' in line: Nnodes = int(line.split()[1]) for inode in range(Nnodes): logic[inode] = {'name': '', 'in': [], 'out': []} # .l <node-id> <node-name> elif '.l' in line: logic[int(line.split()[1]) - 1]['name'] = line.split()[2] # .n <node-id> <#-inputs> <input-node-id> elif '.n' in line: inode = int(line.split()[1]) - 1 indegree = int(line.split()[2]) for jnode in range(indegree): logic[inode]['in'].append(int(line.split()[3 + jnode]) - 1) logic[inode]['out'] = [0 for i in range(2**indegree) if indegree > 0] logic_line = network_file.readline().strip() if indegree <= 0: if logic_line == '': logic[inode]['in'] = [inode] logic[inode]['out'] = [0, 1] else: logic[inode]['out'] = [int(logic_line)] else: while logic_line != '\n' and logic_line != '' and len(logic_line) > 1: for nlogicline in expand_logic_line(logic_line): logic[inode]['out'][binstate_to_statenum(nlogicline.split()[0])] = int(nlogicline.split()[1]) logic_line = network_file.readline().strip() # .e = end of file elif '.e' in line: break line = network_file.readline() return self.from_dict(logic, keep_constants=keep_constants, **kwargs)
[docs] @classmethod def from_string_boolean(self, string, keep_constants=True, **kwargs): """ Instanciates a Boolean Network from a Boolean update rules format. Args: string (string) : A boolean update rules format representation of a Boolean Network. Returns: (BooleanNetwork) : The boolean network object. Examples: String should be structured as follow .. code-block:: text # BOOLEAN RULES (this is a comment) # node_name*=node_input_1 [logic operator] node_input_2 ... NODE3*=NODE1 AND NODE2 ... See also: :func:`from_string` :func:`from_dict` """ logic = defaultdict(dict) # parse lines to receive node names network_file = StringIO(string) line = network_file.readline() i = 0 while line != "": if line[0] == '#': line = network_file.readline() continue logic[i] = {'name': line.split("*")[0].strip(), 'in': [], 'out': []} line = network_file.readline() i += 1 # Parse lines again to determine inputs and output sequence network_file = StringIO(string) line = network_file.readline() i = 0 while line != "": if line[0] == '#': line = network_file.readline() continue eval_line = line.split("=")[1] # logical condition to evaluate # RE checks for non-alphanumeric character before/after node name (node names are included in other node names) # Additional characters added to eval_line to avoid start/end of string complications input_names = [logic[node]['name'] for node in logic if re.compile('\W' + logic[node]['name'] + '\W').search('*' + eval_line + '*')] input_nums = [node for input in input_names for node in logic if input == logic[node]['name']] logic[i]['in'] = input_nums # Determine output transitions logic[i]['out'] = output_transitions(eval_line, input_names) line = network_file.readline() i += 1 return self.from_dict(logic, keep_constants=keep_constants, **kwargs)
[docs] @classmethod def from_dict(self, logic, keep_constants=True, **kwargs): """Instanciaets a BoolleanNetwork from a logic dictionary. Args: logic (dict) : The logic dict. keep_constants (bool) : Returns: (BooleanNetwork) See also: :func:`from_file` :func:`from_dict` """ Nnodes = len(logic) keep_constants = keep_constants constants = {} if 'name' in kwargs: name = kwargs['name'] else: name = '' if keep_constants: for i, nodelogic in logic.items(): # No inputs? It's a constant! if len(nodelogic['in']) == 0: constants[i] = logic[i]['out'][0] return BooleanNetwork(name=name, logic=logic, Nnodes=Nnodes, constants=constants, keep_constants=keep_constants)
[docs] def to_cnet(self, file=None, adjust_no_input=False): """Outputs the network logic to ``.cnet`` format, which is similar to the Berkeley Logic Interchange Format (BLIF). This is the format used by BNS to compute attractors. Args: file (string,optional) : A string of the file to write the output to. If not supplied, a string will be returned. adjust_no_input (bool) : Adjust output string for nodes with no input. Returns: (string) : The ``.cnet`` format string. Note: See `BNS <https://people.kth.se/~dubrova/bns.html>`_ for more information. """ # Copy logic = self.logic.copy() # if adjust_no_input: for i, data in logic.items(): # updates in place if len(data['in']) == 0: data['in'] = [i + 1] data['out'] = [0, 1] bns_string = '.v ' + str(self.Nnodes) + '\n' + '\n' for i in range(self.Nnodes): k = len(logic[i]['in']) bns_string += '.n ' + str(i + 1) + " " + str(k) + " " + " ".join([str(v + 1) for v in logic[i]['in']]) + "\n" for statenum in range(2**k): # If is a constant (TODO: This must come from the BooleanNode, not the logic) if len(logic[i]['out']) == 1: bns_string += str(logic[i]['out'][statenum]) + "\n" # Not a constant, print the state and output else: bns_string += statenum_to_binstate(statenum, base=k) + " " + str(logic[i]['out'][statenum]) + "\n" bns_string += "\n" if file is None: return bns_string else: if isinstance(file, string): with open(file, 'w') as iofile: iofile.write(bns_string) iofile.close() else: raise AttributeError("File format not supported. Please specify a string.")
# # Methods #
[docs] def structural_graph(self, remove_constants=False): """Calculates and returns the structural graph of the boolean network. Args: remove_constants (bool) : Remove constants from the graph. Defaults to ``False``. Returns: G (networkx.Digraph) : The boolean network structural graph. """ self._sg = nx.DiGraph(name="Structural Graph: " + self.name) # Add Nodes self._sg.add_nodes_from((i, {'label': n.name}) for i, n in enumerate(self.nodes)) for target in range(self.Nnodes): for source in self.logic[target]['in']: self._sg.add_edge(source, target, **{'weight': 1.}) if remove_constants: self._sg.remove_nodes_from(self.constants.keys()) # return self._sg
[docs] def number_interactions(self): """Returns the number of interactions in the Structural Graph (SG). Practically, it returns the number of edges of the SG. Returns: int """ self._check_compute_variables(sg=True) return nx.number_of_edges(self._sg)
[docs] def structural_indegrees(self): """Returns the in-degrees of the Structural Graph. Sorted. Returns: (int) : the number of in-degrees. See also: :func:`structural_outdegrees`, :func:`effective_indegrees`, :func:`effective_outdegrees` """ self._check_compute_variables(sg=True) return sorted([d for n, d in self._sg.in_degree()], reverse=True)
[docs] def structural_outdegrees(self): """Returns the out-degrees of the Structural Graph. Sorted. Returns: (list) : out-degrees. See also: :func:`structural_indegrees`, :func:`effective_indegrees`, :func:`effective_outdegrees` """ self._check_compute_variables(sg=True) return sorted([d for n, d in self._sg.out_degree()], reverse=True)
[docs] def effective_graph(self, bound='mean', threshold=None): """Computes and returns the effective graph of the network. In practive it asks each :class:`~cana.boolean_node.BooleanNode` for their :func:`~cana.boolean_node.BooleanNode.edge_effectiveness`. Args: bound (string) : The bound to which compute input redundancy. Can be one of : ["lower", "mean", "upper", "tuple"]. Defaults to "mean". threshold (float) : Only return edges above a certain effective connectivity threshold. This is usefull when computing graph measures at diffent levels. Returns: (networkx.DiGraph) : directed graph See Also: :func:`~cana.boolean_node.BooleanNode.edge_effectiveness` """ if threshold is not None: self._eg = nx.DiGraph(name="Effective Graph: " + self.name + "(Threshold: {threshold:.2f})".format(threshold=threshold)) else: self._eg = nx.DiGraph(name="Effective Graph: " + self.name + "(Threshold: None)") # Add Nodes for i, node in enumerate(self.nodes, start=0): self._eg.add_node(i, **{'label': node.name}) # Add Edges for i, node in enumerate(self.nodes, start=0): e_is = node.edge_effectiveness(bound=bound) for inputs, e_i in zip(self.logic[i]['in'], e_is): # If there is a threshold, only return those number above the threshold. Else, return all edges. if (threshold is None) or ((threshold is not None) and (e_i > threshold)): self._eg.add_edge(inputs, i, **{'weight': e_i}) return self._eg
[docs] def conditional_effective_graph(self, conditioned_nodes={}, bound='mean', threshold=None): """Computes and returns the BN effective graph conditioned on some known states. Args: conditioned_nodes (dict) : a dictionary mapping node ids to their conditioned states. dict of form { nodeid : nodestate } bound (string) : The bound to which compute input redundancy. Can be one of : ["lower", "mean", "upper", "tuple"]. Defaults to "mean". threshold (float) : Only return edges above a certain effective connectivity threshold. This is usefull when computing graph measures at diffent levels. Returns: (networkx.DiGraph) : directed graph See Also: :func:`~cana.boolean_network.BooleanNetwork.effective_graph` """ conditional_eg = copy.deepcopy(self.effective_graph(bound=bound, threshold=None)) conditioned_subgraph = set([]) # make a copy of the logic dict so we can edit it conditioned_logic = copy.deepcopy(self.logic) # separate input conditioned nodes and nodes that get conditioned all_conditioned_nodes = dict(conditioned_nodes) # Queue of nodes to condition nodes2condition = list(conditioned_nodes.keys()) while len(nodes2condition) > 0: conditioned_node = nodes2condition.pop(0) conditioned_value = str(all_conditioned_nodes[conditioned_node]) conditioned_subgraph.add(conditioned_node) # take all successors of the conditioned node ignoring self-loops successors = [n for n in list(conditional_eg.neighbors(conditioned_node)) if n != conditioned_node] conditioned_subgraph.update(successors) # we have to loop through all of the successors of the conditioned node and change their logic for n in successors: # find the index of the conditioned node in the successor logic conditioned_node_idx = conditioned_logic[n]['in'].index(conditioned_node) conditioned_subgraph.update(conditioned_logic[n]['in']) # the new successor inputs without the conditioned node new_successor_inputs = conditioned_logic[n]['in'][:conditioned_node_idx] + conditioned_logic[n]['in'][(conditioned_node_idx + 1):] newk = len(new_successor_inputs) # now we create a conditioned LUT as the subset of the original for which the conditioned node is fixed to its value if newk == 0: new_successor_outputs = [conditioned_logic[n]['out'][binstate_to_statenum(conditioned_value)]] * 2 else: new_successor_outputs = [] for sn in range(2**newk): binstate = statenum_to_binstate(sn, newk) binstate = binstate[:conditioned_node_idx] + conditioned_value + binstate[conditioned_node_idx:] new_successor_outputs.append(conditioned_logic[n]['out'][binstate_to_statenum(binstate)]) # use the new logic to calcuate a new edge effectiveness new_edge_effectiveness = BooleanNode().from_output_list(new_successor_outputs).edge_effectiveness(bound=bound) # and update the conditional effective graph with the new edge effectiveness values for i in range(newk): conditional_eg[new_successor_inputs[i]][n]['weight'] = new_edge_effectiveness[i] # now update the conditioned_logic in case these nodes are further modified by additional conditioned variables conditioned_logic[n]['in'] = new_successor_inputs conditioned_logic[n]['out'] = new_successor_outputs # check if we just made a constant node if n not in all_conditioned_nodes and len(set(new_successor_outputs)) == 1: # in which case, add it to the conditioned set and propagate the conditioned effect nodes2condition.append(n) all_conditioned_nodes[n] = new_successor_outputs[0] conditional_eg.name = "Conditioned Effective Graph: {name:s} conditioned on {nodes:s}".format(name=self.name, nodes=str(conditioned_nodes)) if threshold is None: conditional_eg.name = conditional_eg.name + " (Threshold: None)" else: conditional_eg.name = conditional_eg.name + " (Threshold: {threshold:.2f})".format(threshold=threshold) remove_edges = [(i, j) for i, j, d in conditional_eg.edges(data=True) if d['weight'] <= threshold] conditional_eg.remove_edges_from(remove_edges) # add the conditional information into the effective graph object dict_conditioned_subgraph = {n: (n in conditioned_subgraph) for n in conditional_eg.nodes()} nx.set_node_attributes(conditional_eg, values=dict_conditioned_subgraph, name='conditioned_subgraph') dict_all_conditioned_nodes = {n: all_conditioned_nodes.get(n, None) for n in conditional_eg.nodes()} nx.set_node_attributes(conditional_eg, values=dict_all_conditioned_nodes, name='conditioned_state') return conditional_eg
[docs] def effective_indegrees(self): """Returns the in-degrees of the Effective Graph. Sorted. Returns: (list) See also: :func:`effective_outdegrees`, :func:`structural_indegrees`, :func:`structural_outdegrees` """ self._check_compute_variables(eg=True) return sorted([d for n, d in self._eg.in_degree()], reverse=True)
[docs] def effective_outdegrees(self): """Returns the out-degrees of the Effective Graph. Sorted. Returns: (list) See also: :func:`effective_indegrees`, :func:`structural_indegrees`, :func:`structural_outdegrees` """ self._check_compute_variables(eg=True) return sorted([d for n, d in self._eg.out_degree()], reverse=True)
[docs] def activity_graph(self, threshold=None): """ Returns the activity graph as proposed in Ghanbarnejad & Klemm (2012) EPL, 99 Args: threshold (float) : Only return edges above a certain activity threshold. This is usefull when computing graph measures at diffent levels. Returns: (networkx.DiGraph) : directed graph """ if threshold is not None: act_g = nx.DiGraph(name="Activity Graph: " + self.name + "(Threshold: %.2f)" % threshold) else: act_g = nx.DiGraph(name="Activity Graph: " + self.name + "(Threshold: None)") # Add Nodes for i, node in enumerate(self.nodes, start=0): act_g.add_node(i, **{'label': node.name}) # Add Edges for i, node in enumerate(self.nodes, start=0): a_is = node.activities() for inputs, a_i in zip(self.logic[i]['in'], a_is): # If there is a threshold, only return those number above the threshold. Else, return all edges. if ((threshold is None) and (a_i > 0)) or ((threshold is not None) and (a_i > threshold)): act_g.add_edge(inputs, i, **{'weight': a_i}) return act_g
[docs] def state_transition_graph(self): """Creates and returns the full State Transition Graph (STG) for the Boolean Network. Returns: (networkx.DiGraph) : The state transition graph for the Boolean Network. """ self._stg = nx.DiGraph(name='STG: ' + self.name) self._stg.add_nodes_from((i, {'label': self.num2bin(i)}) for i in range(self.Nstates)) for i in range(self.Nstates): b = self.num2bin(i) self._stg.add_edge(i, self.bin2num(self.step(b))) # return self._stg
[docs] def stg_indegree(self): """Returns the In-degrees of the State-Transition-Graph (STG). Sorted. Returns: list """ self._check_compute_variables(stg=True) return sorted(self._stg.in_degree().values(), reverse=True)
[docs] def step(self, initial): """Steps the boolean network 'n' step from the given initial input condition. Args: initial (string) : the initial state. n (int) : the number of steps. Returns: (string) : The stepped binary state. """ # for every node: # node input = breaks down initial by node input # asks node to step with the input # append output to list # joins the results from each node output assert len(initial) == self.Nnodes return ''.join([node.step(node.input_mask(initial)) for node in self.nodes])
[docs] def trajectory(self, initial, length=2): """Computes the trajectory of ``length`` steps without the State Transition Graph (STG).""" trajectory = [initial] for istep in range(length): trajectory.append(self.step(trajectory[-1])) return trajectory
[docs] def trajectory_to_attractor(self, initial, precompute_attractors=True, return_attractor=False): """Computes the trajectory starting at `initial` until it reaches an attracor (this is garanteed). Args: initial (string) : the initial binstate. precompute_attractors (bool) : use precomputed attractors, default True. return_attractor (bool) : also return the attractor reached, default False. Returns: (list) : the state trajectory between initial and the final attractor state. if return_attractor: (list): the attractor """ # if the attractors are already precomputed, then we can check when we reach a known state if precompute_attractors: self._check_compute_variables(attractors=True) attractor_states = [self.num2bin(s) for att in self._attractors for s in att] trajectory = [initial] while (trajectory[-1] not in attractor_states): trajectory.append(self.step(trajectory[-1])) if return_attractor: attractor = self.attractor(trajectory[-1]) else: trajectory = [initial] while (trajectory[-1] not in trajectory[:-1]): trajectory.append(self.step(trajectory[-1])) # the attractor starts at the first occurence of the element idxatt = trajectory.index(trajectory[-1]) if return_attractor: attractor = [self.bin2num(s) for s in trajectory[idxatt:-1]] trajectory = trajectory[:(idxatt + 1)] if return_attractor: return trajectory, attractor else: return trajectory
[docs] def attractor(self, initial): """Computes the trajectory starting at ``initial`` until it reaches an attracor (this is garanteed) Args: initial (string): the initial state. Returns: attractor (string): the atractor state. """ self._check_compute_variables(attractors=True) trajectory = self.trajectory_to_attractor(initial) for attractor in self._attractors: if self.bin2num(trajectory[-1]) in attractor: return attractor
[docs] def attractors(self, mode='stg'): """Find the attractors of the boolean network. Args: mode (string) : ``stg`` or ``sat``. Defaults to ``stg``. ``stg``: Uses the full State Transition Graph (STG) and identifies the attractors as strongly connected components. ``bns``: Uses the SAT-based :mod:`cana.bns` to find all attractors. Returns: attractors (list) : A list containing all attractors for the boolean network. See also: :mod:`cana.bns` """ self._check_compute_variables(stg=True) if mode == 'stg': self._attractors = [list(a) for a in nx.attracting_components(self._stg)] elif mode == 'bns': self._attractors = bns.attractors(self.to_cnet(file=None, adjust_no_input=False)) else: raise AttributeError("Could not find the specified mode. Try 'stg' or 'bns'.") self._attractors.sort(key=len, reverse=True) return self._attractors
[docs] def network_bias(self): """Network Bias. The sum of individual node biases divided by the number of nodes. Practically, it asks each node for their own bias. .. math: TODO See Also: :func:`~cana.boolean_node.BooleanNode.bias` """ return sum([node.bias() for node in self.nodes]) / self.Nnodes
[docs] def basin_entropy(self, base=2): """ """ self._check_compute_variables(stg=True) prob_vec = np.array([len(wcc) for wcc in nx.weakly_connected_components(self._stg)]) / 2.0**self.Nnodes return entropy(prob_vec, base=base)
[docs] def set_constant(self, node, value=None): """Sets or unsets a node as a constant. Args: node (int) : The node ``id`` in the logic dict. Todo: This functions needs to better handle node_id and node_name """ if value is not None: self.nodes[node].constant = True self.nodes[node].constant_value = value self.Nconstants += 1 else: self.nodes[node].constant = False self.nodes[node].constant_value = value self.Nconstants -= 1 self._update_trans_func()
def remove_all_constants(self): self.keep_constants = False for inode in self.constants: self.set_constant(inode, None) def _update_trans_func(self): """ """ if self.keep_constants: self.Nstates = 2**(self.Nnodes - self.Nconstants) constant_template = [None if not (ivar in self.constants.keys()) else self.constants[ivar] for ivar in range(self.Nnodes)] self.bin2num = lambda bs: constantbinstate_to_statenum(bs, constant_template) self.num2bin = lambda sn: binstate_to_constantbinstate( statenum_to_binstate(sn, base=self.Nnodes - self.Nconstants), constant_template) else: self.Nstates = 2**self.Nnodes self.bin2num = binstate_to_statenum self.num2bin = lambda sn: statenum_to_binstate(sn, base=self.Nnodes) # # Dynamical Control Methods #
[docs] def state_transition_graph_reachability(self, filename=None): """Generates a State-Transition-Graph Reachability (STG-R) dictionary. This dict/file will be used by the State Transition Graph Control Analysis. Args: filename (string) : The location to a file where the STG-R will be stored. Returns: (dict) : The STG-R in dict format. """ self._check_compute_variables(stg=True) self._stg_r = {} if (filename is None): for source in self._stg: self._stg_r[source] = len(self._dfs_reachable(self._stg, source)) - 1.0 else: try: with open(filename, 'rb') as handle: self._stg_r = pickle.load(handle) except IOError: print("Finding STG dict") for source in self._stg: self._stg_r[source] = len(self._dfs_reachable(self._stg, source)) - 1.0 with open(filename, 'wb') as handle: pickle.dump(self._stg_r, handle) return self._stg_r
[docs] def attractor_driver_nodes(self, min_dvs=1, max_dvs=4, verbose=False): """Get the minimum necessary driver nodes by iterating the combination of all possible driver nodes of length :math:`min <= x <= max`. Args: min_dvs (int) : Mininum number of driver nodes to search. max_dvs (int) : Maximum number of driver nodes to search. Returns: (list) : The list of driver nodes found in the search. Note: This is an inefficient bruit force search, maybe we can think of better ways to do this? TODO: Parallelize the search on each combination. Each CSTG is independent and can be searched in parallel. See also: :func:`controlled_state_transition_graph`, :func:`controlled_attractor_graph`. """ nodeids = list(range(self.Nnodes)) if self.keep_constants: for cv in self.constants.keys(): nodeids.remove(cv) attractor_controllers_found = [] nr_dvs = min_dvs while (len(attractor_controllers_found) == 0) and (nr_dvs <= max_dvs): if verbose: print("Trying with {:d} Driver Nodes".format(nr_dvs)) for dvs in itertools.combinations(nodeids, nr_dvs): dvs = list(dvs) # cstg = self.controlled_state_transition_graph(dvs) cag = self.controlled_attractor_graph(dvs) att_reachable_from = self.mean_reachable_attractors(cag) if att_reachable_from == 1.0: attractor_controllers_found.append(dvs) # Add another driver node nr_dvs += 1 if len(attractor_controllers_found) == 0: warnings.warn("No attractor control driver variable sets found after exploring all subsets of size {:,d} to {:,d} nodes!!".format(min_dvs, max_dvs)) return attractor_controllers_found
[docs] def controlled_state_transition_graph(self, driver_nodes=[]): """Returns the Controlled State-Transition-Graph (CSTG). In practice, it copies the original STG, flips driver nodes (variables), and updates the CSTG. Args: driver_nodes (list) : The list of driver nodes. Returns: (networkx.DiGraph) : The Controlled State-Transition-Graph. See also: :func:`attractor_driver_nodes`, :func:`controlled_attractor_graph`. """ self._check_compute_variables(attractors=True) if self.keep_constants: for dv in driver_nodes: if dv in self.constants: warnings.warn("Cannot control a constant variable '%s'! Skipping" % self.nodes[dv].name ) # attractor_states = [s for att in self._attractors for s in att] cstg = copy.deepcopy(self._stg) cstg.name = 'C-' + cstg.name + ' (' + ','.join(map(str, [self.nodes[dv].name for dv in driver_nodes])) + ')' # add the control pertubations applied to all other configurations for statenum in range(self.Nstates): binstate = self.num2bin(statenum) controlled_states = flip_binstate_bit_set(binstate, copy.copy(driver_nodes)) controlled_states.remove(binstate) for constate in controlled_states: cstg.add_edge(statenum, self.bin2num(constate)) return cstg
[docs] def pinning_controlled_state_transition_graph(self, driver_nodes=[]): """Returns a dictionary of Controlled State-Transition-Graph (CSTG) under the assumptions of pinning controllability. In practice, it copies the original STG, flips driver nodes (variables), and updates the CSTG. Args: driver_nodes (list) : The list of driver nodes. Returns: (networkx.DiGraph) : The Pinning Controlled State-Transition-Graph. See also: :func:`controlled_state_transition_graph`, :func:`attractor_driver_nodes`, :func:`controlled_attractor_graph`. """ self._check_compute_variables(attractors=True) if self.keep_constants: for dv in driver_nodes: if dv in self.constants: warnings.warn("Cannot control a constant variable {dv:s}'! Skipping".format(dv=self.nodes[dv].name)) uncontrolled_system_size = self.Nnodes - len(driver_nodes) pcstg_dict = {} for att in self._attractors: dn_attractor_transitions = [tuple(''.join([self.num2bin(s)[dn] for dn in driver_nodes]) for s in att_edge) for att_edge in self._stg.subgraph(att).edges()] pcstg_states = [self.bin2num(binstate_pinned_to_binstate( statenum_to_binstate(statenum, base=uncontrolled_system_size), attsource, pinned_var=driver_nodes)) for statenum in range(2**uncontrolled_system_size) for attsource, attsink in dn_attractor_transitions] pcstg = nx.DiGraph(name='STG: ' + self.name) pcstg.name = 'PC-' + pcstg.name + ' (' + ','.join(map(str, [self.nodes[dv].name for dv in driver_nodes])) + ')' pcstg.add_nodes_from((ps, {'label': ps}) for ps in pcstg_states) for attsource, attsink in dn_attractor_transitions: for statenum in range(2**uncontrolled_system_size): initial = binstate_pinned_to_binstate(statenum_to_binstate(statenum, base=uncontrolled_system_size), attsource, pinned_var=driver_nodes) pcstg.add_edge(self.bin2num(initial), self.bin2num(self.pinned_step(initial, pinned_binstate=attsink, pinned_var=driver_nodes))) pcstg_dict[tuple(att)] = pcstg return pcstg_dict
[docs] def pinned_step(self, initial, pinned_binstate, pinned_var): """Steps the boolean network 1 step from the given initial input condition when the driver variables are pinned to their controlled states. Args: initial (string) : the initial state. n (int) : the number of steps. Returns: (string) : The stepped binary state. """ # for every node: # node input = breaks down initial by node input # asks node to step with the input # append output to list # joins the results from each node output assert len(initial) == self.Nnodes return ''.join([str(node.step(''.join(initial[j] for j in self.logic[i]['in']))) if not (i in pinned_var) else initial[i] for i, node in enumerate(self.nodes, start=0)])
[docs] def controlled_attractor_graph(self, driver_nodes=[]): """ Args: cstg (networkx.DiGraph) : A Controlled State-Transition-Graph (CSTG) Returns: (networkx.DiGraph) : The Controlled Attractor Graph (CAG) See also: :func:`attractor_driver_nodes`, :func:`controlled_state_transition_graph`. """ self._check_compute_variables(attractors=True) if self.keep_constants: for dv in driver_nodes: if dv in self.constants: warnings.warn("Cannot control a constant variable '%s'! Skipping" % self.nodes[dv].name ) attractor_states = [s for att in self._attractors for s in att] cstg = copy.deepcopy(self._stg) cstg.name = 'C-' + cstg.name + ' Att(' + ','.join(map(str, [self.nodes[dv].name for dv in driver_nodes])) + ')' # add the control pertubations applied to only attractor configurations for statenum in attractor_states: binstate = self.num2bin(statenum) controlled_states = flip_binstate_bit_set(binstate, copy.copy(driver_nodes)) controlled_states.remove(binstate) for constate in controlled_states: cstg.add_edge(statenum, self.bin2num(constate)) Nattract = len(self._attractors) cag = nx.DiGraph(name='CAG: ' + cstg.name) # Nodes for i, attr in enumerate(self._attractors): cag.add_node(i, **{'label': '|'.join([self.num2bin(a) for a in attr])}) # Edges for i in range(Nattract): ireach = self._dfs_reachable(cstg, self._attractors[i][0]) for j in range(i + 1, Nattract): if self._attractors[j][0] in ireach: cag.add_edge(i, j) if self._attractors[i][0] in self._dfs_reachable(cstg, self._attractors[j][0]): cag.add_edge(j, i) return cag
[docs] def mean_reachable_configurations(self, cstg): """Returns the Mean Fraction of Reachable Configurations Args: cstg (networkx.DiGraph) : The Controlled State-Transition-Graph. Returns: (float) : Mean Fraction of Reachable Configurations """ reachable_from = [] for source in cstg: control_reach = len(self._dfs_reachable(cstg, source)) - 1.0 reachable_from.append(control_reach) norm = (2.0**self.Nnodes - 1.0) * len(reachable_from) reachable_from = sum(reachable_from) / (norm) return reachable_from
[docs] def mean_controlable_configurations(self, cstg): """The Mean Fraction of Controlable Configurations Args: cstg (networkx.DiGraph) : The Controlled State-Transition-Graph. Returns: (float) : Mean Fraction of Controlable Configurations. """ self._check_compute_variables(stg_r=True) control_from, reachable_from = [], [] for source in cstg: control_reach = len(self._dfs_reachable(cstg, source)) - 1.0 control_from.append(control_reach - self._stg_r[source]) reachable_from.append(control_reach) norm = (2.0**self.Nnodes - 1.0) * len(reachable_from) control_from = sum(control_from) / (norm) return control_from
[docs] def mean_reachable_attractors(self, cag, norm=True): """The Mean Fraction of Reachable Attractors to a specific Controlled Attractor Graph (CAG). Args: cag (networkx.DiGraph) : A Controlled Attractor Graph (CAG). Returns: (float) Mean Fraction of Reachable Attractors """ att_norm = (float(len(cag)) - 1.0) * len(cag) if att_norm == 0: # if there is only one attractor everything is reachable att_reachable_from = 1 else: # otherwise find the reachable from each attractor att_reachable_from = [len(self._dfs_reachable(cag, idxatt)) - 1.0 for idxatt in cag] att_reachable_from = sum(att_reachable_from) / (att_norm) return att_reachable_from
[docs] def fraction_pinned_attractors(self, pcstg_dict): """Returns the Number of Accessible Attractors Args: pcstg_dict (dict of networkx.DiGraph) : The dictionary of Pinned Controlled State-Transition-Graphs. Returns: (int) : Number of Accessible Attractors """ reached_attractors = [] for att, pcstg in pcstg_dict.items(): pinned_att = list(nx.attracting_components(pcstg)) print(set(att), pinned_att) reached_attractors.append(set(att) in pinned_att) return sum(reached_attractors) / float(len(pcstg_dict))
[docs] def fraction_pinned_configurations(self, pcstg_dict): """Returns the Fraction of successfully Pinned Configurations Args: pcstg_dict (dict of networkx.DiGraph) : The dictionary of Pinned Controlled State-Transition-Graphs. Returns: (list) : the Fraction of successfully Pinned Configurations to each attractor """ pinned_configurations = [] for att, pcstg in pcstg_dict.items(): att_reached = False for wcc in nx.weakly_connected_components(pcstg): if set(att) in list(nx.attracting_components(pcstg.subgraph(wcc))): pinned_configurations.append(len(wcc) / len(pcstg)) att_reached = True if not att_reached: pinned_configurations.append(0) return pinned_configurations
[docs] def mean_fraction_pinned_configurations(self, pcstg_dict): """Returns the mean Fraction of successfully Pinned Configurations Args: pcstg_dict (dict of networkx.DiGraph) : The dictionary of Pinned Controlled State-Transition-Graphs. Returns: (int) : the mean Fraction of successfully Pinned Configurations """ return sum(self.fraction_pinned_configurations(pcstg_dict)) / len(pcstg_dict)
def _dfs_reachable(self, G, source): """Produce nodes in a depth-first-search pre-ordering starting from source.""" return [n for n in nx.dfs_preorder_nodes(G, source)] # # Feedback Vertex Set (FVS) #
[docs] def feedback_vertex_set_driver_nodes(self, graph='structural', method='grasp', max_iter=1, max_search=11, keep_self_loops=True, *args, **kwargs): """The minimum set of necessary driver nodes to control the network based on Feedback Vertex Set (FVS) theory. Args: graph (string) : Which graph to perform computation method (string) : FVS method. ``bruteforce`` or ``grasp`` (default). max_iter (int) : The maximum number of iterations used by the grasp method. max_search (int) : The maximum number of searched used by the bruteforce method. keep_self_loops (bool) : Keep or remove self loop in the graph to be searched. Returns: (list) : A list-of-lists with FVS solution nodes. Note: When computing FVS on the structural graph, you might want to use ``remove_constants=True`` to make sure the resulting set is minimal – since constants are not controlabled by definition. Also, when computing on the effective graph, you can define the desired ``threshold`` level. """ self._check_compute_variables(sg=True) if graph == 'structural': dg = self.structural_graph(*args, **kwargs) elif graph == 'effective': dg = self.effective_graph(mode='input', bound='mean', threshold=None, *args, **kwargs) else: raise AttributeError("The graph type '%s' is not accepted. Try 'structural' or 'effective'." % graph) # if method == 'grasp': fvssets = fvs.fvs_grasp(dg, max_iter=max_iter, keep_self_loops=keep_self_loops) elif method == 'bruteforce': fvssets = fvs.fvs_bruteforce(dg, max_search=max_search, keep_self_loops=keep_self_loops) else: raise AttributeError("The FVS method '%s' does not exist. Try 'grasp' or 'bruteforce'." % method) fvssets = [fvc.union(set(self.input_nodes)) for fvc in fvssets] return fvssets # [ [self.nodes[i].name for i in fvsset] for fvsset in fvssets]
# # Minimum Dominating Set #
[docs] def minimum_dominating_set_driver_nodes(self, graph='structural', max_search=5, keep_self_loops=True, *args, **kwargs): """The minimun set of necessary driver nodes to control the network based on Minimum Dominating Set (MDS) theory. Args: max_search (int) : Maximum search of additional variables. Defaults to 5. keep_self_loops (bool) : If self-loops are used in the computation. Returns: (list) : A list-of-lists with MDS solution nodes. """ self._check_compute_variables(sg=True) # if graph == 'structural': dg = self.structural_graph(*args, **kwargs) elif graph == 'effective': dg = self.effective_graph(mode='input', bound='mean', threshold=None, *args, **kwargs) else: raise AttributeError("The graph type '%s' is not accepted. Try 'structural' or 'effective'." % graph) # mdssets = mds.mds(dg, max_search=max_search, keep_self_loops=keep_self_loops) return mdssets # [ [self.nodes[i].name for i in mdsset] for mdsset in mdssets]
# Structural Controllability #
[docs] def structural_controllability_driver_nodes(self, graph='structural', keep_self_loops=True, *args, **kwargs): """The minimum set of necessary driver nodes to control the network based on Structural Controlability (SC) theory. Args: keep_self_loops (bool) : If self-loops are used in the computation. Returns: (list) : A list-of-lists with SC solution nodes. """ self._check_compute_variables(sg=True) if graph == 'structural': dg = self.structural_graph(*args, **kwargs) elif graph == 'effective': dg = self.effective_graph(mode='input', bound='mean', threshold=None, *args, **kwargs) else: raise AttributeError("The graph type '%s' is not accepted. Try 'structural' or 'effective'." % graph) # scsets = [set(scset).union(set(self.input_nodes)) for scset in sc.sc(dg, keep_self_loops=keep_self_loops)] return scsets # [ [self.nodes[i].name for i in scset] for scset in scsets]
# # Dynamical Impact #
[docs] def partial_derative_node(self, node, n_traj=10, t=1): """The partial derivative of node on all other nodes after t steps Args: node (int) : the node index for perturbations t (int) : the number of time steps the system is run before impact is calculated. n_traj (int) : the number of trajectories used to approximate the dynamical impact of a node. if 0 then the full STG is used to calculate the true value instead of the approximation method. Returns: (vector) : the partial derivatives """ partial = np.zeros((t, self.Nnodes), dtype=float) if n_traj == 0: config_genderator = (self.num2bin(statenum) for statenum in range(self.Nstates)) n_traj = self.Nstates else: # sample configurations config_genderator = (random_binstate(self.Nnodes) for itraj in range(n_traj)) for config in config_genderator: perturbed_config = flip_binstate_bit(config, node) for n_step in range(t): config = self.step(config) perturbed_config = self.step(perturbed_config) partial[n_step] += np.logical_not(binstate_compare(config, perturbed_config)) partial /= n_traj return partial
[docs] def approx_dynamic_impact(self, source, n_steps=1, target_set=None, bound='mean', threshold=0.0): """Use the network structure to approximate the dynamical impact of a perturbation to node for each of n_steps for details see: Gates et al (2020). Args: source (int) : the source index for perturbations n_steps (int) : the number of time steps bound (str) : the bound for the effective graph 'mean' - edge effectiveness 'upper' - activity Returns: (matrix) : approximate dynamical impact for each node at each step (2 x n_steps x n_nodes) """ if target_set is None: target_set = range(self.Nnodes) Gstr = self.structural_graph() Geff = self.effective_graph(bound=bound, threshold=threshold) # the maximum path with length given by product of weights is the same as minimal path of negative log weight def eff_weight_func(u, v, e): return -np.log(e['weight']) def inv_eff_weight_func(pathlength): return np.exp(-pathlength) impact_matrix = np.zeros((2, n_steps + 1, len(target_set))) impact_matrix[0, :, :] = self.Nnodes + 1 # if we can't reach the node, then the paths cant be longer than the number of nodes in the graph # note that by default: impact_matrix[1, :, :] = 0 the minimum path for nodes we cant reach in the effective graph # in the structural graph, calcluate the dijkstra shortest paths from the source to all targets that are shorter than the cufoff Gstr_shortest_dist, Gstr_shortest_paths = nx.single_source_dijkstra(Gstr, source, target=None, cutoff=n_steps) Gstr_shortest_dist = {n: int(l) for n, l in Gstr_shortest_dist.items()} # in the effective graph, calcluate the dijkstra shortest paths from the source to all targets that are shorter than the cufoff # where the edge weight is given by the effective weight function Geff_shortest_dist, Geff_shortest_paths = nx.single_source_dijkstra(Geff, source, target=None, cutoff=n_steps, weight=eff_weight_func) for itar, target in enumerate(target_set): # we dont need to worry about a path to iteself (source==target) # and if the target doesnt appear in the shortest path dict, then no path exists that is less than the cutoff if target != source and not Gstr_shortest_dist.get(target, None) is None: # the light cone is at least as big as the number of edges in the structural shorest path impact_matrix[0, list(range(Gstr_shortest_dist[target], n_steps + 1)), itar] = Gstr_shortest_dist[target] # if the path exists, then the number of edges (timesteps) is one less than the number of nodes if not Geff_shortest_paths.get(target, None) is None: eff_path_steps = len(Geff_shortest_paths[target]) - 1 else: # or the path doesnt exist eff_path_steps = n_steps + 100 # any number bigger than the longest path to represent we cannot reach the node # start by checking if the number of timesteps is less than the maximum allowable number of steps if eff_path_steps <= n_steps: # now check if the most likely effective path is longer (in terms of # of timesteps) than the structural shortest path if eff_path_steps > Gstr_shortest_dist[target]: # if it is, then we need to find another effective path constrained by the light-cone # for all time steps where the most likely effective path is longer (in terms of # of timesteps) # than the structural shortest path for istep in range(Gstr_shortest_dist[target], eff_path_steps): # bc the effective graph has fully redundant edges, there may actually not be a path try: redo_dijkstra_dist, _ = nx.single_source_dijkstra(Geff, source=source, target=target, cutoff=istep, weight=eff_weight_func) impact_matrix[1, istep, itar] = inv_eff_weight_func(redo_dijkstra_dist) except nx.NetworkXNoPath: pass # once the lightcone includes the target node on the effective shortest path, # then for all other steps the effective path is the best impact_matrix[1, list(range(eff_path_steps, n_steps + 1)), itar] = inv_eff_weight_func(Geff_shortest_dist[target]) return impact_matrix[:, 1:]
[docs] def dist_from_attractor(self): """Find the distance from attractor for each configuration. Returns: distance (dict). Nodes are dictionary indexes and distances the values. """ self._check_compute_variables(attractors=True) dist = {} # stores [node, distance] pair for att in self._attractors: dist.update({a: (0, a) for a in att}) dag = nx.bfs_tree(self._stg, att[0], reverse=True) attractor_states = set(att) for node in nx.topological_sort(dag): # pairs of dist,node for all incoming edges if node not in attractor_states: pairs = [(dist[v][0] + 1, v) for v in dag.pred[node]] if pairs: dist[node] = min(pairs) else: dist[node] = (0, node) return dist
def average_dist_from_attractor(self): dist = self.dist_from_attractor() return np.mean([d[0] for d in dist.values() if d[0] > 0]) # # Dynamics Canalization Map (DCM) #
[docs] def dynamics_canalization_map(self, output=None, simplify=True): """Computes the Dynamics Canalization Map (DCM). In practice, it asks each node to compute their Canalization Map and then puts them together, simplifying it if possible. Args: output (int) : The output DCM to return. Default is ``None``, retuning both [0,1]. simplify (bool) : Attemps to simpify the DCM by removing thresholds nodes with :math:`\tao=1`. Returns: DCM (networkx.DiGraph) : a directed graph representation of the DCM. See Also: :func:`boolean_node.canalizing_map` for the CM and :func:`drawing.draw_dynamics_canalizing_map_graphviz` for plotting. """ CMs = [] for node in self.nodes: if self.keep_constants or not node.constant: CMs.append(node.canalizing_map(output)) # https://networkx.github.io/documentation/stable/reference/algorithms/generated/networkx.algorithms.operators.all.compose_all.html DCM = nx.compose_all(CMs) DCM.name = 'DCM: %s' % (self.name) if simplify: # Loop all threshold nodes threshold_nodes = [(n, d) for n, d in DCM.nodes(data=True) if d['type'] == 'threshold'] for n, d in threshold_nodes: # Constant, remove threshold node if d['tau'] == 0: DCM.remove_node(n) # Tau == 1 if d['tau'] == 1: in_nei = list(DCM.in_edges(n))[0] out_nei = list(DCM.out_edges(n))[0] # neis = set(list(in_nei) + list(out_nei)) # Convert to self loop if (in_nei == out_nei[::-1]): DCM.remove_node(n) DCM.add_edge(in_nei[0], out_nei[1], **{'type': 'simplified', 'mode': 'selfloop'}) # Link variables nodes directly elif not any([DCM.nodes[tn]['type'] == 'fusion' for tn in in_nei]): DCM.remove_node(n) DCM.add_edge(in_nei[0], out_nei[1], **{'type': 'simplified', 'mode': 'direct'}) # Remove Isolates isolates = list(nx.isolates(DCM)) DCM.remove_nodes_from(isolates) return DCM
def _check_compute_variables(self, **kwargs): """Recursevely check if the requested control variables are instantiated/computed. Otherwise computes them in order. """ if 'sg' in kwargs: if self._sg is None: self._sg = self.structural_graph() elif 'eg' in kwargs: if self._eg is None: self._eg = self.effective_graph() elif 'stg' in kwargs: if self._stg is None: self._check_compute_variables(sg=True) self._stg = self.state_transition_graph() elif 'attractors' in kwargs: if self._attractors is None: self._check_compute_variables(stg=True) self._attractors = self.attractors() elif 'stg_r' in kwargs: if self._stg_r is None: self._check_compute_variables(stg=True) self._stg_r = self.state_transition_graph_reachability() else: raise Exception('Control variable name not found. %s' % kwargs) return True # # Get Node Names from Ids # def _get_node_name(self, id): """Return the name of the node based on its id. Args: id (int): id of the node. Returns: name (string): name of the node. """ try: node = self.nodes[id] except error: raise AttributeError("Node with id {id:d} does not exist. {error::s}".format(id=id, error=error)) else: return node.name
[docs] def get_node_name(self, iterable=[]): """Return node names. Optionally, it returns only the names of the ids requested. Args: iterable (int,list, optional) : The id (or list of ids) of nodes to which return their names. Returns: names (list) : The name of the nodes. """ # If only one id is passed, make it a list if not isinstance(iterable, list): iterable = [iterable] # No ids requested, return all the names if not len(iterable): return [n.name for n in self.nodes] # otherwise, use the recursive map to change ids to names else: return recursive_map(self._get_node_name, iterable)
[docs] def average_trajectory_length(self, nsamples=10, random_seed=None, method='random'): """The average length of trajectories from a random initial configuration to its attractor. Args: nsamples (int) : The number of samples per hammimg distance to get. random_seed (int) : The random state seed. method (string) : specify the method you want. either 'random' or .... Returns: trajlen (float) : The average trajectory length to an attractor. """ return sum(len(self.trajectory_to_attractor(random_binstate(self.Nnodes))) for isample in range(nsamples)) / nsamples
[docs] def derrida_curve(self, nsamples=10, max_hamm=None, random_seed=None, method='random'): """The Derrida Curve (also reffered as criticality measure :math:`D_s`). When "mode" is set as "random" (default), it would use random sampling to estimate Derrida value If "mode" is set as "sensitivity", it would use c-sensitivity to calculate Derrida value (slower) You can refer to :cite:'kadelka2017influence' about why c-sensitivity can be used to caculate Derrida value Args: nsamples (int) : The number of samples per hammimg distance to get. max_hamm (int) : The maximum Hamming distance between starting states. default: self.Nnodes random_seed (int) : The random state seed. method (string) : specify the method you want. either 'random' or 'sensitivity' Returns: (dx,dy) (tuple) : The dx and dy of the curve. """ random.seed(random_seed) if max_hamm is None or (max_hamm > self.Nnodes): max_hamm = self.Nnodes dx = np.linspace(0, 1, max_hamm, endpoint=True) dy = np.zeros(max_hamm + 1) if method == 'random': # for each possible hamming distance between the starting states for hamm_dist in range(1, max_hamm + 1): # sample nsample times for isample in range(nsamples): rnd_config = random_binstate(self.Nnodes) perturbed_var = random.sample(range(self.Nnodes), hamm_dist) perturbed_config = [flip_bit(rnd_config[ivar]) if ivar in perturbed_var else rnd_config[ivar] for ivar in range(self.Nnodes)] dy[hamm_dist] += hamming_distance(self.step(rnd_config), self.step(perturbed_config)) / self.Nnodes # normalized Hamming Distance dy /= nsamples elif method == 'sensitivity': for hamm_dist in range(1, max_hamm + 1): dy[hamm_dist] = sum([node.c_sensitivity(hamm_dist, mode='forceK', max_k=self.Nnodes) for node in self.nodes]) / self.Nnodes return dx, dy
[docs] def derrida_coefficient(self, nsamples=10, random_seed=None, method='random'): """The Derrida Coefficient. When "mode" is set as "random" (default), it would use random sampling to estimate Derrida value If "mode" is set as "sensitivity", it would use c-sensitivity to calculate Derrida value (slower) You can refer to :cite:'kadelka2017influence' about why c-sensitivity can be used to caculate Derrida value Args: nsamples (int) : The number of samples per hammimg distance to get. random_seed (int) : The random state seed. method (string) : specify the method you want. either 'random' or 'sensitivity' Returns: (dx,dy) (tuple) : The dx and dy of the curve. """ random.seed(random_seed) hamm_dist = 1 if method == 'random': # for each possible hamming distance between the starting states dy = 0 # sample nsample times for isample in range(nsamples): rnd_config = random_binstate(self.Nnodes) perturbed_var = random.sample(range(self.Nnodes), hamm_dist) perturbed_config = [flip_bit(rnd_config[ivar]) if ivar in perturbed_var else rnd_config[ivar] for ivar in range(self.Nnodes)] dy += hamming_distance(self.step(rnd_config), self.step(perturbed_config)) dy /= float(nsamples) elif method == 'sensitivity': # raise NotImplementedError dy = sum([node.c_sensitivity(hamm_dist, mode='forceK', max_k=self.Nnodes) for node in self.nodes]) return dy / float(self.Nnodes)