Skip to content

Commit

Permalink
Extract and emit bigquery input schema (MarquezProject#104)
Browse files Browse the repository at this point in the history
* Extract and emit bigquery input schema

Signed-off-by: henneberger <[email protected]>

* Check for nulls & include debug info

Signed-off-by: henneberger <[email protected]>

* Add empty property check

Signed-off-by: henneberger <[email protected]>

Co-authored-by: Willy Lulciuc <[email protected]>
  • Loading branch information
henneberger and wslulciuc authored Nov 30, 2020
1 parent 965a794 commit 9575a17
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 6 deletions.
58 changes: 52 additions & 6 deletions marquez_airflow/extractors/bigquery_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Source,
Dataset
)
from marquez_airflow.models import DbTableSchema, DbColumn
from marquez_airflow.extractors.sql.experimental.parser import SqlParser
from marquez_airflow.utils import (
get_job_name
Expand Down Expand Up @@ -95,12 +96,24 @@ def extract_on_complete(self, task_instance) -> [StepMetadata]:
input_table_names = [
self._bq_table_name(bq_t) for bq_t in bq_input_tables
]
inputs = [
Dataset.from_table(source, table)
for table in input_table_names
]
bq_output_table = job_properties.get('configuration')\
.get('query')\
try:
inputs = [
Dataset.from_table_schema(
source=source,
table_schema=table_schema
)
for table_schema in self._get_table_schemas(
input_table_names, client
)
]
except Exception as e:
log.warn(f'Could not extract schema from bigquery. {e}')
inputs = [
Dataset.from_table(source, table)
for table in input_table_names
]
bq_output_table = job_properties.get('configuration') \
.get('query') \
.get('destinationTable')
output_table_name = self._bq_table_name(bq_output_table)
outputs = [
Expand All @@ -123,6 +136,39 @@ def extract_on_complete(self, task_instance) -> [StepMetadata]:
context=context
)]

def _get_table_schemas(self, tables: [str], client: bigquery.Client) \
-> [DbTableSchema]:
# Avoid querying postgres by returning an empty array
# if no tables have been provided.
if not tables:
return []

return [self._get_table(table, client) for table in tables]

def _get_table(self, table: str, client: bigquery.Client) -> DbTableSchema:
bq_table = client.get_table(table)
if not bq_table._properties:
return
table = bq_table._properties

if not table.get('schema') or not table.get('schema').get('fields'):
return

fields = table.get('schema').get('fields')
columns = [DbColumn(
name=fields[i].get('name'),
type=fields[i].get('type'),
description=fields[i].get('description'),
ordinal_position=i
) for i in range(len(fields))]

return DbTableSchema(
schema_name=table.get('tableReference').get('projectId') + '.' +
table.get('tableReference').get('datasetId'),
table_name=table.get('tableReference').get('tableId'),
columns=columns
)

def _get_bigquery_job_id(self, task_instance):
bigquery_job_id = task_instance.xcom_pull(
task_ids=task_instance.task_id, key='job_id')
Expand Down
53 changes: 53 additions & 0 deletions tests/extractors/table_details.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"kind": "bigquery#table",
"etag": "L7YwMYGtkofoiqqF8tXSDA==",
"id": "bigquery-public-data:usa_names.usa_1910_2013",
"selfLink": "https://bigquery.googleapis.com/bigquery/v2/projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_2013",
"tableReference": {
"projectId": "bigquery-public-data",
"datasetId": "usa_names",
"tableId": "usa_1910_2013"
},
"description": "The table contains the number of applicants for a Social Security card by year of birth and sex. The number of such applicants is restricted to U.S. births where the year of birth, sex, State of birth (50 States and District of Columbia) are known, and where the given name is at least 2 characters long.\n\nsource: http://www.ssa.gov/OACT/babynames/limits.html",
"schema": {
"fields": [
{
"name": "state",
"type": "STRING",
"mode": "NULLABLE",
"description": "2-digit state code"
},
{
"name": "gender",
"type": "STRING",
"mode": "NULLABLE",
"description": "Sex (M=male or F=female)"
},
{
"name": "year",
"type": "INTEGER",
"mode": "NULLABLE",
"description": "4-digit year of birth"
},
{
"name": "name",
"type": "STRING",
"mode": "NULLABLE",
"description": "Given name of a person at birth"
},
{
"name": "number",
"type": "INTEGER",
"mode": "NULLABLE",
"description": "Number of occurrences of the name"
}
]
},
"numBytes": "171432506",
"numLongTermBytes": "171432506",
"numRows": "5552452",
"creationTime": "1457744542425",
"lastModifiedTime": "1457746213452",
"type": "TABLE",
"location": "US"
}
14 changes: 14 additions & 0 deletions tests/extractors/test_bigquery_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ def test_extract(self, mock_client, mock_hook):
job_details = json.loads(job_details_file.read())
job_details_file.close()

table_details_file = open(
file="tests/extractors/table_details.json",
mode="r"
)
table_details = json.loads(table_details_file.read())
job_details_file.close()

bq_job_id = "foo.bq.job_id"

mock_hook.return_value \
Expand All @@ -55,6 +62,10 @@ def test_extract(self, mock_client, mock_hook):
.get_job.return_value \
._properties = job_details

mock_client.return_value \
.get_table.return_value \
._properties = table_details

mock_client.return_value.close.return_value

mock.seal(mock_hook)
Expand Down Expand Up @@ -99,6 +110,9 @@ def test_extract(self, mock_client, mock_hook):
assert len(steps_meta[0].inputs) == 1
assert steps_meta[0].inputs[0].name == \
'bigquery-public-data.usa_names.usa_1910_2013'

assert steps_meta[0].inputs[0].fields is not None
assert len(steps_meta[0].inputs[0].fields) == 5
assert steps_meta[0].outputs is not None
assert len(steps_meta[0].outputs) == 1
assert steps_meta[0].outputs[0].name == \
Expand Down

0 comments on commit 9575a17

Please sign in to comment.