当前位置 博文首页 > Lockey23的博客:Python implementation of directed acyclic gr

    Lockey23的博客:Python implementation of directed acyclic gr

    作者:[db:作者] 时间:2021-09-05 19:13

    Python implementation of directed acyclic graph(python实现的有向无环图)

    关于此实现可以在GitHub上找到开源模块

    缘由:项目中用到了dag来表示某个数据分析的结果,然后需要转换成Excel来显示(此话题将在下一篇博客中)
    这里写图片描述

    然后又要将Excel内容以原样展示到网页中(此话题将在下下一篇博客中)
    这里写图片描述
    python实现此数据结构的时候使用到了另外一种特殊的数据结构有序字典(OrderedDict)

    >>> from collections import OrderedDict
    >>> x = OrderedDict()
    >>> x['a'] = set()
    >>> x
    OrderedDict([('a', set([]))])
    >>> x['b'] = set()
    >>> x
    OrderedDict([('a', set([])), ('b', set([]))])
    >>> x['a'].add('b')
    >>> x
    OrderedDict([('a', set(['b'])), ('b', set([]))])
    >>> x['a'].remove('b')
    >>> x
    OrderedDict([('a', set([])), ('b', set([]))])
    >>> [key for key in x if not x[key]]
    ['b']
    
    

    dag/six_subset.py

    import operator
    import sys
    
    # Useful for very coarse version differentiation.
    PY2 = sys.version_info[0] == 2
    PY3_AND_UP = sys.version_info[0] >= 3
    
    if PY3_AND_UP:
        def iterkeys(d, **kw):
            return iter(d.keys(**kw))
    
        def itervalues(d, **kw):
            return iter(d.values(**kw))
    
        def iteritems(d, **kw):
            return iter(d.items(**kw))
    
        def iterlists(d, **kw):
            return iter(d.lists(**kw))
    
        viewkeys = operator.methodcaller("keys")
    
        viewvalues = operator.methodcaller("values")
    
        viewitems = operator.methodcaller("items")
    else:
        def iterkeys(d, **kw):
            return d.iterkeys(**kw)
    
        def itervalues(d, **kw):
            return d.itervalues(**kw)
    
        def iteritems(d, **kw):
            return d.iteritems(**kw)
    
        def iterlists(d, **kw):
            return d.iterlists(**kw)
    
        viewkeys = operator.methodcaller("viewkeys")
    
        viewvalues = operator.methodcaller("viewvalues")
    
        viewitems = operator.methodcaller("viewitems")

    dag/_init_.py

    from copy import copy, deepcopy
    from collections import deque
    
    from . import six_subset as six
    
    try:
        from collections import OrderedDict
    except ImportError:
        from ordereddict import OrderedDict
    
    
    class DAGValidationError(Exception):
        pass
    
    
    class DAG(object):
        """ Directed acyclic graph implementation. """
    
        def __init__(self):
            """ Construct a new DAG with no nodes or edges. """
            self.reset_graph()#重置图,实际是创建了一个有序字典
            self.node_depth = []
            #self.graph = OrderedDict()
    
        def add_node(self, node_name, graph=None):
            """ Add a node if it does not exist yet, or error out. """
            if not graph:
                graph = self.graph
            if node_name in graph:
                raise KeyError('node %s already exists' % node_name)
            graph[node_name] = set()
    
        def add_node_if_not_exists(self, node_name, graph=None):
            try:
                self.add_node(node_name, graph=graph)
            except KeyError:
                pass
    
        def delete_node(self, node_name, graph=None):
            """ Deletes this node and all edges referencing it. """
            if not graph:
                graph = self.graph
            if node_name not in graph:
                raise KeyError('node %s does not exist' % node_name)
            graph.pop(node_name)#删除节点
    
            for node, edges in six.iteritems(graph):#删除节点对应的边
                if node_name in edges:
                    edges.remove(node_name)
    
        def delete_node_if_exists(self, node_name, graph=None):
            try:
                self.delete_node(node_name, graph=graph)
            except KeyError:
                pass
    
        def add_edge(self, ind_node, dep_node, graph=None):#加边
            """ Add an edge (dependency) between the specified nodes. """
            if not graph:
                graph = self.graph
            if ind_node not in graph or dep_node not in graph:
                raise KeyError('one or more nodes do not exist in graph')
            test_graph = deepcopy(graph)
            test_graph[ind_node].add(dep_node)
            is_valid, message = self.validate(test_graph)
            if is_valid:
                graph[ind_node].add(dep_node)
            else:
                raise DAGValidationError()
    
        def delete_edge(self, ind_node, dep_node, graph=None):#删边
            """ Delete an edge from the graph. """
            if not graph:
                graph = self.graph
            if dep_node not in graph.get(ind_node, []):
                raise KeyError('this edge does not exist in graph')
            graph[ind_node].remove(dep_node)
    
        def rename_edges(self, old_task_name, new_task_name, graph=None):
            """ Change references to a task in existing edges. """
            if not graph:
                graph = self.graph
            for node, edges in graph.items():
    
                if node == old_task_name:
                    graph[new_task_name] = copy(edges)
                    del graph[old_task_name]
    
                else:
                    if old_task_name in edges:
                        edges.remove(old_task_name)
                        edges.add(new_task_name)
    
        def predecessors(self, node, graph=None):#返回给定节点的父节点
            """ Returns a list of all predecessors of the given node """
            if graph is None:
                graph = self.graph
            return [key for key in graph if node in graph[key]]
    
        def downstream(self, node, graph=None):#放回给定节点的下一层节点和叶子
            """ Returns a list of all nodes this node has edges towards. """
            if graph is None:
                graph = self.graph
            if node not in graph:
                raise KeyError('node %s is not in graph' % node)
            return list(graph[node])
    
        def all_downstreams(self, node, graph=None):#返回给定节点的所有节点和叶子
            """Returns a list of all nodes ultimately downstream
            of the given node in the dependency graph, in
            topological order."""
            if graph is None:
                graph = self.graph
            nodes = [node]
            nodes_seen = set()
            i = 0
            while i < len(nodes):
                downstreams = self.downstream(nodes[i], graph)
                for downstream_node in downstreams:
                    if downstream_node not in nodes_seen:
                        nodes_seen.add(downstream_node)
                        nodes.append(downstream_node)
                i += 1
            return list(
                filter(
                    lambda node: node in nodes_seen,
                    self.topological_sort(graph=graph)
                )
            )
    
        def all_leaves(self, graph=None):返回所有的叶子
            """ Return a list of all leaves (nodes with no downstreams) """
            if graph is None:
                graph = self.graph
            return [key for key in graph if not graph[key]]
    
        def from_dict(self, graph_dict):#从字典生成图
            """ Reset the graph and build it from the passed dictionary.
            The dictionary takes the form of {node_name: [directed edges]}
            """
    
            self.reset_graph()
            for new_node in six.iterkeys(graph_dict):
                self.add_node(new_node)
            for ind_node, dep_nodes in six.iteritems(graph_dict):
                if not isinstance(dep_nodes, list):
                    raise TypeError('dict values must be lists')
                for dep_node in dep_nodes:
                    self.add_edge(ind_node, dep_node)
    
        def reset_graph(self):
            """ Restore the graph to an empty state. """
            self.graph = OrderedDict()
    
        def ind_nodes(self, graph=None):#返回无依赖的节点列表
            """ Returns a list of all nodes in the graph with no dependencies. """
            if graph is None:
                graph = self.graph
    
            dependent_nodes = set(
                node for dependents in six.itervalues(graph) for node in dependents
            )
            return [node for node in graph.keys() if node not in dependent_nodes]
    
        def validate(self, graph=None):
            """ Returns (Boolean, message) of whether DAG is valid. """
            graph = graph if graph is not None else self.graph
            if len(self.ind_nodes(graph)) == 0:
                return (False, 'no independent nodes detected')
            try:
                self.topological_sort(graph)
            except ValueError:
                return (False, 'failed topological sort')
            return (True, 'valid')
    
        def topological_sort(self, graph=None):#返回从根到叶子的排序列表
            """ Returns a topological ordering of the DAG.
            Raises an error if this is not possible (graph is not valid).
            """
            if graph is None:
                graph = self.graph
    
            in_degree = {}
            for u in graph:
                in_degree[u] = 0
    
            for u in graph:
                for v in graph[u]:
                    in_degree[v] += 1
    
            queue = deque()
            for u in in_degree:
                if in_degree[u] == 0:
                    queue.appendleft(u)
    
            l = []
            while queue:
                u = queue.pop()
                l.append(u)
                for v in graph[u]:
                    in_degree[v] -= 1
                    if in_degree[v] == 0:
                        queue.appendleft(v)
    
            if len(l) == len(graph):
                return l
            else:
                raise ValueError('graph is not acyclic')
        def dag_depth(self,node,graph=None,depth=0):
            if self.graph == {}:
                return 0
            nodes = self.downstream(node)
            if len(nodes) == 0:
                self.node_depth.append(depth)
            else:
                for node in nodes:
                    self.dag_depth(node,self.graph,depth+1)
            return max(self.node_depth)
    
        def size(self):
            return len(self.graph)
    cs
    下一篇:没有了