Skip to content

Commit

Permalink
dvc: replace pipeline with dag (iterative#3905)
Browse files Browse the repository at this point in the history
`pipeline list` is no more, it had some very questionable applications
(none to be precise) and was duplicating old `pipeline show` functionality.
We might introduce a different command to assist in listing (e.g. for
shell completion) in the future.

New `dvc dag` replaces `dvc pipeline show` and by-default will show
all pipelines in the project in ASCII (so there is no more --ascii)
and also `--dot` support, just like before. When given a target,
`dvc dag` (in any mode) will show DAG that leads to the target,
skipping the descendants, unless `--full` is specified.
  • Loading branch information
efiop authored May 31, 2020
1 parent edfa174 commit d1e08b6
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 638 deletions.
4 changes: 2 additions & 2 deletions dvc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
commit,
config,
daemon,
dag,
data_sync,
destroy,
diff,
Expand All @@ -26,7 +27,6 @@
metrics,
move,
params,
pipeline,
plots,
remote,
remove,
Expand Down Expand Up @@ -67,7 +67,7 @@
root,
ls,
freeze,
pipeline,
dag,
daemon,
commit,
diff,
Expand Down
115 changes: 115 additions & 0 deletions dvc/command/dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import argparse
import logging

from dvc.command.base import CmdBase, append_doc_link
from dvc.exceptions import DvcException

logger = logging.getLogger(__name__)


def _show_ascii(G):
from dvc.repo.graph import get_pipelines
from dvc.dagascii import draw

pipelines = get_pipelines(G)

ret = []
for pipeline in pipelines:
ret.append(draw(pipeline.nodes, pipeline.edges))

return "\n".join(ret)


def _show_dot(G):
import io
from networkx.drawing.nx_pydot import write_dot

dot_file = io.StringIO()
write_dot(G, dot_file)
return dot_file.getvalue()


def _build(G, target=None, full=False):
import networkx as nx
from dvc.repo.graph import get_pipeline, get_pipelines

if target:
H = get_pipeline(get_pipelines(G), target)
if not full:
descendants = nx.descendants(G, target)
descendants.add(target)
H.remove_nodes_from(set(G.nodes()) - descendants)
else:
H = G

def _relabel(stage):
return stage.addressing

return nx.relabel_nodes(H, _relabel, copy=False)


class CmdDAG(CmdBase):
def run(self):
try:
target = None
if self.args.target:
stages = self.repo.collect(self.args.target)
if len(stages) > 1:
logger.error(
f"'{self.args.target}' contains more than one stage "
"{stages}, please specify one stage"
)
return 1
target = stages[0]

G = _build(self.repo.graph, target=target, full=self.args.full,)

if self.args.dot:
logger.info(_show_dot(G))
else:
from dvc.utils.pager import pager

pager(_show_ascii(G))

return 0
except DvcException:
msg = "failed to show"
if self.args.target:
msg += f"a pipeline for '{target}'"
else:
msg += "pipelines"
logger.exception(msg)
return 1


def add_parser(subparsers, parent_parser):
DAG_HELP = "Visualize DVC project DAG."
dag_parser = subparsers.add_parser(
"dag",
parents=[parent_parser],
description=append_doc_link(DAG_HELP, "dag"),
help=DAG_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
dag_parser.add_argument(
"--dot",
action="store_true",
default=False,
help="Print DAG with .dot format.",
)
dag_parser.add_argument(
"--full",
action="store_true",
default=False,
help=(
"Show full DAG that the target belongs too, instead of "
"showing DAG consisting only of ancestors."
),
)
dag_parser.add_argument(
"target",
nargs="?",
help="Stage or output to show pipeline for. Optional. "
"(Finds all stages in the workspace by default.)",
)
dag_parser.set_defaults(func=CmdDAG)
Loading

0 comments on commit d1e08b6

Please sign in to comment.