Skip to content

Commit

Permalink
Merge pull request iterative#3217 from fabiosantoscode/feature/1744-p…
Browse files Browse the repository at this point in the history
…ipeline-eliminate-extra-connection

pipeline: show: outs: eliminate extra edges in DAG
  • Loading branch information
efiop authored Feb 4, 2020
2 parents ed04a9f + 758205a commit 93d1b6d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 27 deletions.
57 changes: 30 additions & 27 deletions dvc/command/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,45 +31,50 @@ def _show(self, target, commands, outs, locked):
else:
logger.info(stage.path_in_repo)

def __build_graph(self, target, commands, outs):
def _build_graph(self, target, commands, outs):
import networkx
from dvc.stage import Stage
from dvc.repo.graph import get_pipeline

stage = Stage.load(self.repo, target)
G = get_pipeline(self.repo.pipelines, stage)
target_stage = Stage.load(self.repo, target)
G = get_pipeline(self.repo.pipelines, target_stage)

nodes = []
for stage in G:
nodes = set()
for stage in networkx.dfs_preorder_nodes(G, target_stage):
if commands:
if stage.cmd is None:
continue
nodes.append(stage.cmd)
nodes.add(stage.cmd)
elif outs:
for out in stage.outs:
nodes.append(str(out))
nodes.add(str(out))
for dep in stage.deps:
nodes.add(str(dep))
else:
nodes.append(stage.relpath)
nodes.add(stage.relpath)

edges = []
for from_stage, to_stage in G.edges():
if commands:
if to_stage.cmd is None:
continue
edges.append((from_stage.cmd, to_stage.cmd))
elif outs:
for from_out in from_stage.outs:
for to_out in to_stage.outs:
edges.append((str(from_out), str(to_out)))
else:
edges.append((from_stage.relpath, to_stage.relpath))

return nodes, edges, networkx.is_tree(G)
if outs:
for stage in networkx.dfs_preorder_nodes(G, target_stage):
for dep in stage.deps:
for out in stage.outs:
edges.append((str(out), str(dep)))
else:
for from_stage, to_stage in networkx.dfs_edges(G, target_stage):
if commands:
if to_stage.cmd is None:
continue
edges.append((from_stage.cmd, to_stage.cmd))
else:
edges.append((from_stage.relpath, to_stage.relpath))

return list(nodes), edges, networkx.is_tree(G)

def _show_ascii(self, target, commands, outs):
from dvc.dagascii import draw

nodes, edges, _ = self.__build_graph(target, commands, outs)
nodes, edges, _ = self._build_graph(target, commands, outs)

if not nodes:
return
Expand All @@ -79,7 +84,7 @@ def _show_ascii(self, target, commands, outs):
def _show_dependencies_tree(self, target, commands, outs):
from treelib import Tree

nodes, edges, is_tree = self.__build_graph(target, commands, outs)
nodes, edges, is_tree = self._build_graph(target, commands, outs)
if not nodes:
return
if not is_tree:
Expand All @@ -100,12 +105,12 @@ def _show_dependencies_tree(self, target, commands, outs):
observe_list.pop(0)
tree.show()

def __write_dot(self, target, commands, outs):
def _write_dot(self, target, commands, outs):
import io
import networkx
from networkx.drawing.nx_pydot import write_dot

_, edges, _ = self.__build_graph(target, commands, outs)
_, edges, _ = self._build_graph(target, commands, outs)
edges = [edge[::-1] for edge in edges]

simple_g = networkx.DiGraph()
Expand All @@ -126,9 +131,7 @@ def run(self):
target, self.args.commands, self.args.outs
)
elif self.args.dot:
self.__write_dot(
target, self.args.commands, self.args.outs
)
self._write_dot(target, self.args.commands, self.args.outs)
elif self.args.tree:
self._show_dependencies_tree(
target, self.args.commands, self.args.outs
Expand Down
22 changes: 22 additions & 0 deletions tests/func/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from dvc.main import main
from dvc.command.pipeline import CmdPipelineShow
from tests.basic_env import TestDvc
from tests.func.test_repro import TestRepro
from tests.func.test_repro import TestReproChangedDeepData
Expand Down Expand Up @@ -98,6 +99,27 @@ def test_dot_commands(self):
self.assertEqual(ret, 0)


def test_disconnected_stage(tmp_dir, dvc):
tmp_dir.dvc_gen({"base": "base"})

dvc.add("base")
dvc.run(deps=["base"], outs=["derived1"], cmd="echo derived1 > derived1")
dvc.run(deps=["base"], outs=["derived2"], cmd="echo derived2 > derived2")
final_stage = dvc.run(
deps=["derived1"], outs=["final"], cmd="echo final > final"
)

command = CmdPipelineShow([])
# Need to test __build_graph directly
nodes, edges, is_tree = command._build_graph(
final_stage.path, commands=False, outs=True
)

assert set(nodes) == {"final", "derived1", "base"}
assert edges == [("final", "derived1"), ("derived1", "base")]
assert is_tree is True


def test_print_locked_stages(tmp_dir, dvc, caplog):
tmp_dir.dvc_gen({"foo": "foo content", "bar": "bar content"})
dvc.lock_stage("foo.dvc")
Expand Down

0 comments on commit 93d1b6d

Please sign in to comment.