diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index 81c74e8626d6d..80f1135c01e22 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -2,7 +2,7 @@
 import logging
 import os
 from airflow import configuration
-from airflow.bin.cli import get_parser
+from airflow.bin.cli import CLIFactory
 
 
 if __name__ == '__main__':
@@ -10,6 +10,6 @@ if __name__ == '__main__':
     os.environ['KRB5CCNAME'] = configuration.get('kerberos', 'ccache')
     os.environ['KRB5_KTNAME'] = configuration.get('kerberos', 'keytab')
 
-    parser = get_parser()
+    parser = CLIFactory.get_parser()
     args = parser.parse_args()
     args.func(args)
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index a37dfe2850d76..789ffb3aa21e7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -7,9 +7,10 @@
 import warnings
 from datetime import datetime
 
-from builtins import input
 import argparse
-import dateutil.parser
+from builtins import input
+from collections import namedtuple
+from dateutil.parser import parse as parsedate
 import json
 
 import airflow
@@ -21,10 +22,6 @@
 
 DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
-# Common help text across subcommands
-mark_success_help = "Mark jobs as succeeded without running them"
-subdir_help = "File location or directory from which to look for the dag"
-
 
 def process_subdir(subdir):
     dags_folder = conf.get("core", "DAGS_FOLDER")
@@ -41,19 +38,22 @@ def process_subdir(subdir):
     return subdir
 
 
-def backfill(args):
-    logging.basicConfig(
-        level=settings.LOGGING_LEVEL,
-        format=settings.SIMPLE_LOG_FORMAT)
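+# Central dag_id -> DAG resolution; subcommands that accept an optional
+# dag argument fall back to this DagBag lookup.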
log_base + "/{args.dag_id}/{args.task_id}".format(args=args) if not os.path.exists(directory): os.makedirs(directory) - args.execution_date = dateutil.parser.parse(args.execution_date) iso = args.execution_date.isoformat() filename = "{directory}/{iso}".format(**locals()) - subdir = process_subdir(args.subdir) logging.root.handlers = [] logging.basicConfig( filename=filename, level=settings.LOGGING_LEVEL, format=settings.LOG_FORMAT) - if not args.pickle: - dagbag = DagBag(subdir) - if args.dag_id not in dagbag.dags: - msg = 'DAG [{0}] could not be found in {1}'.format(args.dag_id, subdir) - logging.error(msg) - raise AirflowException(msg) - dag = dagbag.dags[args.dag_id] - task = dag.get_task(task_id=args.task_id) - else: + if not args.pickle and not dag: + dag = get_dag(args) + elif not dag: session = settings.Session() logging.info('Loading pickle id {args.pickle}'.format(**locals())) dag_pickle = session.query( @@ -170,12 +161,8 @@ def run(args): if not dag_pickle: raise AirflowException("Who hid the pickle!? [missing pickle]") dag = dag_pickle.pickle - task = dag.get_task(task_id=args.task_id) + task = dag.get_task(task_id=args.task_id) - task_start_date = None - if args.task_start_date: - task_start_date = dateutil.parser.parse(args.task_start_date) - task.start_date = task_start_date ti = TaskInstance(task, args.execution_date) if args.local: @@ -185,7 +172,7 @@ def run(args): mark_success=args.mark_success, force=args.force, pickle_id=args.pickle, - task_start_date=task_start_date, + task_start_date=args.task_start_date, ignore_dependencies=args.ignore_dependencies, pool=args.pool) run_job.run() @@ -248,7 +235,6 @@ def run(args): remote_log_location = filename.replace(log_base, remote_base) # S3 - if remote_base.startswith('s3:/'): utils.S3Log().write(log, remote_log_location) # GCS @@ -270,11 +256,7 @@ def task_state(args): >>> airflow task_state tutorial sleep 2015-01-01 success """ - args.execution_date = dateutil.parser.parse(args.execution_date) - dagbag = DagBag(process_subdir(args.subdir)) - if args.dag_id not in dagbag.dags: - raise AirflowException('dag_id could not be found') - dag = dagbag.dags[args.dag_id] + dag = get_dag(args) task = dag.get_task(task_id=args.task_id) ti = TaskInstance(task, args.execution_date) print(ti.current_state()) @@ -285,11 +267,8 @@ def list_dags(args): print("\n".join(sorted(dagbag.dags))) -def list_tasks(args): - dagbag = DagBag(process_subdir(args.subdir)) - if args.dag_id not in dagbag.dags: - raise AirflowException('dag_id could not be found') - dag = dagbag.dags[args.dag_id] +def list_tasks(args, dag=None): + dag = dag or get_dag(args) if args.tree: dag.tree_view() else: @@ -297,12 +276,9 @@ def list_tasks(args): print("\n".join(sorted(tasks))) -def test(args): - args.execution_date = dateutil.parser.parse(args.execution_date) - dagbag = DagBag(process_subdir(args.subdir)) - if args.dag_id not in dagbag.dags: - raise AirflowException('dag_id could not be found') - dag = dagbag.dags[args.dag_id] +def test(args, dag=None): + dag = dag or get_dag(args) + task = dag.get_task(task_id=args.task_id) # Add CLI provided task_params to task.params if args.task_params: @@ -317,11 +293,7 @@ def test(args): def render(args): - args.execution_date = dateutil.parser.parse(args.execution_date) - dagbag = DagBag(process_subdir(args.subdir)) - if args.dag_id not in dagbag.dags: - raise AirflowException('dag_id could not be found') - dag = dagbag.dags[args.dag_id] + dag = get_dag(args) task = dag.get_task(task_id=args.task_id) ti = TaskInstance(task, 
+    if not args.pickle and not dag:
+        dag = get_dag(args)
+    elif not dag:
         session = settings.Session()
         logging.info('Loading pickle id {args.pickle}'.format(**locals()))
         dag_pickle = session.query(
@@ -170,12 +161,8 @@ def run(args):
         if not dag_pickle:
             raise AirflowException("Who hid the pickle!? [missing pickle]")
         dag = dag_pickle.pickle
-        task = dag.get_task(task_id=args.task_id)
+    task = dag.get_task(task_id=args.task_id)
 
-    task_start_date = None
-    if args.task_start_date:
-        task_start_date = dateutil.parser.parse(args.task_start_date)
-    task.start_date = task_start_date
     ti = TaskInstance(task, args.execution_date)
 
     if args.local:
@@ -185,7 +172,7 @@ def run(args):
             mark_success=args.mark_success,
             force=args.force,
             pickle_id=args.pickle,
-            task_start_date=task_start_date,
+            task_start_date=args.task_start_date,
             ignore_dependencies=args.ignore_dependencies,
             pool=args.pool)
         run_job.run()
@@ -248,7 +235,6 @@ def run(args):
         remote_log_location = filename.replace(log_base, remote_base)
 
         # S3
-
         if remote_base.startswith('s3:/'):
             utils.S3Log().write(log, remote_log_location)
         # GCS
@@ -270,11 +256,7 @@ def task_state(args):
     >>> airflow task_state tutorial sleep 2015-01-01
     success
     """
-    args.execution_date = dateutil.parser.parse(args.execution_date)
-    dagbag = DagBag(process_subdir(args.subdir))
-    if args.dag_id not in dagbag.dags:
-        raise AirflowException('dag_id could not be found')
-    dag = dagbag.dags[args.dag_id]
+    dag = get_dag(args)
     task = dag.get_task(task_id=args.task_id)
     ti = TaskInstance(task, args.execution_date)
     print(ti.current_state())
@@ -285,11 +267,8 @@ def list_dags(args):
     print("\n".join(sorted(dagbag.dags)))
 
 
-def list_tasks(args):
-    dagbag = DagBag(process_subdir(args.subdir))
-    if args.dag_id not in dagbag.dags:
-        raise AirflowException('dag_id could not be found')
-    dag = dagbag.dags[args.dag_id]
+def list_tasks(args, dag=None):
+    dag = dag or get_dag(args)
     if args.tree:
         dag.tree_view()
     else:
@@ -297,12 +276,9 @@ def list_tasks(args):
         print("\n".join(sorted(tasks)))
 
 
-def test(args):
-    args.execution_date = dateutil.parser.parse(args.execution_date)
-    dagbag = DagBag(process_subdir(args.subdir))
-    if args.dag_id not in dagbag.dags:
-        raise AirflowException('dag_id could not be found')
-    dag = dagbag.dags[args.dag_id]
+def test(args, dag=None):
+    dag = dag or get_dag(args)
+
     task = dag.get_task(task_id=args.task_id)
     # Add CLI provided task_params to task.params
     if args.task_params:
@@ -317,11 +293,7 @@ def test(args):
 
 
 def render(args):
-    args.execution_date = dateutil.parser.parse(args.execution_date)
-    dagbag = DagBag(process_subdir(args.subdir))
-    if args.dag_id not in dagbag.dags:
-        raise AirflowException('dag_id could not be found')
-    dag = dagbag.dags[args.dag_id]
+    dag = get_dag(args)
     task = dag.get_task(task_id=args.task_id)
     ti = TaskInstance(task, args.execution_date)
     ti.render_templates()
@@ -338,16 +310,7 @@ def clear(args):
     logging.basicConfig(
         level=settings.LOGGING_LEVEL,
         format=settings.SIMPLE_LOG_FORMAT)
-    dagbag = DagBag(process_subdir(args.subdir))
-
-    if args.dag_id not in dagbag.dags:
-        raise AirflowException('dag_id could not be found')
-    dag = dagbag.dags[args.dag_id]
-
-    if args.start_date:
-        args.start_date = dateutil.parser.parse(args.start_date)
-    if args.end_date:
-        args.end_date = dateutil.parser.parse(args.end_date)
+    dag = get_dag(args)
 
     if args.task_regex:
         dag = dag.sub_dag(
@@ -476,315 +439,295 @@ def flower(args):
 
 def kerberos(args):  # noqa
     print(settings.HEADER)
-    import airflow.security.kerberos
     airflow.security.kerberos.run()
 
-
-def get_parser():
-    parser = argparse.ArgumentParser()
-    subparsers = parser.add_subparsers(help='sub-command help', dest='subcommand')
-    subparsers.required = True
-
-    ht = "Run subsections of a DAG for a specified date range"
-    parser_backfill = subparsers.add_parser('backfill', help=ht)
-    parser_backfill.add_argument("dag_id", help="The id of the dag to run")
-    parser_backfill.add_argument(
-        "-t", "--task_regex",
-        help="The regex to filter specific task_ids to backfill (optional)")
-    parser_backfill.add_argument(
-        "-s", "--start_date", help="Override start_date YYYY-MM-DD")
-    parser_backfill.add_argument(
-        "-e", "--end_date", help="Override end_date YYYY-MM-DD")
-    parser_backfill.add_argument(
-        "-m", "--mark_success",
-        help=mark_success_help, action="store_true")
-    parser_backfill.add_argument(
-        "-l", "--local",
-        help="Run the task using the LocalExecutor", action="store_true")
-    parser_backfill.add_argument(
-        "-x", "--donot_pickle",
-        help=(
-            "Do not attempt to pickle the DAG object to send over "
-            "to the workers, just tell the workers to run their version "
-            "of the code."),
-        action="store_true")
-    parser_backfill.add_argument(
-        "-a", "--include_adhoc",
-        help="Include dags with the adhoc parameter.", action="store_true")
-    parser_backfill.add_argument(
-        "-i", "--ignore_dependencies",
-        help=(
-            "Skip upstream tasks, run only the tasks "
-            "matching the regexp. Only works in conjunction with task_regex"),
-        action="store_true")
-    parser_backfill.add_argument(
-        "-sd", "--subdir", help=subdir_help,
-        default=DAGS_FOLDER)
-    parser_backfill.add_argument(
-        "-p", "--pool", help="Pool to use to run the backfill")
-    parser_backfill.add_argument(
-        "-dr", "--dry_run", help="Perform a dry run", action="store_true")
-    parser_backfill.set_defaults(func=backfill)
-
-    ht = "Clear a set of task instance, as if they never ran"
-    parser_clear = subparsers.add_parser('clear', help=ht)
-    parser_clear.add_argument("dag_id", help="The id of the dag to run")
-    parser_clear.add_argument(
-        "-t", "--task_regex",
-        help="The regex to filter specific task_ids to clear (optional)")
-    parser_clear.add_argument(
-        "-s", "--start_date", help="Override start_date YYYY-MM-DD")
-    parser_clear.add_argument(
-        "-e", "--end_date", help="Override end_date YYYY-MM-DD")
-    ht = "Include upstream tasks"
-    parser_clear.add_argument(
-        "-u", "--upstream", help=ht, action="store_true")
-    ht = "Only failed jobs"
-    parser_clear.add_argument(
-        "-f", "--only_failed", help=ht, action="store_true")
-    ht = "Only running jobs"
-    parser_clear.add_argument(
-        "-r", "--only_running", help=ht, action="store_true")
-    ht = "Include downstream tasks"
-    parser_clear.add_argument(
-        "-d", "--downstream", help=ht, action="store_true")
-    parser_clear.add_argument(
-        "-sd", "--subdir", help=subdir_help,
-        default=DAGS_FOLDER)
-    ht = "Do not request confirmation"
-    parser_clear.add_argument(
-        "-c", "--no_confirm", help=ht, action="store_true")
-    parser_clear.set_defaults(func=clear)
-
-    ht = "Trigger a DAG"
-    parser_trigger_dag = subparsers.add_parser('trigger_dag', help=ht)
-    parser_trigger_dag.add_argument("dag_id", help="The id of the dag to run")
-    parser_trigger_dag.add_argument(
-        "-r", "--run_id",
-        help="Helps to indentify this run")
-    ht = "json string that gets pickled into the DagRun's conf attribute"
-    parser_trigger_dag.add_argument('-c', '--conf', help=ht)
-    parser_trigger_dag.set_defaults(func=trigger_dag)
-
-    ht = "Pause a DAG"
-    parser_pause = subparsers.add_parser('pause', help=ht)
-    parser_pause.add_argument("dag_id", help="The id of the dag to pause")
-    parser_pause.add_argument(
-        "-sd", "--subdir", help=subdir_help,
-        default=DAGS_FOLDER)
-    parser_pause.set_defaults(func=pause)
-
-    ht = "Unpause a DAG"
-    parser_unpause = subparsers.add_parser('unpause', help=ht)
-    parser_unpause.add_argument("dag_id", help="The id of the dag to unpause")
-    parser_unpause.add_argument(
-        "-sd", "--subdir", help=subdir_help,
-        default=DAGS_FOLDER)
-    parser_unpause.set_defaults(func=unpause)
-
-    ht = "Run a single task instance"
-    parser_run = subparsers.add_parser('run', help=ht)
-    parser_run.add_argument("dag_id", help="The id of the dag to run")
-    parser_run.add_argument("task_id", help="The task_id to run")
-    parser_run.add_argument(
-        "execution_date", help="The execution date to run")
-    parser_run.add_argument(
-        "-sd", "--subdir", help=subdir_help,
-        default=DAGS_FOLDER)
-    parser_run.add_argument(
-        "-s", "--task_start_date",
-        help="Override the tasks's start_date (used internally)",)
-    parser_run.add_argument(
-        "-m", "--mark_success", help=mark_success_help, action="store_true")
-    parser_run.add_argument(
-        "-f", "--force",
-        help="Force a run regardless or previous success",
-        action="store_true")
-    parser_run.add_argument(
-        "-l", "--local",
-        help="Runs the task locally, don't use the executor",
-        action="store_true")
-    parser_run.add_argument(
-        "-r", "--raw",
-        help=argparse.SUPPRESS,
-        action="store_true")
-    parser_run.add_argument(
-        "--pool", help="Pool to use to run the task instance")
-    parser_run.add_argument(
-        "-i", "--ignore_dependencies",
-        help="Ignore upstream and depends_on_past dependencies",
-        action="store_true")
-    parser_run.add_argument(
-        "--ship_dag",
-        help="Pickles (serializes) the DAG and ships it to the worker",
-        action="store_true")
-    parser_run.add_argument(
-        "-p", "--pickle",
-        help="Serialized pickle object of the entire dag (used internally)")
-    parser_run.add_argument(
-        "-j", "--job_id", help=argparse.SUPPRESS)
-    parser_run.set_defaults(func=run)
-
-    ht = (
-        "Test a task instance. This will run a task without checking for "
-        "dependencies or recording it's state in the database."
-    )
-    parser_test = subparsers.add_parser('test', help=ht)
-    parser_test.add_argument("dag_id", help="The id of the dag to run")
-    parser_test.add_argument("task_id", help="The task_id to run")
-    parser_test.add_argument(
-        "execution_date", help="The execution date to run")
-    parser_test.add_argument(
-        "-sd", "--subdir", help=subdir_help,
-        default=DAGS_FOLDER)
-    parser_test.add_argument(
-        "-dr", "--dry_run", help="Perform a dry run", action="store_true")
-    parser_test.add_argument(
-        "-tp", "--task_params", help="Sends a JSON params dict to the task")
-    parser_test.set_defaults(func=test)
-
-    ht = "Get the status of a task instance."
-    parser_task_state = subparsers.add_parser('task_state', help=ht)
-    parser_task_state.add_argument("dag_id", help="The id of the dag to check")
-    parser_task_state.add_argument("task_id", help="The task_id to check")
-    parser_task_state.add_argument(
-        "execution_date", help="The execution date to check")
-    parser_task_state.add_argument(
-        "-sd", "--subdir", help=subdir_help,
-        default=DAGS_FOLDER)
-    parser_task_state.set_defaults(func=task_state)
-
-    ht = "Start a Airflow webserver instance"
-    parser_webserver = subparsers.add_parser('webserver', help=ht)
-    parser_webserver.add_argument(
-        "-p", "--port",
-        default=conf.get('webserver', 'WEB_SERVER_PORT'),
-        type=int,
-        help="Set the port on which to run the web server")
-    parser_webserver.add_argument(
-        "-w", "--workers",
-        default=conf.get('webserver', 'WORKERS'),
-        type=int,
-        help="Number of workers to run the webserver on")
-    parser_webserver.add_argument(
-        "-k", "--workerclass",
-        default=conf.get('webserver', 'WORKER_CLASS'),
-        choices=['sync', 'eventlet', 'gevent', 'tornado'],
-        help="The worker class to use for gunicorn")
-    parser_webserver.add_argument(
-        "-hn", "--hostname",
-        default=conf.get('webserver', 'WEB_SERVER_HOST'),
-        help="Set the hostname on which to run the web server")
-    ht = "Use the server that ships with Flask in debug mode"
-    parser_webserver.add_argument(
-        "-d", "--debug", help=ht, action="store_true")
-    parser_webserver.set_defaults(func=webserver)
-
-    ht = "Start a scheduler scheduler instance"
-    parser_scheduler = subparsers.add_parser('scheduler', help=ht)
-    parser_scheduler.add_argument(
-        "-d", "--dag_id", help="The id of the dag to run")
-    parser_scheduler.add_argument(
-        "-sd", "--subdir", help=subdir_help,
-        default=DAGS_FOLDER)
-    parser_scheduler.add_argument(
-        "-n", "--num_runs",
-        default=None,
-        type=int,
-        help="Set the number of runs to execute before exiting")
-    parser_scheduler.add_argument(
-        "-p", "--do_pickle",
-        default=False,
-        help=(
-            "Attempt to pickle the DAG object to send over "
-            "to the workers, instead of letting workers run their version "
-            "of the code."),
-        action="store_true")
-    parser_scheduler.set_defaults(func=scheduler)
-
-    ht = "Initialize the metadata database"
-    parser_initdb = subparsers.add_parser('initdb', help=ht)
-    parser_initdb.set_defaults(func=initdb)
-
-    ht = "Burn down and rebuild the metadata database"
-    parser_resetdb = subparsers.add_parser('resetdb', help=ht)
-    parser_resetdb.add_argument(
-        "-y", "--yes",
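+# Arg bundles everything add_argument() needs; defaulting all seven fields
+# to None lets each definition below supply only the values it actually uses.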
Only works in conjunction " + "with task_regex"), + "store_true"), + 'pool': Arg(("--pool",), "Resource pool to use"), + # list_dags + 'tree': Arg(("-t", "--tree"), "Tree view", "store_true"), + # clear + 'upstream': Arg( + ("-u", "--upstream"), "Include upstream tasks", "store_true"), + 'only_failed': Arg( + ("-f", "--only_failed"), "Only failed jobs", "store_true"), + 'only_running': Arg( + ("-r", "--only_running"), "Only running jobs", "store_true"), + 'downstream': Arg( + ("-d", "--downstream"), "Include downstream tasks", "store_true"), + 'no_confirm': Arg( + ("-c", "--no_confirm"), + "Do not request confirmation", "store_true"), + # trigger_dag + 'run_id': Arg(("-r", "--run_id"), "Helps to indentify this run"), + 'conf': Arg( + ('-c', '--conf'), + "json string that gets pickled into the DagRun's conf attribute"), + # kerberos + 'principal': Arg( + ("principal",), "kerberos principal", + nargs='?', default=conf.get('kerberos', 'principal')), + 'keytab': Arg( + ("-kt", "--keytab"), "keytab", + nargs='?', default=conf.get('kerberos', 'keytab')), + # run + 'task_start_date': Arg( + ("-s", "--task_start_date"), + "Override the tasks's start_date (used internally)", + type=parsedate), + 'force': Arg( + ("-f", "--force"), + "Force a run regardless or previous success", "store_true"), + 'raw': Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true"), + 'ignore_dependencies': Arg( + ("-i", "--ignore_dependencies"), + "Ignore upstream and depends_on_past dependencies", "store_true"), + 'ship_dag': Arg( + ("--ship_dag",), + "Pickles (serializes) the DAG and ships it to the worker", + "store_true"), + 'pickle': Arg( + ("-p", "--pickle"), + "Serialized pickle object of the entire dag (used internally)"), + 'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS), + # webserver + 'port': Arg( + ("-p", "--port"), + default=conf.get('webserver', 'WEB_SERVER_PORT'), + type=int, + help="The port on which to run the server"), + 'workers': Arg( + ("-w", "--workers"), + default=conf.get('webserver', 'WORKERS'), + type=int, + help="Number of workers to run the webserver on"), + 'workerclass': Arg( + ("-k", "--workerclass"), + default=conf.get('webserver', 'WORKER_CLASS'), + choices=['sync', 'eventlet', 'gevent', 'tornado'], + help="The worker class to use for gunicorn"), + 'hostname': Arg( + ("-hn", "--hostname"), + default=conf.get('webserver', 'WEB_SERVER_HOST'), + help="Set the hostname on which to run the web server"), + 'debug': Arg( + ("-d", "--debug"), + "Use the server that ships with Flask in debug mode", + "store_true"), + # resetdb + 'yes': Arg( + ("-y", "--yes"), + "Do not prompt to confirm reset. Use with care!", + "store_true", + default=False), + # scheduler + 'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"), + 'num_runs': Arg( + ("-n", "--num_runs"), + default=None, type=int, + help="Set the number of runs to execute before exiting"), + # worker + 'do_pickle': Arg( + ("-p", "--do_pickle"), default=False, - help="Do not prompt to confirm reset. 
Use with care!", - action="store_true") - parser_resetdb.set_defaults(func=resetdb) - - ht = "Upgrade metadata database to latest version" - parser_upgradedb = subparsers.add_parser('upgradedb', help=ht) - parser_upgradedb.set_defaults(func=upgradedb) - - ht = "List the DAGs" - parser_list_dags = subparsers.add_parser('list_dags', help=ht) - parser_list_dags.add_argument( - "-sd", "--subdir", help=subdir_help, - default=DAGS_FOLDER) - parser_list_dags.set_defaults(func=list_dags) - - ht = "List the tasks within a DAG" - parser_list_tasks = subparsers.add_parser('list_tasks', help=ht) - parser_list_tasks.add_argument( - "-t", "--tree", help="Tree view", action="store_true") - parser_list_tasks.add_argument( - "dag_id", help="The id of the dag") - parser_list_tasks.add_argument( - "-sd", "--subdir", help=subdir_help, - default=DAGS_FOLDER) - parser_list_tasks.set_defaults(func=list_tasks) - - ht = "Start a Celery worker node" - parser_worker = subparsers.add_parser('worker', help=ht) - parser_worker.add_argument( - "-q", "--queues", - help="Comma delimited list of queues to serve", - default=conf.get('celery', 'DEFAULT_QUEUE')) - parser_worker.add_argument( - "-c", "--concurrency", - type=int, - help="The number of worker processes", - default=conf.get('celery', 'celeryd_concurrency')) - parser_worker.set_defaults(func=worker) - - ht = "Serve logs generate by worker" - parser_logs = subparsers.add_parser('serve_logs', help=ht) - parser_logs.set_defaults(func=serve_logs) - - ht = "Start a Celery Flower" - parser_flower = subparsers.add_parser('flower', help=ht) - parser_flower.add_argument( - "-p", "--port", help="The port") - parser_flower.add_argument( - "-a", "--broker_api", help="Broker api") - parser_flower.set_defaults(func=flower) - - parser_version = subparsers.add_parser('version', help="Show version") - parser_version.set_defaults(func=version) - - ht = "Start a kerberos ticket renewer" - parser_kerberos = subparsers.add_parser('kerberos', help=ht) - parser_kerberos.add_argument( - "-kt", "--keytab", help="keytab", - nargs='?', default=conf.get('kerberos', 'keytab')) - parser_kerberos.add_argument( - "principal", help="kerberos principal", - nargs='?', default=conf.get('kerberos', 'principal')) - parser_kerberos.set_defaults(func=kerberos) - - ht = "Render a task instance's template(s)" - parser_render = subparsers.add_parser('render', help=ht) - parser_render.add_argument("dag_id", help="The id of the dag to check") - parser_render.add_argument("task_id", help="The task_id to check") - parser_render.add_argument( - "execution_date", help="The execution date to check") - parser_render.add_argument( - "-sd", "--subdir", help=subdir_help, - default=DAGS_FOLDER) - parser_render.set_defaults(func=render) - - return parser + help=( + "Attempt to pickle the DAG object to send over " + "to the workers, instead of letting workers run their version " + "of the code."), + action="store_true"), + 'queues': Arg( + ("-q", "--queues"), + help="Comma delimited list of queues to serve", + default=conf.get('celery', 'DEFAULT_QUEUE')), + 'concurrency': Arg( + ("-c", "--concurrency"), + type=int, + help="The number of worker processes", + default=conf.get('celery', 'celeryd_concurrency')), + # flower + 'broker_api': Arg(("-a", "--broker_api"), help="Broker api"), + 'flower_port': Arg( + ("-p", "--port"), + default=conf.get('webserver', 'WEB_SERVER_PORT'), + type=int, + help="The port on which to run the server"), + 'task_params': Arg( + ("-tp", "--task_params"), + help="Sends a JSON params dict to the 
task"), + } + subparsers = ( + { + 'func': backfill, + 'help': "Run subsections of a DAG for a specified date range", + 'args': ( + 'dag_id', 'task_regex', 'start_date', 'end_date', + 'mark_success', 'local', 'donot_pickle', 'include_adhoc', + 'bf_ignore_dependencies', 'subdir', 'pool', 'dry_run') + }, { + 'func': list_tasks, + 'help': "List the tasks within a DAG", + 'args': ('dag_id', 'tree', 'subdir'), + }, { + 'func': clear, + 'help': "Clear a set of task instance, as if they never ran", + 'args': ( + 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir', + 'upstream', 'downstream', 'no_confirm'), + }, { + 'func': pause, + 'help': "Pause a DAG", + 'args': ('dag_id', 'subdir'), + }, { + 'func': unpause, + 'help': "Pause a DAG", + 'args': ('dag_id', 'subdir'), + }, { + 'func': trigger_dag, + 'help': "Trigger a DAG run", + 'args': ('dag_id', 'subdir', 'run_id', 'conf'), + }, { + 'func': kerberos, + 'help': "Start a kerberos ticket renewer", + 'args': ('dag_id', 'principal', 'keytab'), + }, { + 'func': render, + 'help': "Render a task instance's template(s)", + 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), + }, { + 'func': run, + 'help': "Run a single task instance", + 'args': ( + 'dag_id', 'task_id', 'execution_date', 'subdir', + 'mark_success', 'force', 'pool', + 'task_start_date', 'local', 'raw', 'ignore_dependencies', + 'ship_dag', 'pickle', 'job_id'), + }, { + 'func': initdb, + 'help': "Initialize the metadata database", + 'args': tuple(), + }, { + 'func': list_dags, + 'help': "List all the DAGs", + 'args': ('subdir',), + }, { + 'func': task_state, + 'help': "Get the status of a task instance", + 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), + }, { + 'func': serve_logs, + 'help': "Serve logs generate by worker", + 'args': tuple(), + }, { + 'func': test, + 'help': ( + "Test a task instance. 
+                if 'dag_id' in arg and dag_parser:
+                    continue
+                arg = cls.args[arg]
+                kwargs = {
+                    f: getattr(arg, f)
+                    for f in arg._fields if f != 'flags' and getattr(arg, f)}
+                sp.add_argument(*arg.flags, **kwargs)
+            sp.set_defaults(func=sub['func'])
+        return parser
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index da2918113044f..4ab9144fa45e9 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -35,3 +35,6 @@
     bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
     dag=dag)
 task.set_downstream(run_this_last)
+
+if __name__ == "__main__":
+    dag.cli()
diff --git a/airflow/models.py b/airflow/models.py
index ed69887d347b2..851bbdfc702a6 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2610,6 +2610,15 @@ def run(
             pool=pool)
         job.run()
 
+    def cli(self):
+        """
+        Exposes a CLI specific to this DAG
+        """
+        from airflow.bin import cli
+        parser = cli.CLIFactory.get_parser(dag_parser=True)
+        args = parser.parse_args()
+        args.func(args, self)
+
 
 class Chart(Base):
     __tablename__ = "chart"
diff --git a/tests/core.py b/tests/core.py
index bc284139f1458..6f36c207cd5f2 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -623,7 +623,7 @@ def setUp(self):
         configuration.test_mode()
         app = application.create_app()
         app.config['TESTING'] = True
-        self.parser = cli.get_parser()
+        self.parser = cli.CLIFactory.get_parser()
         self.dagbag = models.DagBag(
             dag_folder=DEV_NULL, include_examples=True)
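
Usage sketch (illustrative, not part of the patch): with the dag.cli() hook in
place, a DAG definition file doubles as a scoped CLI for that single DAG:

    $ python airflow/example_dags/example_bash_operator.py list_tasks
    $ python airflow/example_dags/example_bash_operator.py test runme_0 2015-01-01

Because get_parser(dag_parser=True) drops every dag_id argument and args.func
receives the already-loaded DAG object, these commands bypass the DagBag scan.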