当前位置 博文首页 > Lockey23的博客:Python implementation of directed acyclic gr
关于此实现可以在GitHub上找到开源模块
缘由:项目中用到了dag来表示某个数据分析的结果,然后需要转换成Excel来显示(此话题将在下一篇博客中)
然后又要将Excel内容以原样展示到网页中(此话题将在下下一篇博客中)
python实现此数据结构的时候使用到了另外一种特殊的数据结构有序字典(OrderedDict)
>>> from collections import OrderedDict
>>> x = OrderedDict()
>>> x['a'] = set()
>>> x
OrderedDict([('a', set([]))])
>>> x['b'] = set()
>>> x
OrderedDict([('a', set([])), ('b', set([]))])
>>> x['a'].add('b')
>>> x
OrderedDict([('a', set(['b'])), ('b', set([]))])
>>> x['a'].remove('b')
>>> x
OrderedDict([('a', set([])), ('b', set([]))])
>>> [key for key in x if not x[key]]
['b']
dag/six_subset.py
import operator
import sys
# Useful for very coarse version differentiation.
PY2 = sys.version_info[0] == 2
PY3_AND_UP = sys.version_info[0] >= 3
if PY3_AND_UP:
def iterkeys(d, **kw):
return iter(d.keys(**kw))
def itervalues(d, **kw):
return iter(d.values(**kw))
def iteritems(d, **kw):
return iter(d.items(**kw))
def iterlists(d, **kw):
return iter(d.lists(**kw))
viewkeys = operator.methodcaller("keys")
viewvalues = operator.methodcaller("values")
viewitems = operator.methodcaller("items")
else:
def iterkeys(d, **kw):
return d.iterkeys(**kw)
def itervalues(d, **kw):
return d.itervalues(**kw)
def iteritems(d, **kw):
return d.iteritems(**kw)
def iterlists(d, **kw):
return d.iterlists(**kw)
viewkeys = operator.methodcaller("viewkeys")
viewvalues = operator.methodcaller("viewvalues")
viewitems = operator.methodcaller("viewitems")
dag/_init_.py
from copy import copy, deepcopy
from collections import deque
from . import six_subset as six
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
class DAGValidationError(Exception):
pass
class DAG(object):
""" Directed acyclic graph implementation. """
def __init__(self):
""" Construct a new DAG with no nodes or edges. """
self.reset_graph()#重置图,实际是创建了一个有序字典
self.node_depth = []
#self.graph = OrderedDict()
def add_node(self, node_name, graph=None):
""" Add a node if it does not exist yet, or error out. """
if not graph:
graph = self.graph
if node_name in graph:
raise KeyError('node %s already exists' % node_name)
graph[node_name] = set()
def add_node_if_not_exists(self, node_name, graph=None):
try:
self.add_node(node_name, graph=graph)
except KeyError:
pass
def delete_node(self, node_name, graph=None):
""" Deletes this node and all edges referencing it. """
if not graph:
graph = self.graph
if node_name not in graph:
raise KeyError('node %s does not exist' % node_name)
graph.pop(node_name)#删除节点
for node, edges in six.iteritems(graph):#删除节点对应的边
if node_name in edges:
edges.remove(node_name)
def delete_node_if_exists(self, node_name, graph=None):
try:
self.delete_node(node_name, graph=graph)
except KeyError:
pass
def add_edge(self, ind_node, dep_node, graph=None):#加边
""" Add an edge (dependency) between the specified nodes. """
if not graph:
graph = self.graph
if ind_node not in graph or dep_node not in graph:
raise KeyError('one or more nodes do not exist in graph')
test_graph = deepcopy(graph)
test_graph[ind_node].add(dep_node)
is_valid, message = self.validate(test_graph)
if is_valid:
graph[ind_node].add(dep_node)
else:
raise DAGValidationError()
def delete_edge(self, ind_node, dep_node, graph=None):#删边
""" Delete an edge from the graph. """
if not graph:
graph = self.graph
if dep_node not in graph.get(ind_node, []):
raise KeyError('this edge does not exist in graph')
graph[ind_node].remove(dep_node)
def rename_edges(self, old_task_name, new_task_name, graph=None):
""" Change references to a task in existing edges. """
if not graph:
graph = self.graph
for node, edges in graph.items():
if node == old_task_name:
graph[new_task_name] = copy(edges)
del graph[old_task_name]
else:
if old_task_name in edges:
edges.remove(old_task_name)
edges.add(new_task_name)
def predecessors(self, node, graph=None):#返回给定节点的父节点
""" Returns a list of all predecessors of the given node """
if graph is None:
graph = self.graph
return [key for key in graph if node in graph[key]]
def downstream(self, node, graph=None):#放回给定节点的下一层节点和叶子
""" Returns a list of all nodes this node has edges towards. """
if graph is None:
graph = self.graph
if node not in graph:
raise KeyError('node %s is not in graph' % node)
return list(graph[node])
def all_downstreams(self, node, graph=None):#返回给定节点的所有节点和叶子
"""Returns a list of all nodes ultimately downstream
of the given node in the dependency graph, in
topological order."""
if graph is None:
graph = self.graph
nodes = [node]
nodes_seen = set()
i = 0
while i < len(nodes):
downstreams = self.downstream(nodes[i], graph)
for downstream_node in downstreams:
if downstream_node not in nodes_seen:
nodes_seen.add(downstream_node)
nodes.append(downstream_node)
i += 1
return list(
filter(
lambda node: node in nodes_seen,
self.topological_sort(graph=graph)
)
)
def all_leaves(self, graph=None):返回所有的叶子
""" Return a list of all leaves (nodes with no downstreams) """
if graph is None:
graph = self.graph
return [key for key in graph if not graph[key]]
def from_dict(self, graph_dict):#从字典生成图
""" Reset the graph and build it from the passed dictionary.
The dictionary takes the form of {node_name: [directed edges]}
"""
self.reset_graph()
for new_node in six.iterkeys(graph_dict):
self.add_node(new_node)
for ind_node, dep_nodes in six.iteritems(graph_dict):
if not isinstance(dep_nodes, list):
raise TypeError('dict values must be lists')
for dep_node in dep_nodes:
self.add_edge(ind_node, dep_node)
def reset_graph(self):
""" Restore the graph to an empty state. """
self.graph = OrderedDict()
def ind_nodes(self, graph=None):#返回无依赖的节点列表
""" Returns a list of all nodes in the graph with no dependencies. """
if graph is None:
graph = self.graph
dependent_nodes = set(
node for dependents in six.itervalues(graph) for node in dependents
)
return [node for node in graph.keys() if node not in dependent_nodes]
def validate(self, graph=None):
""" Returns (Boolean, message) of whether DAG is valid. """
graph = graph if graph is not None else self.graph
if len(self.ind_nodes(graph)) == 0:
return (False, 'no independent nodes detected')
try:
self.topological_sort(graph)
except ValueError:
return (False, 'failed topological sort')
return (True, 'valid')
def topological_sort(self, graph=None):#返回从根到叶子的排序列表
""" Returns a topological ordering of the DAG.
Raises an error if this is not possible (graph is not valid).
"""
if graph is None:
graph = self.graph
in_degree = {}
for u in graph:
in_degree[u] = 0
for u in graph:
for v in graph[u]:
in_degree[v] += 1
queue = deque()
for u in in_degree:
if in_degree[u] == 0:
queue.appendleft(u)
l = []
while queue:
u = queue.pop()
l.append(u)
for v in graph[u]:
in_degree[v] -= 1
if in_degree[v] == 0:
queue.appendleft(v)
if len(l) == len(graph):
return l
else:
raise ValueError('graph is not acyclic')
def dag_depth(self,node,graph=None,depth=0):
if self.graph == {}:
return 0
nodes = self.downstream(node)
if len(nodes) == 0:
self.node_depth.append(depth)
else:
for node in nodes:
self.dag_depth(node,self.graph,depth+1)
return max(self.node_depth)
def size(self):
return len(self.graph)
cs