This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

feat: Enable Support for Teradata Metadata Scanner (amundsen-io#1791)
Signed-off-by: Deepak Bhatt <[email protected]>
DeepakBhatt13 authored Apr 2, 2022
1 parent d2de0cb commit 7f269e3
Showing 9 changed files with 915 additions and 1 deletion.
141 changes: 141 additions & 0 deletions databuilder/databuilder/extractor/base_teradata_metadata_extractor.py
@@ -0,0 +1,141 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import abc
import logging
from collections import namedtuple
from itertools import groupby
from typing import (
Any, Dict, Iterator, Union,
)

from pyhocon import ConfigFactory, ConfigTree

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata

TableKey = namedtuple("TableKey", ["schema", "table_name"])

LOGGER = logging.getLogger(__name__)


class BaseTeradataMetadataExtractor(Extractor):
"""
Extracts Teradata table and column metadata from the underlying metastore database using SQLAlchemyExtractor
"""

# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = "where_clause_suffix"
CLUSTER_KEY = "cluster_key"
USE_CATALOG_AS_CLUSTER_NAME = "use_catalog_as_cluster_name"
DATABASE_KEY = "database_key"

# Default values
DEFAULT_CLUSTER_NAME = "master"

DEFAULT_CONFIG = ConfigFactory.from_dict(
{
WHERE_CLAUSE_SUFFIX_KEY: "true",
CLUSTER_KEY: DEFAULT_CLUSTER_NAME,
USE_CATALOG_AS_CLUSTER_NAME: True,
}
)

@abc.abstractmethod
def get_sql_statement(
self, use_catalog_as_cluster_name: bool, where_clause_suffix: str
) -> Any:
"""
:return: SQL statement used to extract table and column metadata
"""
return None

def init(self, conf: ConfigTree) -> None:
conf = conf.with_fallback(BaseTeradataMetadataExtractor.DEFAULT_CONFIG)
self._cluster = conf.get_string(BaseTeradataMetadataExtractor.CLUSTER_KEY)

self._database = conf.get_string(
BaseTeradataMetadataExtractor.DATABASE_KEY, default="teradata"
)

self.sql_stmt = self.get_sql_statement(
use_catalog_as_cluster_name=conf.get_bool(
BaseTeradataMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME
),
where_clause_suffix=conf.get_string(
BaseTeradataMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY
),
)

self._alchemy_extractor = SQLAlchemyExtractor()
sql_alch_conf = Scoped.get_scoped_conf(
conf, self._alchemy_extractor.get_scope()
).with_fallback(
ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt})
)

self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

LOGGER.info("SQL for teradata metadata: %s", self.sql_stmt)

self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter: Union[None, Iterator] = None

def extract(self) -> Union[TableMetadata, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None

def _get_extract_iter(self) -> Iterator[TableMetadata]:
"""
Using itertools.groupby over the raw-level iterator, this groups rows by table and yields TableMetadata
:return:
"""
for key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
columns = []

for row in group:
last_row = row
columns.append(
ColumnMetadata(
row["col_name"],
row["col_description"],
row["col_type"],
row["col_sort_order"],
)
)

yield TableMetadata(
self._database,
last_row["td_cluster"],
last_row["schema"],
last_row["name"],
last_row["description"],
columns,
)

def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
"""
Provides an iterator of result rows from the SQLAlchemy extractor
:return:
"""
row = self._alchemy_extractor.extract()
while row:
yield row
row = self._alchemy_extractor.extract()

def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
"""
Table key consists of schema and table name
:param row:
:return:
"""
if row:
return TableKey(schema=row["schema"], table_name=row["name"])

return None
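
The grouping in `_get_extract_iter` relies on the generated SQL's ORDER BY so that all columns of a table arrive as adjacent rows. A minimal, self-contained sketch of that pattern (not part of the commit; the row dicts are made-up placeholders):

```python
# Illustrative only: mimics how _get_extract_iter uses itertools.groupby to
# collapse adjacent, pre-sorted rows into one entry per table. Row contents
# are placeholders, not real Teradata metadata.
from itertools import groupby

rows = [
    {"schema": "sales", "name": "orders", "col_name": "order_id", "col_sort_order": 1},
    {"schema": "sales", "name": "orders", "col_name": "amount", "col_sort_order": 2},
    {"schema": "sales", "name": "customers", "col_name": "customer_id", "col_sort_order": 1},
]

# groupby only merges *adjacent* keys, which is why the extractor's SQL orders
# by schema, table name and column sort order before the rows reach this loop.
for (schema, table), group in groupby(rows, key=lambda r: (r["schema"], r["name"])):
    columns = [r["col_name"] for r in group]
    print(f"{schema}.{table}: {columns}")
# sales.orders: ['order_id', 'amount']
# sales.customers: ['customer_id']
```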
46 changes: 46 additions & 0 deletions databuilder/databuilder/extractor/teradata_metadata_extractor.py
@@ -0,0 +1,46 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from typing import ( # noqa: F401
Any, Dict, Iterator, Union,
)

from pyhocon import ConfigFactory, ConfigTree # noqa: F401

from databuilder.extractor.base_teradata_metadata_extractor import BaseTeradataMetadataExtractor


class TeradataMetadataExtractor(BaseTeradataMetadataExtractor):
"""
Extracts Teradata table and column metadata from the underlying metastore database using SQLAlchemyExtractor
"""

def get_sql_statement(
self, use_catalog_as_cluster_name: bool, where_clause_suffix: str
) -> str:
if use_catalog_as_cluster_name:
cluster_source = "current_database()"
else:
cluster_source = f"'{self._cluster}'"

return """
SELECT
{cluster_source} as td_cluster,
c.DatabaseName as schema,
c.TableName as name,
c.CommentString as description,
d.ColumnName as col_name,
d.ColumnType as col_type,
d.CommentString as col_description,
d.ColumnId as col_sort_order
FROM dbc.Tables c, dbc.Columns d
WHERE c.DatabaseName = d.DatabaseName AND c.TableName = d.TableName
AND {where_clause_suffix}
ORDER BY td_cluster, schema, name, col_sort_order;
""".format(
cluster_source=cluster_source,
where_clause_suffix=where_clause_suffix,
)

def get_scope(self) -> str:
return "extractor.teradata_metadata"
188 changes: 188 additions & 0 deletions databuilder/example/scripts/sample_teradata_loader.py
@@ -0,0 +1,188 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is an example script which demonstrates how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.
"""

import logging
import sys
import textwrap
import uuid

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base

from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.extractor.teradata_metadata_extractor import TeradataMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer

es_host = None
neo_host = None
if len(sys.argv) > 1:
es_host = sys.argv[1]
if len(sys.argv) > 2:
neo_host = sys.argv[2]

es = Elasticsearch(
[
{"host": es_host if es_host else "localhost"},
]
)

DB_FILE = "/tmp/test.db"
SQLITE_CONN_STRING = "sqlite:////tmp/test.db"
Base = declarative_base()

NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = "neo4j"
neo4j_password = "test"

LOGGER = logging.getLogger(__name__)


# todo: connection string needs to change
def connection_string():
user = "username"
pwd = "password"
host = "localhost"
port = "1025"
return "teradatasql://%s:%s@%s:%s" % (user, pwd, host, port)


def run_teradata_job():
where_clause_suffix = textwrap.dedent(
"""
c.DatabaseName = 'Default'
"""
)

tmp_folder = "/var/tmp/amundsen/table_metadata"
node_files_folder = f"{tmp_folder}/nodes/"
relationship_files_folder = f"{tmp_folder}/relationships/"

job_config = ConfigFactory.from_dict(
{
f"extractor.teradata_metadata.{TeradataMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}": where_clause_suffix,
f"extractor.teradata_metadata.{TeradataMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}": True,
f"extractor.teradata_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}": connection_string(),
f"loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}": node_files_folder,
f"loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}": relationship_files_folder,
f"loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR}": True,
f"publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}": node_files_folder,
f"publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}": relationship_files_folder,
f"publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}": neo4j_endpoint,
f"publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}": neo4j_user,
f"publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}": neo4j_password,
f"publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}": "unique_tag",
}
)
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=TeradataMetadataExtractor(), loader=FsNeo4jCSVLoader()
),
publisher=Neo4jCsvPublisher(),
)
return job


def create_es_publisher_sample_job(
elasticsearch_index_alias="table_search_index",
elasticsearch_doc_type_key="table",
model_name="databuilder.models.table_elasticsearch_document.TableESDocument",
cypher_query=None,
elasticsearch_mapping=None,
):
"""
:param elasticsearch_index_alias: alias for Elasticsearch used in
amundsensearchlibrary/search_service/config.py as an index
:param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
`table_search_index`
:param model_name: the Databuilder model class used in transporting between Extractor and Loader
:param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
it uses the `Table` query baked into the Extractor
:param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
if None is given (default) it uses the `Table` mapping baked into the Publisher
"""
# loader saves data to this location and publisher reads it from here
extracted_search_data_path = "/var/tmp/amundsen/search_data.json"

task = DefaultTask(
loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=NoopTransformer(),
)

# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = "tables" + str(uuid.uuid4())

job_config = ConfigFactory.from_dict(
{
"extractor.search_data.entity_type": "table",
f"extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}": neo4j_endpoint,
f"extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}": model_name,
f"extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}": neo4j_user,
f"extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}": neo4j_password,
f"loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}":
extracted_search_data_path,
f"loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}": "w",
f"publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}": extracted_search_data_path,
f"publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}": "r",
f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}":
elasticsearch_client,
f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}":
elasticsearch_new_index_key,
f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}":
elasticsearch_doc_type_key,
f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}":
elasticsearch_index_alias,
}
)

# only optionally add these keys, so need to dynamically `put` them
if cypher_query:
job_config.put(
f"extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}",
cypher_query,
)
if elasticsearch_mapping:
job_config.put(
f"publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}",
elasticsearch_mapping,
)

job = DefaultJob(conf=job_config, task=task, publisher=ElasticsearchPublisher())
return job


if __name__ == "__main__":
# Uncomment next line to get INFO level logging
# logging.basicConfig(level=logging.INFO)

loading_job = run_teradata_job()
loading_job.launch()

job_es_table = create_es_publisher_sample_job(
elasticsearch_index_alias="table_search_index",
elasticsearch_doc_type_key="table",
model_name="databuilder.models.table_elasticsearch_document.TableESDocument",
)
job_es_table.launch()
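
The sample loader hard-codes placeholder credentials in `connection_string()` (flagged by the `# todo` comment above), and optionally reads the Elasticsearch and Neo4j hostnames from its first and second command-line arguments, falling back to localhost. A hedged variant of the helper that reads the same fields from environment variables whose names are illustrative assumptions:

```python
# Assumed environment variable names; not part of the commit. Falls back to the
# same placeholder values the sample script uses.
import os


def connection_string() -> str:
    user = os.environ.get("TERADATA_USER", "username")
    pwd = os.environ.get("TERADATA_PASSWORD", "password")
    host = os.environ.get("TERADATA_HOST", "localhost")
    port = os.environ.get("TERADATA_PORT", "1025")
    return f"teradatasql://{user}:{pwd}@{host}:{port}"
```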
7 changes: 6 additions & 1 deletion databuilder/setup.py
@@ -87,8 +87,12 @@
'simple-salesforce>=1.11.2'
]

teradata = [
'teradatasqlalchemy==17.0.0.0'
]

all_deps = requirements + requirements_dev + kafka + cassandra + glue + snowflake + athena + \
bigquery + jsonpath + db2 + dremio + druid + spark + feast + neptune + rds + atlas + salesforce + oracle
bigquery + jsonpath + db2 + dremio + druid + spark + feast + neptune + rds + atlas + salesforce + oracle + teradata

setup(
name='amundsen-databuilder',
@@ -122,6 +126,7 @@
'rds': rds,
'salesforce': salesforce,
'oracle': oracle,
'teradata': teradata,
},
classifiers=[
'Programming Language :: Python :: 3.7',
