Source code for cana.control.sc

# -*- coding: utf-8 -*-
"""
Structural Controllability
===========================


"""
#   Copyright (C) 2021 by
#   Alex Gates <ajgates42@gmail.com>
#   Rion Brattig Correia <rionbr@gmail.com>
#   All rights reserved.
#   MIT license.
import networkx as nx


[docs]def sc(directed_graph, keep_self_loops=True): """The Structural Contolability driver sets. Parameters: directed_graph (networkx.DiGraph) : The directed graph capturing the network structure. keep_self_loops (bool) : Determines if the self loops are kept in the determination of structural control. Returns: (list) : A list of sets with the driver nodes. See also: If you only need the size of the set, see :func:`sc_min_size`. """ # get the bipartite represenation of the directed graph bipartite_graph, vertex_out, vertex_in = _directed_to_bipartite(directed_graph, keep_self_loops=keep_self_loops) # do the maximum matching on the bipartite representation max_matching = nx.bipartite.hopcroft_karp_matching(bipartite_graph, top_nodes=vertex_out) all_max_matchings = _enumerate_maximum_matchings(bipartite_graph, vertex_out, vertex_in, max_matching) sc_sets = [] for M in all_max_matchings: # find the vertex set whose in-coming vertices were matched matched_incoming = set(vertex_in) & set(M.keys()) matched_original_vertices = set([sink[0] for sink in matched_incoming]) new_dv_set = sorted(set(directed_graph.nodes()) - matched_original_vertices) if new_dv_set not in sc_sets: sc_sets.append(sorted(new_dv_set)) return sc_sets
[docs]def sc_min_size(directed_graph, keep_self_loops=True): """The minimum number of driver variables required by structural controllability. Parameters: directed_graph (networkx.DiGraph) : The directed graph capturing the network structure. keep_self_loops (bool) : Determines if the self loops are kept in the determination of structural control. TODO: Implement the removal of self loops Returns: (int) : The number of driver variables necessary to render the graph structurally controlled. See also: :func:`sc` """ # get the bipartite represenation of the directed graph bipartite_graph, vertex_out, vertex_in = _directed_to_bipartite(directed_graph, keep_self_loops=keep_self_loops) # do the maximum matching on the bipartite representation max_matching = nx.bipartite.hopcroft_karp_matching(bipartite_graph) # find the vertex set whose in-coming vertices were matched matched_incoming = set(vertex_in) & set(max_matching.keys()) matched_original_vertices = set([sink[0] for sink in matched_incoming]) # driver variables are those whose in-coming edges were not matched # although this set is not unique, only the size of the set is gaurenteed to be # a minimum sc_min_size = len(set(directed_graph.nodes()) - matched_original_vertices) return sc_min_size
def _directed_to_bipartite(directed_graph, keep_self_loops=True): """Create the undirected Bipartite representation of a directed graph. In this represtation, each vertex is a tuple (n, direction) where one vertex set (labeled with "+") denotes the out-going directed edges while the other vertex set (labeled with "-") denotes the in-coming directed edges. Two vertices are connected with an undirected edge if a directed edge exsisted in the original graph. Args: directed_graph (networkx.DiGraph) : The directed graph to be transformed. Returns: bipartite_graph (networkx.Graph) : The bipartite representation of the directed graph. vertex_out (list) : The list of out-going vertices. vertex_in (list) : The list of in-coming vertices. """ # get the two vertex sets vertex_out = [(n, True) for n in directed_graph.nodes()] vertex_in = [(n, False) for n in directed_graph.nodes()] # make the bipartite graph bipartite_graph = nx.Graph() bipartite_graph.add_nodes_from(vertex_out, bipartite=0) bipartite_graph.add_nodes_from(vertex_in, bipartite=1) # add the edges for n in directed_graph.nodes(): for v in directed_graph.successors(n): if keep_self_loops: bipartite_graph.add_edge((n, True), (v, False)) elif (n != v): bipartite_graph.add_edge((n, True), (v, False)) return bipartite_graph, vertex_out, vertex_in def _bipartite_to_directed(bipartite_graph): """Recover the directed graph from its bipartite representation. Args: bipartite_graph (networkx.Graph) : The bipartite represtation of a directed graph consisting of two vertex sets: vertex_out and vertex_in, to be transformed back to a directed graph. Returns: directed_graph (networkx.DiGraph) : The directed graph. See also: :attr:`_directed_to_bipartite`. """ directed_graph = nx.DiGraph() for edge, d in bipartite_graph.edges(): for node in edge: if node[1] is True: source = node[0] elif node[1] is False: sink = node[0] directed_graph.add_edge(source, sink) return directed_graph def _enumerate_maximum_matchings(bipartite_graph, vertex_out, vertex_in, max_matching): """ @@@ Uno (1999) ??? TODO: Could not find the original paper to cite here. """ # this is the first matching in our list matchings_list = [max_matching] D = _make_matching_digraph(bipartite_graph, vertex_out, vertex_in, max_matching) D = _trim_unnecessary_edges(D) matchings_list = _enumerate_maximum_matchings_iter(bipartite_graph, vertex_out, vertex_in, max_matching, D, matchings_list) return matchings_list def _make_matching_digraph(bipartite_graph, U, V, M): """ @@@ Uno (1999) ??? TODO: Could not find the original paper to cite here. """ matching_digraph = nx.DiGraph() matching_digraph.add_nodes_from(bipartite_graph.nodes()) matched_vertices = M.keys() # loop over the first vertex set for v in V: # and loop over all neighbors of v for u in bipartite_graph.neighbors(v): # check if the edge is in the maximum matching if v in matched_vertices and M[v] == u: # if the edge is matched, place a directed edge from U to V matching_digraph.add_edge(u, v) else: # otherwise, place a directed edge from V to U matching_digraph.add_edge(v, u) return matching_digraph def _trim_unnecessary_edges(matching_digraph): """ """ scc_subgraphs = [matching_digraph.subgraph(scc) for scc in nx.strongly_connected_components(matching_digraph)] trimmed_graph = scc_subgraphs[0] for next_subgraph in scc_subgraphs[1:]: trimmed_graph = nx.union(trimmed_graph, next_subgraph) return trimmed_graph def _enumerate_maximum_matchings_iter(G, U, V, M, D, matchings_list): """ """ if len(G) > 0 and not (D is None): # find the cycles in the matching digraph cycles = [c for c in nx.simple_cycles(D)] if len(cycles) > 0: # swap the edges in the cycle e, Mprime = _swap_edges_in_cycle(cycles[0], M) # this creates a new maximum matching, see if we already have it or add it if Mprime not in matchings_list: matchings_list.append(Mprime) # now we have two subpromblems which result in new iterations # Problem plus: Gplus, Uplus, Vplus, Mplus = _make_graph_plus(G, U, V, M, e) Dplus = _make_matching_digraph(Gplus, Uplus, Vplus, Mplus) Dplus = _trim_unnecessary_edges(Dplus) matchings_list = _enumerate_maximum_matchings_iter(Gplus, Uplus, Vplus, Mplus, Dplus, matchings_list) # and Problem minus: Gminus = _make_graph_minus(G, e) Dminus = _make_matching_digraph(Gminus, U, V, Mprime) Dminus = _trim_unnecessary_edges(Dminus) matchings_list = _enumerate_maximum_matchings_iter(Gminus, U, V, Mprime, Dminus, matchings_list) else: # if there are no cycles call the iter again without the matching digraph matchings_list = _enumerate_maximum_matchings_iter(G, U, V, M, None, matchings_list) elif len(G) > 0: path = _find_path_length_two(G, V, M) if not (path is None): # swap edges in the path e, Mprime = _swap_edges_in_path(path, M) # this creates a new maximum matching, see if we already have it or add it if Mprime not in matchings_list: matchings_list.append(Mprime) # now we have two subpromblems which result in new iterations # Problem plus: Gplus, Uplus, Vplus, Mplus = _make_graph_plus(G, U, V, M, e) matchings_list = _enumerate_maximum_matchings_iter(Gplus, Uplus, Vplus, Mprime, None, matchings_list) # Problem minus with M: Gminus = _make_graph_minus(G, e) matchings_list = _enumerate_maximum_matchings_iter(Gminus, U, V, M, None, matchings_list) return matchings_list def _make_graph_plus(G, U, V, M, e): """ """ Gplus = G.copy() # remove the two endpoints and all adjacent edges Gplus.remove_node(e[0]) Gplus.remove_node(e[1]) Uplus = set(U) Vplus = set(V) if e[0] in U: Uplus.remove(e[0]) Vplus.remove(e[1]) else: Uplus.remove(e[1]) Vplus.remove(e[0]) Mplus = dict(M) return Gplus, Uplus, Vplus, Mplus def _make_graph_minus(G, e): """ """ Gminus = G.copy() # remove the edge Gminus.remove_edge(e[0], e[1]) return Gminus def _find_path_length_two(G, V, M): """ """ path = None for v in _listminus(V, M.keys()): for u in G.neighbors(v): for w in G.neighbors(u): if w in M and M[w] == u: path = [v, u, w] break return path def _listminus(list1, list2): """ """ return [a for a in list1 if a not in list2] def _swap_edges_in_cycle(cycle, M): """ """ Mprime = dict(M) e = None cycle = cycle + [cycle[0]] for i in range(len(cycle) - 1): if cycle[i] in Mprime: if M[cycle[i]] == cycle[i + 1]: e = (cycle[i], cycle[i + 1]) Mprime[cycle[i]] = cycle[i - 1] Mprime[cycle[i - 1]] = Mprime[cycle[i]] else: e = (cycle[i], cycle[i - 1]) Mprime[cycle[i]] = cycle[i + 1] Mprime[cycle[i + 1]] = Mprime[cycle[i]] return e, Mprime def _swap_edges_in_path(path, M): """ """ Mprime = dict(M) e = (path[2], path[1]) m = Mprime[path[2]] del Mprime[path[2]] del Mprime[m] Mprime[path[0]] = path[1] Mprime[path[1]] = path[0] return e, Mprime