Skip to content

Commit

Permalink
[AIRFLOW-385] Add symlink to latest scheduler log directory
Browse files Browse the repository at this point in the history
Create a symbolic link to the directory contaning
the latest scheduler logs, and update the link
when the target changes.

Update the test_scheduler_job test case to verify
that the symbolic link is created.

Implementation:
- Create a symbolic link to the directory
containing the latest scheduler logs, and update
the link when the target changes.

Testing Done:
- Extend test_scheduler_job test case to verify
that the correct symbolic link is created.

Closes apache#1842 from vijaysbhat/latest-log-symlink
  • Loading branch information
vijaysbhat authored and aoen committed Oct 21, 2016
1 parent e4cca0d commit 8911903
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
42 changes: 39 additions & 3 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,18 @@ def _split_path(file_path):
results.reverse()
return results

def _get_log_directory(self):
"""
Log output from processing DAGs for the current day should go into
this directory.
:return: the path to the corresponding log directory
:rtype: unicode
"""
now = datetime.now()
return os.path.join(self._child_process_log_directory,
now.strftime("%Y-%m-%d"))

def _get_log_file_path(self, dag_file_path):
"""
Log output from processing the specified file should go to this
Expand All @@ -475,11 +487,9 @@ def _get_log_file_path(self, dag_file_path):
:return: the path to the corresponding log file
:rtype: unicode
"""
log_directory = self._get_log_directory()
# General approach is to put the log file under the same relative path
# under the log directory as the DAG file in the DAG directory
now = datetime.now()
log_directory = os.path.join(self._child_process_log_directory,
now.strftime("%Y-%m-%d"))
relative_dag_file_path = os.path.relpath(dag_file_path, start=self._dag_directory)
path_elements = self._split_path(relative_dag_file_path)

Expand All @@ -488,6 +498,30 @@ def _get_log_file_path(self, dag_file_path):

return os.path.join(log_directory, *path_elements)

def symlink_latest_log_directory(self):
"""
Create symbolic link to the current day's log directory to
allow easy access to the latest scheduler log files.
:return: None
"""
log_directory = self._get_log_directory()
latest_log_directory_path = os.path.join(
self._child_process_log_directory, "latest")
if (os.path.isdir(log_directory)):
# if symlink exists but is stale, update it
if (os.path.islink(latest_log_directory_path)):
if(os.readlink(latest_log_directory_path) != log_directory):
os.unlink(latest_log_directory_path)
os.symlink(log_directory, latest_log_directory_path)
elif (os.path.isdir(latest_log_directory_path) or
os.path.isfile(latest_log_directory_path)):
self.logger.warn("{} already exists as a dir/file. "
"Skip creating symlink."
.format(latest_log_directory_path))
else:
os.symlink(log_directory, latest_log_directory_path)

def processing_count(self):
"""
:return: the number of files currently being processed
Expand Down Expand Up @@ -592,6 +626,8 @@ def heartbeat(self):

self._processors[file_path] = processor

self.symlink_latest_log_directory()

return simple_dags

def max_runs_reached(self):
Expand Down
11 changes: 11 additions & 0 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,10 +597,21 @@ def test_local_task_job(self):
job = jobs.LocalTaskJob(task_instance=ti, ignore_ti_state=True)
job.run()

@mock.patch('airflow.utils.dag_processing.datetime', FakeDatetime)
def test_scheduler_job(self):
FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 1))
job = jobs.SchedulerJob(dag_id='example_bash_operator',
**self.default_scheduler_args)
job.run()
log_base_directory = configuration.conf.get("scheduler",
"child_process_log_directory")
latest_log_directory_path = os.path.join(log_base_directory, "latest")
# verify that the symlink to the latest logs exists
assert os.path.islink(latest_log_directory_path)

# verify that the symlink points to the correct log directory
log_directory = os.path.join(log_base_directory, "2016-01-01")
self.assertEqual(os.readlink(latest_log_directory_path), log_directory)

def test_raw_job(self):
TI = models.TaskInstance
Expand Down

0 comments on commit 8911903

Please sign in to comment.