Skip to content

Commit

Permalink
Added SDFtoGCSOperator (apache#8740)
Browse files Browse the repository at this point in the history
Co-authored-by: michalslowikowski00 <[email protected]>
  • Loading branch information
michalslowikowski00 and michalslowikowski00 authored May 8, 2020
1 parent b7566e1 commit 58aefb2
Show file tree
Hide file tree
Showing 10 changed files with 718 additions and 50 deletions.
22 changes: 21 additions & 1 deletion airflow/providers/google/common/hooks/base_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from google.auth import _cloud_sdk
from google.auth.environment_vars import CREDENTIALS
from googleapiclient.errors import HttpError
from googleapiclient.http import set_user_agent
from googleapiclient.http import MediaIoBaseDownload, set_user_agent

from airflow import version
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -456,3 +456,23 @@ def provide_authorized_gcloud(self):
creds_content["refresh_token"],
])
yield

@staticmethod
def download_content_from_request(file_handle, request, chunk_size):
    """
    Download media resources.

    The content behind ``request`` is fetched with ``MediaIoBaseDownload`` in
    pieces of ``chunk_size`` bytes and written to ``file_handle``. The handle is
    flushed once the downloader reports that it is done; it is not closed here.

    Note that the Python file object is compatible with io.Base and can be used with this class also.

    :param file_handle: io.Base or file object. The stream in which to write the downloaded
        bytes.
    :type file_handle: io.Base or file object
    :param request: googleapiclient.http.HttpRequest, the media request to perform in chunks.
    :type request: googleapiclient.http.HttpRequest
    :param chunk_size: int, File will be downloaded in chunks of this many bytes.
    :type chunk_size: int
    """
    downloader = MediaIoBaseDownload(file_handle, request, chunksize=chunk_size)
    done = False
    # next_chunk() yields a pair; only the second element (the completion flag)
    # is needed here, the first one is deliberately ignored.
    while not done:
        _, done = downloader.next_chunk()
    # Push any buffered bytes out so callers can read the handle right away.
    file_handle.flush()
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,34 @@
Example Airflow DAG that shows how to use DisplayVideo.
"""
import os
from typing import Dict

from airflow import models
from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator,
GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator,
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator,
GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator,
)
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360ReportSensor,
GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor,
)
from airflow.utils import dates

# [START howto_display_video_env_variables]
# Every value below can be overridden through the named environment variable;
# the second argument is the fallback used when the variable is not set.
BUCKET = os.getenv("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket")
ADVERTISER_ID = os.getenv("GMP_ADVERTISER_ID", 1234567)
OBJECT_NAME = os.getenv("GMP_OBJECT_NAME", "files/report.csv")
PATH_TO_UPLOAD_FILE = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
PATH_TO_SAVED_FILE = os.getenv("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")
# Object name inside the bucket: the part of the upload path after the last "/".
BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rsplit("/", 1)[-1]
SDF_VERSION = os.getenv("GMP_SDF_VERSION", "SDF_VERSION_5_1")
BQ_DATA_SET = os.getenv("GMP_BQ_DATA_SET", "airflow_test")
REPORT = {
"kind": "doubleclickbidmanager#query",
Expand All @@ -55,14 +67,16 @@
}

# Parameters for the report-related part of the example.
PARAMS = {
    "dataRange": "LAST_14_DAYS",
    "timezoneCode": "America/New_York",
}

# Request body handed to the SDF download task operator (``body_request``).
BODY_REQUEST: Dict = {
    "version": SDF_VERSION,
    "advertiserId": ADVERTISER_ID,
    "inventorySourceFilter": {
        "inventorySourceIds": [],
    },
}
# [END howto_display_video_env_variables]

# download_line_items variables
REQUEST_BODY = {
"filterType": ADVERTISER_ID,
"format": "CSV",
"fileSpec": "EWF"
}
REQUEST_BODY = {"filterType": ADVERTISER_ID, "format": "CSV", "fileSpec": "EWF"}

default_args = {"start_date": dates.days_ago(1)}

Expand Down Expand Up @@ -119,7 +133,47 @@
upload_line_items = GoogleDisplayVideo360UploadLineItemsOperator(
task_id="upload_line_items",
bucket_name=BUCKET,
object_name=OBJECT_NAME,
object_name=BUCKET_FILE_LOCATION,
)
# [END howto_google_display_video_upload_line_items_operator]

# [START howto_google_display_video_create_sdf_download_task_operator]
# Creates the SDF download task described by BODY_REQUEST.
create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator(
    body_request=BODY_REQUEST,
    task_id="create_sdf_download_task",
)
# Template that resolves at run time to the "name" entry of the XCom value
# pushed by the "create_sdf_download_task" task.
operation_name = '{{ task_instance.xcom_pull("create_sdf_download_task")["name"] }}'
# [END howto_google_display_video_create_sdf_download_task_operator]

# [START howto_google_display_video_wait_for_operation_sensor]
# Sensor keyed on the operation name templated from the create task's XCom.
wait_for_operation = GoogleDisplayVideo360GetSDFDownloadOperationSensor(
    operation_name=operation_name,
    task_id="wait_for_operation",
)
# [END howto_google_display_video_wait_for_operation_sensor]

# [START howto_google_display_video_save_sdf_in_gcs_operator]
# Stores the SDF produced by the operation in BUCKET under BUCKET_FILE_LOCATION,
# uncompressed (gzip is switched off).
save_sdf_in_gcs = GoogleDisplayVideo360SDFtoGCSOperator(
    operation_name=operation_name,
    bucket_name=BUCKET,
    object_name=BUCKET_FILE_LOCATION,
    gzip=False,
    task_id="save_sdf_in_gcs",
)
# [END howto_google_display_video_save_sdf_in_gcs_operator]

# [START howto_google_display_video_gcs_to_big_query_operator]
# Loads the SDF object written by ``save_sdf_in_gcs`` into BigQuery, replacing
# the destination table contents on every run (WRITE_TRUNCATE).
upload_sdf_to_big_query = GCSToBigQueryOperator(
    task_id="upload_sdf_to_big_query",
    bucket=BUCKET,
    # Pull the object location from the upstream "save_sdf_in_gcs" task. The
    # previous template referenced "upload_sdf_to_bigquery", which is not a task
    # id in this DAG, so it could never resolve to a value.
    # NOTE(review): relies on save_sdf_in_gcs pushing the saved object path to
    # XCom - confirm against the operator's return value.
    source_objects=['{{ task_instance.xcom_pull("save_sdf_in_gcs") }}'],
    destination_project_dataset_table=f"{BQ_DATA_SET}.gcs_to_bq_table",
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
    ],
    write_disposition="WRITE_TRUNCATE",
    dag=dag,
)
# [END howto_google_display_video_gcs_to_big_query_operator]

# Report flow: create -> run -> wait -> download -> delete.
create_report >> run_report
run_report >> wait_for_report
wait_for_report >> get_report
get_report >> delete_report

# SDF flow: create download task -> wait for it -> save to GCS -> load to BigQuery.
create_sdf_download_task >> wait_for_operation
wait_for_operation >> save_sdf_in_gcs
save_sdf_in_gcs >> upload_sdf_to_big_query
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ def get_conn(self) -> Resource:
)
return self._conn

def get_conn_to_display_video(self) -> Resource:
    """
    Retrieves connection to DisplayVideo.

    The "displayvideo" service object is built once per hook instance with the
    hook's ``api_version`` and then reused on later calls.

    :return: Service object for the "displayvideo" API.
    :rtype: Resource
    """
    # Keep this client in its own attribute: ``self._conn`` is also the cache
    # used by ``get_conn`` for a different API, so sharing it would hand back
    # whichever service happened to be built first. ``getattr`` is used because
    # the attribute is created lazily here rather than in ``__init__``.
    display_video_conn = getattr(self, "_display_video_conn", None)
    if not display_video_conn:
        http_authorized = self._authorize()
        display_video_conn = build(
            "displayvideo",
            self.api_version,
            http=http_authorized,
            cache_discovery=False,
        )
        self._display_video_conn = display_video_conn
    return display_video_conn

def create_query(self, query: Dict[str, Any]) -> Dict:
"""
Creates a query.
Expand Down Expand Up @@ -111,7 +125,7 @@ def list_queries(self, ) -> List[Dict]:
.listqueries()
.execute(num_retries=self.num_retries)
)
return response.get('queries', [])
return response.get("queries", [])

def run_query(self, query_id: str, params: Dict[str, Any]) -> None:
"""
Expand Down Expand Up @@ -170,3 +184,54 @@ def download_line_items(self, request_body: Dict[str, Any]) -> List[Any]:
.execute(num_retries=self.num_retries)
)
return response["lineItems"]

def create_sdf_download_operation(self, body_request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Creates an SDF Download Task and returns the resulting Operation.

    More information about the body request can be found here:
    https://developers.google.com/display-video/api/reference/rest/v1/sdfdownloadtasks/create

    :param body_request: Body of the create request.
    :type body_request: Dict[str, Any]
    :return: Response of the executed ``sdfdownloadtasks.create`` call.
    :rtype: Dict[str, Any]
    """
    service = self.get_conn_to_display_video()  # pylint: disable=no-member
    create_request = service.sdfdownloadtasks().create(body=body_request)
    # Execute with the retry budget configured on the hook.
    return create_request.execute(num_retries=self.num_retries)

def get_sdf_download_operation(self, operation_name: str):
    """
    Gets the latest state of an asynchronous SDF download task operation.

    :param operation_name: The name of the operation resource.
    :type operation_name: str
    :return: Response of the executed ``sdfdownloadtasks.operations.get`` call.
    """

    result = (
        self.get_conn_to_display_video()  # pylint: disable=no-member
        .sdfdownloadtasks()
        # The nested resource is named "operations" (plural) in the API; the
        # generated client exposes no "operation" accessor.
        .operations()
        .get(name=operation_name)
        .execute(num_retries=self.num_retries)
    )
    return result

def download_media(self, resource_name: str):
    """
    Builds the request that downloads media.

    The request is only constructed here, not executed; the caller is
    responsible for performing the download with the returned object.

    :param resource_name: Name of the media resource that is being downloaded.
    :type resource_name: str
    :return: The prepared (not yet executed) media download request.
    """

    request = (
        self.get_conn_to_display_video()  # pylint: disable=no-member
        .media()
        # The generated client takes the API's own parameter name, which is
        # camelCase ("resourceName"); a snake_case keyword is rejected.
        .download_media(resourceName=resource_name)
    )
    return request
Loading

0 comments on commit 58aefb2

Please sign in to comment.