Skip to content

Commit

Permalink
Action log on Browse Views (apache#21569)
Browse files Browse the repository at this point in the history
  • Loading branch information
pingzh authored Feb 15, 2022
1 parent 8d980cb commit a55d714
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
16 changes: 9 additions & 7 deletions airflow/www/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,21 @@ def wrapper(*args, **kwargs):
user = g.user.username

fields_skip_logging = {'csrf_token', '_csrf_token'}
log_fields = {
k: v
for k, v in chain(request.values.items(), request.view_args.items())
extra_fields = [
(k, v)
for k, v in chain(request.values.items(multi=True), request.view_args.items())
if k not in fields_skip_logging
}
]

params = {k: v for k, v in chain(request.values.items(), request.view_args.items())}

log = Log(
event=f.__name__,
task_instance=None,
owner=user,
extra=str([(k, log_fields[k]) for k in log_fields]),
task_id=log_fields.get('task_id'),
dag_id=log_fields.get('dag_id'),
extra=str(extra_fields),
task_id=params.get('task_id'),
dag_id=params.get('dag_id'),
)

if 'execution_date' in request.values:
Expand Down
13 changes: 13 additions & 0 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4315,6 +4315,7 @@ def duration_f(self):

@action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
@action_has_dag_edit_access
@action_logging
def action_muldelete(self, items: List[DagRun]):
"""Multiple delete."""
self.datamodel.delete_all(items)
Expand All @@ -4323,12 +4324,14 @@ def action_muldelete(self, items: List[DagRun]):

@action('set_queued', "Set state to 'queued'", '', single=False)
@action_has_dag_edit_access
@action_logging
def action_set_queued(self, drs: List[DagRun]):
"""Set state to queued."""
return self._set_dag_runs_to_active_state(drs, State.QUEUED)

@action('set_running', "Set state to 'running'", '', single=False)
@action_has_dag_edit_access
@action_logging
def action_set_running(self, drs: List[DagRun]):
"""Set state to running."""
return self._set_dag_runs_to_active_state(drs, State.RUNNING)
Expand Down Expand Up @@ -4358,6 +4361,7 @@ def _set_dag_runs_to_active_state(self, drs: List[DagRun], state: str, session=N
)
@action_has_dag_edit_access
@provide_session
@action_logging
def action_set_failed(self, drs: List[DagRun], session=None):
"""Set state to failed."""
try:
Expand All @@ -4382,6 +4386,7 @@ def action_set_failed(self, drs: List[DagRun], session=None):
)
@action_has_dag_edit_access
@provide_session
@action_logging
def action_set_success(self, drs: List[DagRun], session=None):
"""Set state to success."""
try:
Expand All @@ -4401,6 +4406,7 @@ def action_set_success(self, drs: List[DagRun], session=None):
@action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
@action_has_dag_edit_access
@provide_session
@action_logging
def action_clear(self, drs: List[DagRun], session=None):
"""Clears the state."""
try:
Expand Down Expand Up @@ -4690,6 +4696,7 @@ def duration_f(self):
)
@action_has_dag_edit_access
@provide_session
@action_logging
def action_clear(self, task_instances, session=None):
"""Clears the action."""
try:
Expand All @@ -4711,6 +4718,7 @@ def action_clear(self, task_instances, session=None):

@action('muldelete', 'Delete', "Are you sure you want to delete selected records?", single=False)
@action_has_dag_edit_access
@action_logging
def action_muldelete(self, items):
self.datamodel.delete_all(items)
self.update_redirect()
Expand All @@ -4730,6 +4738,7 @@ def set_task_instance_state(self, tis, target_state, session=None):

@action('set_running', "Set state to 'running'", '', single=False)
@action_has_dag_edit_access
@action_logging
def action_set_running(self, tis):
"""Set state to 'running'"""
self.set_task_instance_state(tis, State.RUNNING)
Expand All @@ -4738,6 +4747,7 @@ def action_set_running(self, tis):

@action('set_failed', "Set state to 'failed'", '', single=False)
@action_has_dag_edit_access
@action_logging
def action_set_failed(self, tis):
"""Set state to 'failed'"""
self.set_task_instance_state(tis, State.FAILED)
Expand All @@ -4746,6 +4756,7 @@ def action_set_failed(self, tis):

@action('set_success', "Set state to 'success'", '', single=False)
@action_has_dag_edit_access
@action_logging
def action_set_success(self, tis):
"""Set state to 'success'"""
self.set_task_instance_state(tis, State.SUCCESS)
Expand All @@ -4754,6 +4765,7 @@ def action_set_success(self, tis):

@action('set_retry', "Set state to 'up_for_retry'", '', single=False)
@action_has_dag_edit_access
@action_logging
def action_set_retry(self, tis):
"""Set state to 'up_for_retry'"""
self.set_task_instance_state(tis, State.UP_FOR_RETRY)
Expand All @@ -4762,6 +4774,7 @@ def action_set_retry(self, tis):

@action('set_skipped', "Set state to 'skipped'", '', single=False)
@action_has_dag_edit_access
@action_logging
def action_set_skipped(self, tis):
"""Set state to skipped."""
self.set_task_instance_state(tis, TaskInstanceState.SKIPPED)
Expand Down

0 comments on commit a55d714

Please sign in to comment.