Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
fix: Add postgres compatibility in HiveTableLastUpdatedExtractor (amu…
Browse files Browse the repository at this point in the history
…ndsen-io#1879)

* Add postgres compatibility in HiveTableLastUpdatedExtractor

Signed-off-by: Tony Chou <[email protected]>

* Fix CI unit test: check the beginning of the connection string to determine whether it is Postgres or not

Signed-off-by: Tony Chou <[email protected]>
  • Loading branch information
chonyy authored Aug 2, 2022
1 parent ba5f2a4 commit ecfbf37
Showing 1 changed file with 54 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class HiveTableLastUpdatedExtractor(Extractor):
timestamp.
"""
PARTITION_TABLE_SQL_STATEMENT = """
DEFAULT_PARTITION_TABLE_SQL_STATEMENT = """
SELECT
DBS.NAME as `schema`,
TBL_NAME as table_name,
Expand All @@ -71,7 +71,20 @@ class HiveTableLastUpdatedExtractor(Extractor):
ORDER BY `schema`, table_name;
"""

NON_PARTITIONED_TABLE_SQL_STATEMENT = """
# Postgres flavour of the partitioned-table query. For every (schema, table) pair it
# returns the largest "PARTITIONS"."CREATE_TIME" as last_updated_time.
# Every metastore identifier is double-quoted: Postgres folds unquoted identifiers to
# lower case, so the upper-case table/column names would otherwise not resolve.
# {where_clause_suffix} is a str.format placeholder filled in by
# _get_partitioned_table_sql_alchemy_extractor before the statement is executed.
DEFAULT_POSTGRES_PARTITION_TABLE_SQL_STATEMENT = """
SELECT
d."NAME" as "schema",
t."TBL_NAME" as table_name,
MAX(p."CREATE_TIME") as last_updated_time
FROM "TBLS" t
JOIN "DBS" d ON t."DB_ID" = d."DB_ID"
JOIN "PARTITIONS" p ON t."TBL_ID" = p."TBL_ID"
{where_clause_suffix}
GROUP BY "schema", table_name
ORDER BY "schema", table_name;
"""

DEFAULT_NON_PARTITIONED_TABLE_SQL_STATEMENT = """
SELECT
DBS.NAME as `schema`,
TBL_NAME as table_name,
Expand All @@ -83,11 +96,27 @@ class HiveTableLastUpdatedExtractor(Extractor):
ORDER BY `schema`, table_name;
"""

# Postgres flavour of the non-partitioned-table query. It returns schema, table name
# and the storage location ("SDS"."LOCATION") of each table.
# "TBLS" is aliased as `t`; the where clause formatted into {where_clause_suffix}
# (see _get_non_partitioned_table_sql_alchemy_extractor) refers to that alias.
# Identifiers are double-quoted so that Postgres keeps their upper-case spelling.
DEFAULT_POSTGRES_NON_PARTITIONED_TABLE_SQL_STATEMENT = """
SELECT
d."NAME" as "schema",
t."TBL_NAME" as table_name,
s."LOCATION" as location
FROM "TBLS" t
JOIN "DBS" d ON t."DB_ID" = d."DB_ID"
JOIN "SDS" s ON t."SD_ID" = s."SD_ID"
{where_clause_suffix}
ORDER BY "schema", table_name;
"""

# Additional where clause for non partitioned table SQL
ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM PARTITIONS WHERE PARTITIONS.TBL_ID = TBLS.TBL_ID)
DEFAULT_ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM PARTITIONS WHERE PARTITIONS.TBL_ID = TBLS.TBL_ID)
AND NOT EXISTS (SELECT * FROM PARTITION_KEYS WHERE PARTITION_KEYS.TBL_ID = TBLS.TBL_ID)
"""

# Postgres flavour of the additional predicate for the non-partitioned table SQL.
# It keeps only tables that have no row in "PARTITIONS" and no row in "PARTITION_KEYS".
# This is a bare predicate (no leading WHERE/AND): the caller either prefixes it with
# 'WHERE ' or appends it after 'AND' to a user-supplied where clause.
# It references the alias `t`, so it only works inside a statement that aliases "TBLS" as t.
DEFAULT_POSTGRES_ADDTIONAL_WHERE_CLAUSE = """ NOT EXISTS (SELECT * FROM "PARTITIONS" p WHERE p."TBL_ID" = t."TBL_ID")
AND NOT EXISTS (SELECT * FROM "PARTITION_KEYS" pk WHERE pk."TBL_ID" = t."TBL_ID")
"""

DATABASE = 'hive'

# CONFIG KEYS
Expand Down Expand Up @@ -132,7 +161,7 @@ def _get_partitioned_table_sql_alchemy_extractor(self) -> Extractor:
:return: SQLAlchemyExtractor
"""

sql_stmt = self.PARTITION_TABLE_SQL_STATEMENT.format(
sql_stmt = self._choose_default_partitioned_sql_stm().format(
where_clause_suffix=self._conf.get_string(
self.PARTITIONED_TABLE_WHERE_CLAUSE_SUFFIX_KEY, ' '))

Expand All @@ -144,24 +173,34 @@ def _get_partitioned_table_sql_alchemy_extractor(self) -> Extractor:
sql_alchemy_extractor.init(sql_alchemy_conf)
return sql_alchemy_extractor

def _choose_default_partitioned_sql_stm(self) -> str:
    """
    Pick the default partitioned-table SQL template for the configured connection.

    The decision is based on the prefix of the ``extractor.sqlalchemy.conn_string``
    config value.
    :return: DEFAULT_POSTGRES_PARTITION_TABLE_SQL_STATEMENT when the connection string
        starts with 'postgres', DEFAULT_PARTITION_TABLE_SQL_STATEMENT otherwise
    """
    conn_string = self._conf.get_string("extractor.sqlalchemy.conn_string")
    # 'postgres' is itself a prefix of 'postgresql', so a single check covers both
    # spellings as well as driver-qualified forms such as 'postgresql+psycopg2'.
    if conn_string.startswith('postgres'):
        return self.DEFAULT_POSTGRES_PARTITION_TABLE_SQL_STATEMENT
    return self.DEFAULT_PARTITION_TABLE_SQL_STATEMENT

def _get_non_partitioned_table_sql_alchemy_extractor(self) -> Extractor:
"""
Getting an SQLAlchemy extractor that extracts storage location for non-partitioned table for further probing
last updated timestamp
:return: SQLAlchemyExtractor
"""

default_sql_stmt, default_additional_where_clause = self._choose_default_non_partitioned_sql_stm()

if self.NON_PARTITIONED_TABLE_WHERE_CLAUSE_SUFFIX_KEY in self._conf:
where_clause_suffix = """
{}
AND {}
""".format(self._conf.get_string(
self.NON_PARTITIONED_TABLE_WHERE_CLAUSE_SUFFIX_KEY),
self.ADDTIONAL_WHERE_CLAUSE)
default_additional_where_clause)
else:
where_clause_suffix = 'WHERE {}'.format(self.ADDTIONAL_WHERE_CLAUSE)
where_clause_suffix = 'WHERE {}'.format(default_additional_where_clause)

sql_stmt = self.NON_PARTITIONED_TABLE_SQL_STATEMENT.format(
sql_stmt = default_sql_stmt.format(
where_clause_suffix=where_clause_suffix)

LOGGER.info('SQL for non-partitioned table against Hive metastore: %s', sql_stmt)
Expand All @@ -172,6 +211,14 @@ def _get_non_partitioned_table_sql_alchemy_extractor(self) -> Extractor:
sql_alchemy_extractor.init(sql_alchemy_conf)
return sql_alchemy_extractor

def _choose_default_non_partitioned_sql_stm(self) -> List[str]:
    """
    Pick the default non-partitioned-table SQL template and its additional where clause.

    The decision is based on the prefix of the ``extractor.sqlalchemy.conn_string``
    config value.
    :return: a two-element list ``[sql_statement, additional_where_clause]``; the Postgres
        pair when the connection string starts with 'postgres', the default pair otherwise
    """
    conn_string = self._conf.get_string("extractor.sqlalchemy.conn_string")
    # 'postgres' is itself a prefix of 'postgresql', so a single check covers both
    # spellings as well as driver-qualified forms such as 'postgresql+psycopg2'.
    if conn_string.startswith('postgres'):
        return [
            self.DEFAULT_POSTGRES_NON_PARTITIONED_TABLE_SQL_STATEMENT,
            self.DEFAULT_POSTGRES_ADDTIONAL_WHERE_CLAUSE,
        ]
    return [
        self.DEFAULT_NON_PARTITIONED_TABLE_SQL_STATEMENT,
        self.DEFAULT_ADDTIONAL_WHERE_CLAUSE,
    ]

def _get_filesystem(self) -> FileSystem:
fs = FileSystem()
fs.init(Scoped.get_scoped_conf(self._conf, fs.get_scope()))
Expand Down

0 comments on commit ecfbf37

Please sign in to comment.