Skip to content

Commit

Permalink
Add job name and progress logs to Cloud Storage Transfer Hook (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
jmcarp authored Nov 3, 2020
1 parent 5c199fb commit e324b37
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class GcpTransferOperationStatus:
AWS_S3_DATA_SOURCE = 'awsS3DataSource'
BODY = 'body'
BUCKET_NAME = 'bucketName'
JOB_NAME = 'name'
COUNTERS = 'counters'
DAY = 'day'
DESCRIPTION = "description"
FILTER = 'filter'
Expand All @@ -72,6 +72,7 @@ class GcpTransferOperationStatus:
GCS_DATA_SOURCE = 'gcsDataSource'
HOURS = "hours"
HTTP_DATA_SOURCE = 'httpDataSource'
JOB_NAME = 'name'
LIST_URL = 'list_url'
METADATA = 'metadata'
MINUTES = "minutes"
Expand All @@ -89,8 +90,8 @@ class GcpTransferOperationStatus:
STATUS = "status"
STATUS1 = 'status'
TRANSFER_JOB = 'transfer_job'
TRANSFER_JOB_FIELD_MASK = 'update_transfer_job_field_mask'
TRANSFER_JOBS = 'transferJobs'
TRANSFER_JOB_FIELD_MASK = 'update_transfer_job_field_mask'
TRANSFER_OPERATIONS = 'transferOperations'
TRANSFER_OPTIONS = 'transfer_options'
TRANSFER_SPEC = 'transferSpec'
Expand Down Expand Up @@ -193,6 +194,7 @@ def create_transfer_job(self, body: dict) -> dict:
return self.enable_transfer_job(job_name=job_name, project_id=body.get(PROJECT_ID))
else:
raise e
self.log.info("Created job %s", transfer_job[NAME])
return transfer_job

@GoogleBaseHook.fallback_to_default_project_id
Expand Down Expand Up @@ -467,13 +469,13 @@ def wait_for_transfer_job(

start_time = time.time()
while time.time() - start_time < timeout:
operations = self.list_transfer_operations(
request_filter={FILTER_PROJECT_ID: job[PROJECT_ID], FILTER_JOB_NAMES: [job[NAME]]}
)
request_filter = {FILTER_PROJECT_ID: job[PROJECT_ID], FILTER_JOB_NAMES: [job[NAME]]}
operations = self.list_transfer_operations(request_filter=request_filter)

for operation in operations:
self.log.info("Progress for operation %s: %s", operation[NAME], operation[METADATA][COUNTERS])

if CloudDataTransferServiceHook.operations_contain_expected_statuses(
operations, expected_statuses
):
if self.operations_contain_expected_statuses(operations, expected_statuses):
return
time.sleep(TIME_TO_SLEEP_IN_SECONDS)
raise AirflowException("Timeout. The operation could not be completed within the allotted time.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
"""This module contains a Google Cloud Transfer sensor."""
from typing import Optional, Sequence, Set, Union

from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import CloudDataTransferServiceHook
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
CloudDataTransferServiceHook,
COUNTERS,
METADATA,
NAME,
)
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

Expand Down Expand Up @@ -91,6 +96,9 @@ def poke(self, context: dict) -> bool:
request_filter={'project_id': self.project_id, 'job_names': [self.job_name]}
)

for operation in operations:
self.log.info("Progress for operation %s: %s", operation[NAME], operation[METADATA][COUNTERS])

check = CloudDataTransferServiceHook.operations_contain_expected_statuses(
operations=operations, expected_statuses=self.expected_statuses
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
COUNTERS,
DESCRIPTION,
FILTER_JOB_NAMES,
FILTER_PROJECT_ID,
Expand Down Expand Up @@ -81,6 +82,12 @@
TEST_RESULT_STATUS_DISABLED = {STATUS: GcpTransferJobsStatus.DISABLED}
TEST_RESULT_STATUS_DELETED = {STATUS: GcpTransferJobsStatus.DELETED}

TEST_NAME = "transferOperations/transferJobs-123-456"
TEST_COUNTERS = {
"bytesFoundFromSource": 512,
"bytesCopiedToSink": 1024,
}


def _without_key(body, key):
obj = deepcopy(body)
Expand Down Expand Up @@ -370,8 +377,24 @@ def test_resume_transfer_operation(self, get_conn):
)
def test_wait_for_transfer_job(self, mock_list, mock_sleep, mock_project_id):
mock_list.side_effect = [
[{METADATA: {STATUS: GcpTransferOperationStatus.IN_PROGRESS}}],
[{METADATA: {STATUS: GcpTransferOperationStatus.SUCCESS}}],
[
{
NAME: TEST_NAME,
METADATA: {
STATUS: GcpTransferOperationStatus.IN_PROGRESS,
COUNTERS: TEST_COUNTERS,
},
},
],
[
{
NAME: TEST_NAME,
METADATA: {
STATUS: GcpTransferOperationStatus.SUCCESS,
COUNTERS: TEST_COUNTERS,
},
},
],
]

job_name = 'transferJobs/test-job'
Expand Down Expand Up @@ -400,7 +423,13 @@ def test_wait_for_transfer_job_failed(self, mock_get_conn, mock_sleep, mock_proj
list_execute_method = list_method.return_value.execute
list_execute_method.return_value = {
OPERATIONS: [
{NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: GcpTransferOperationStatus.FAILED}}
{
NAME: TEST_TRANSFER_OPERATION_NAME,
METADATA: {
STATUS: GcpTransferOperationStatus.FAILED,
COUNTERS: TEST_COUNTERS,
},
}
]
}

Expand All @@ -427,7 +456,13 @@ def test_wait_for_transfer_job_expect_failed(
list_execute_method = list_method.return_value.execute
list_execute_method.return_value = {
OPERATIONS: [
{NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: GcpTransferOperationStatus.FAILED}}
{
NAME: TEST_TRANSFER_OPERATION_NAME,
METADATA: {
STATUS: GcpTransferOperationStatus.FAILED,
COUNTERS: TEST_COUNTERS,
},
}
]
}

Expand Down Expand Up @@ -498,7 +533,10 @@ def test_operations_contain_expected_statuses_red_path(self, statuses, expected_
]
)
def test_operations_contain_expected_statuses_green_path(self, statuses, expected_statuses):
operations = [{NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: status}} for status in statuses]
operations = [
{NAME: TEST_TRANSFER_OPERATION_NAME, METADATA: {STATUS: status, COUNTERS: TEST_COUNTERS}}
for status in statuses
]

result = CloudDataTransferServiceHook.operations_contain_expected_statuses(
operations, expected_statuses
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,27 @@
CloudDataTransferServiceJobStatusSensor,
)

TEST_NAME = "transferOperations/transferJobs-123-456"
TEST_COUNTERS = {
"bytesFoundFromSource": 512,
"bytesCopiedToSink": 1024,
}


class TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase):
@mock.patch(
'airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook'
)
def test_wait_for_status_success(self, mock_tool):
operations = [{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}]
operations = [
{
'name': TEST_NAME,
'metadata': {
'status': GcpTransferOperationStatus.SUCCESS,
'counters': TEST_COUNTERS,
},
}
]
mock_tool.return_value.list_transfer_operations.return_value = operations
mock_tool.operations_contain_expected_statuses.return_value = True

Expand Down Expand Up @@ -79,8 +93,24 @@ def test_wait_for_status_success_default_expected_status(self, mock_tool):
)
def test_wait_for_status_after_retry(self, mock_tool):
operations_set = [
[{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}],
[{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}],
[
{
'name': TEST_NAME,
'metadata': {
'status': GcpTransferOperationStatus.SUCCESS,
'counters': TEST_COUNTERS,
},
},
],
[
{
'name': TEST_NAME,
'metadata': {
'status': GcpTransferOperationStatus.SUCCESS,
'counters': TEST_COUNTERS,
},
},
],
]

mock_tool.return_value.list_transfer_operations.side_effect = operations_set
Expand Down Expand Up @@ -124,7 +154,15 @@ def test_wait_for_status_after_retry(self, mock_tool):
'airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook'
)
def test_wait_for_status_normalize_status(self, expected_status, received_status, mock_tool):
operations = [{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}]
operations = [
{
'name': TEST_NAME,
'metadata': {
'status': GcpTransferOperationStatus.SUCCESS,
'counters': TEST_COUNTERS,
},
}
]

mock_tool.return_value.list_transfer_operations.return_value = operations
mock_tool.operations_contain_expected_statuses.side_effect = [False, True]
Expand Down

0 comments on commit e324b37

Please sign in to comment.