Skip to content

Commit

Permalink
Adding year month partitions to Trino temp table SQL (project-koku#4306)
Browse files Browse the repository at this point in the history
* Adding year month partitions to Trino temp table SQL for all ocp on cloud providers
  • Loading branch information
lcouzens authored Apr 14, 2023
1 parent 8a9defb commit 3dc9c29
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 133 deletions.
7 changes: 7 additions & 0 deletions koku/masu/database/aws_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,13 @@ def populate_ocp_on_aws_cost_daily_summary_trino(
days = self.date_helper.list_days(start_date, end_date)
days_tup = tuple(str(day.day) for day in days)
self.delete_ocp_on_aws_hive_partition_by_day(days_tup, aws_provider_uuid, openshift_provider_uuid, year, month)
tables = [
"reporting_ocpawscostlineitem_project_daily_summary_temp",
"aws_openshift_daily_resource_matched_temp",
"aws_openshift_daily_tag_matched_temp",
]
for table in tables:
self.delete_hive_partition_by_month(table, openshift_provider_uuid, year, month)

pod_column = "pod_effective_usage_cpu_core_hours"
node_column = "node_capacity_cpu_core_hours"
Expand Down
2 changes: 2 additions & 0 deletions koku/masu/database/azure_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ def populate_ocp_on_azure_cost_daily_summary_trino(
"""Populate the daily cost aggregated summary for OCP on Azure."""
year = start_date.strftime("%Y")
month = start_date.strftime("%m")
table = "reporting_ocpazurecostlineitem_project_daily_summary_temp"
self.delete_hive_partition_by_month(table, openshift_provider_uuid, year, month)
days = self.date_helper.list_days(start_date, end_date)
days_tup = tuple(str(day.day) for day in days)
self.delete_ocp_on_azure_hive_partition_by_day(
Expand Down
16 changes: 16 additions & 0 deletions koku/masu/database/gcp_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,14 @@ def populate_ocp_on_gcp_cost_daily_summary_trino_by_node(
"""
year = start_date.strftime("%Y")
month = start_date.strftime("%m")
tables = [
"reporting_ocpgcpcostlineitem_project_daily_summary_temp",
"gcp_openshift_daily_resource_matched_temp",
"gcp_openshift_daily_tag_matched_temp",
]
for table in tables:
self.delete_hive_partition_by_month(table, openshift_provider_uuid, year, month)

days = self.date_helper.list_days(start_date, end_date)
days_tup = tuple(str(day.day) for day in days)
self.delete_ocp_on_gcp_hive_partition_by_day(days_tup, gcp_provider_uuid, openshift_provider_uuid, year, month)
Expand Down Expand Up @@ -449,6 +457,14 @@ def populate_ocp_on_gcp_cost_daily_summary_trino(

year = start_date.strftime("%Y")
month = start_date.strftime("%m")
tables = [
"reporting_ocpgcpcostlineitem_project_daily_summary_temp",
"gcp_openshift_daily_resource_matched_temp",
"gcp_openshift_daily_tag_matched_temp",
]
for table in tables:
self.delete_hive_partition_by_month(table, openshift_provider_uuid, year, month)

days = self.date_helper.list_days(start_date, end_date)
days_tup = tuple(str(day.day) for day in days)
self.delete_ocp_on_gcp_hive_partition_by_day(days_tup, gcp_provider_uuid, openshift_provider_uuid, year, month)
Expand Down
35 changes: 35 additions & 0 deletions koku/masu/database/report_db_accessor_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
import ciso8601
import django.apps
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.db import connection
from django.db import OperationalError
from django.db import transaction
from jinjasql import JinjaSql
from tenant_schemas.utils import schema_context
from trino.exceptions import TrinoExternalError

import koku.trino_database as trino_db
from api.common import log_json
Expand Down Expand Up @@ -323,3 +325,36 @@ def execute_delete_sql(self, query):
query (QuerySet) : A valid django queryset
"""
return exec_del_sql(query)

def delete_hive_partition_by_month(self, table, source, year, month):
    """Delete one month of Hive rows for an OCP source from a Trino table.

    Nothing is deleted (and nothing is logged) unless both the schema and
    the table exist in Trino.

    Args:
        table: Name of the table inside ``hive.<self.schema>``.
        source: Value compared against the ``ocp_source`` column.
        year: Value compared against the ``year`` column.
        month: Value compared against the ``month`` column; rows stored with
            or without leading zeros are both matched.

    Raises:
        TrinoExternalError: When the failure is not a ``HIVE_METASTORE_ERROR``
            or when the final attempt also fails.
    """
    retries = settings.HIVE_PARTITION_DELETE_RETRIES
    if not (self.schema_exists_trino() and self.table_exists_trino(table)):
        return

    LOG.info(
        "Deleting Hive partitions for the following: \n\tSchema: %s "
        "\n\tOCP Source: %s \n\tTable: %s \n\tYear: %s \n\tMonths: %s",
        self.schema,
        source,
        table,
        year,
        month,
    )

    # The statement is identical on every attempt, so build it once.
    # The replace/ltrim/replace chain strips leading zeros from the month so
    # that both the unpadded form and the value as given are matched.
    delete_sql = (
        "\n"
        f"DELETE FROM hive.{self.schema}.{table}\n"
        f"WHERE ocp_source = '{source}'\n"
        f"AND year = '{year}'\n"
        f"AND (month = replace(ltrim(replace('{month}', '0', ' ')),' ', '0') OR month = '{month}')\n"
    )
    log_reference = f"delete_hive_partition_by_month for {year}-{month}"
    final_attempt = retries - 1

    for attempt in range(retries):
        try:
            self._execute_trino_raw_sql_query(
                delete_sql,
                log_ref=log_reference,
                attempts_left=final_attempt - attempt,
            )
        except TrinoExternalError as err:
            # Only metastore errors are retried, and only while attempts remain.
            if err.error_name != "HIVE_METASTORE_ERROR" or attempt >= final_attempt:
                raise err
        else:
            # The delete succeeded; no further attempts are needed.
            return
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineite
data_source_rank integer,
cost_category_id int,
ocp_matched boolean,
ocp_source varchar
) WITH(format = 'PARQUET', partitioned_by=ARRAY['ocp_source'])
ocp_source varchar,
year varchar,
month varchar
) WITH(format = 'PARQUET', partitioned_by=ARRAY['ocp_source', 'year', 'month'])
;

-- Now create our proper table if it does not exist
Expand Down Expand Up @@ -111,9 +113,6 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineite
) WITH(format = 'PARQUET', partitioned_by=ARRAY['gcp_source', 'ocp_source', 'year', 'month', 'day'])
;

DELETE FROM hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily_summary_temp
;

-- OCP ON GCP kubernetes-io-cluster-{cluster_id} label is applied on the VM and is exclusively a pod cost
INSERT INTO hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily_summary_temp (
gcp_uuid,
Expand Down Expand Up @@ -160,7 +159,9 @@ INSERT INTO hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily
volume_labels,
tags,
cost_category_id,
ocp_source
ocp_source,
year,
month
)
SELECT gcp.uuid as gcp_uuid,
max(ocp.cluster_id) as cluster_id,
Expand Down Expand Up @@ -206,7 +207,9 @@ SELECT gcp.uuid as gcp_uuid,
NULL as volume_labels,
max(json_format(json_parse(gcp.labels))) as tags,
max(ocp.cost_category_id) as cost_category_id,
{{ocp_source_uuid}} as ocp_source
{{ocp_source_uuid}} as ocp_source,
max(gcp.year) as year,
max(gcp.month) as month
FROM hive.{{schema | sqlsafe}}.gcp_openshift_daily as gcp
JOIN hive.{{ schema | sqlsafe}}.reporting_ocpusagelineitem_daily_summary as ocp
ON date(gcp.usage_start_time) = ocp.usage_start
Expand Down Expand Up @@ -274,7 +277,9 @@ INSERT INTO hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily
volume_labels,
tags,
cost_category_id,
ocp_source
ocp_source,
year,
month
)
SELECT gcp.uuid as gcp_uuid,
max(ocp.cluster_id) as cluster_id,
Expand Down Expand Up @@ -320,7 +325,9 @@ SELECT gcp.uuid as gcp_uuid,
max(ocp.volume_labels) as volume_labels,
max(json_format(json_parse(gcp.labels))) as tags,
max(ocp.cost_category_id) as cost_category_id,
{{ocp_source_uuid}} as ocp_source
{{ocp_source_uuid}} as ocp_source,
max(gcp.year) as year,
max(gcp.month) as month
FROM hive.{{schema | sqlsafe}}.gcp_openshift_daily as gcp
JOIN hive.{{ schema | sqlsafe}}.reporting_ocpusagelineitem_daily_summary as ocp
ON date(gcp.usage_start_time) = ocp.usage_start
Expand Down Expand Up @@ -576,6 +583,3 @@ WHERE gcp_source = {{gcp_source_uuid}}
AND lpad(month, 2, '0') = {{month}} -- Zero pad the month when fewer than 2 characters
AND day IN {{days | inclause}}
;

DELETE FROM hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily_summary_temp
;
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineite
project_rank integer,
data_source_rank integer,
ocp_matched boolean,
ocp_source varchar
) WITH(format = 'PARQUET')
ocp_source varchar,
year varchar,
month varchar
) WITH(format = 'PARQUET', partitioned_by=ARRAY['year', 'month'])
;

-- Now create our proper table if it does not exist
CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily_summary
(
gcp_uuid varchar,
gcp_uuid varchar,
cluster_id varchar,
cluster_alias varchar,
data_source varchar,
Expand Down Expand Up @@ -105,10 +107,6 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineite
) WITH(format = 'PARQUET', partitioned_by=ARRAY['gcp_source', 'ocp_source', 'year', 'month', 'day'])
;

DELETE FROM hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily_summary_temp
WHERE ocp_source = {{ocp_source_uuid}}
;

-- OCP ON GCP kubernetes-io-cluster-{cluster_id} label is applied on the VM and is exclusively a pod cost
INSERT INTO hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily_summary_temp (
gcp_uuid,
Expand Down Expand Up @@ -153,7 +151,9 @@ INSERT INTO hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily
cluster_capacity_memory_gigabyte_hours,
volume_labels,
tags,
cost_category_id
cost_category_id,
year,
month
)
SELECT gcp.uuid as gcp_uuid,
max(ocp.cluster_id) as cluster_id,
Expand Down Expand Up @@ -197,7 +197,9 @@ SELECT gcp.uuid as gcp_uuid,
max(ocp.cluster_capacity_memory_gigabyte_hours) as cluster_capacity_memory_gigabyte_hours,
NULL as volume_labels,
max(json_format(json_parse(gcp.labels))) as tags,
max(ocp.cost_category_id) as cost_category_id
max(ocp.cost_category_id) as cost_category_id,
max(gcp.year) as year,
max(gcp.month) as month
FROM hive.{{schema | sqlsafe}}.gcp_openshift_daily as gcp
JOIN hive.{{ schema | sqlsafe}}.reporting_ocpusagelineitem_daily_summary as ocp
ON date(gcp.usage_start_time) = ocp.usage_start
Expand Down Expand Up @@ -266,7 +268,9 @@ INSERT INTO hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily
cluster_capacity_memory_gigabyte_hours,
volume_labels,
tags,
cost_category_id
cost_category_id,
year,
month
)
SELECT gcp.uuid as gcp_uuid,
max(ocp.cluster_id) as cluster_id,
Expand Down Expand Up @@ -310,7 +314,9 @@ SELECT gcp.uuid as gcp_uuid,
max(ocp.cluster_capacity_memory_gigabyte_hours) as cluster_capacity_memory_gigabyte_hours,
max(ocp.volume_labels) as volume_labels,
max(json_format(json_parse(gcp.labels))) as tags,
max(ocp.cost_category_id) as cost_category_id
max(ocp.cost_category_id) as cost_category_id,
max(gcp.year) as year,
max(gcp.month) as month
FROM hive.{{schema | sqlsafe}}.gcp_openshift_daily as gcp
JOIN hive.{{ schema | sqlsafe}}.reporting_ocpusagelineitem_daily_summary as ocp
ON date(gcp.usage_start_time) = ocp.usage_start
Expand Down Expand Up @@ -477,7 +483,7 @@ SELECT pds.gcp_uuid,
FROM hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily_summary_temp as pds
JOIN cte_rankings as r
ON pds.gcp_uuid = r.gcp_uuid
WHERE pds.ocp_source = {{ocp_source_uuid}}
WHERE pds.ocp_source = {{ocp_source_uuid}} AND pds.year = {{year}} AND pds.month = {{month}}
;

INSERT INTO postgres.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily_summary_p (
Expand Down Expand Up @@ -561,7 +567,3 @@ WHERE gcp_source = {{gcp_source_uuid}}
AND lpad(month, 2, '0') = {{month}} -- Zero pad the month when fewer than 2 characters
AND day IN {{days | inclause}}
;

DELETE FROM hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineitem_project_daily_summary_temp
WHERE ocp_source = {{ocp_source_uuid}}
;
Loading

0 comments on commit 3dc9c29

Please sign in to comment.