Skip to content

Commit

Permalink
Improve handling of job_id in BigQuery operators (apache#11287)
Browse files Browse the repository at this point in the history
Make autogenerated job_id more unique by using microseconds and hash of configuration. Replace dots in job_id.
Closes: apache#11280
  • Loading branch information
turbaszek authored Oct 7, 2020
1 parent 18dcac8 commit 47b05a8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
14 changes: 13 additions & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
This module contains a BigQuery Hook, as well as a very basic PEP 249
implementation for BigQuery.
"""
import hashlib
import json
import logging
import time
import warnings
from copy import deepcopy
from datetime import timedelta, datetime
from typing import Any, Dict, Iterable, List, Mapping, NoReturn, Optional, Sequence, Tuple, Type, Union

from google.api_core.retry import Retry
Expand Down Expand Up @@ -1443,6 +1446,15 @@ def get_job(
job = client.get_job(job_id=job_id, project=project_id, location=location)
return job

@staticmethod
def _custom_job_id(configuration: Dict[str, Any]) -> str:
hash_base = json.dumps(configuration, sort_keys=True)
uniqueness_suffix = hashlib.md5(hash_base.encode()).hexdigest()
microseconds_from_epoch = int(
(datetime.now() - datetime.fromtimestamp(0)) / timedelta(microseconds=1)
)
return f"airflow_{microseconds_from_epoch}_{uniqueness_suffix}"

@GoogleBaseHook.fallback_to_default_project_id
def insert_job(
self,
Expand Down Expand Up @@ -1472,7 +1484,7 @@ def insert_job(
:type location: str
"""
location = location or self.location
job_id = job_id or f"airflow_{int(time.time())}"
job_id = job_id or self._custom_job_id(configuration)

client = self.get_client(project_id=project_id, location=location)
job_data = {
Expand Down
5 changes: 3 additions & 2 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2068,8 +2068,9 @@ def _job_id(self, context):
if self.job_id:
return f"{self.job_id}_{uniqueness_suffix}"

exec_date = re.sub(r"\:|-|\+", "_", context['execution_date'].isoformat())
return f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{uniqueness_suffix}"
exec_date = context['execution_date'].isoformat()
job_id = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{uniqueness_suffix}"
return re.sub(r"\:|-|\+\.", "_", job_id)

def execute(self, context: Any):
hook = BigQueryHook(
Expand Down

0 comments on commit 47b05a8

Please sign in to comment.