-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbigquery_eventtracking_regular_report.py
59 lines (47 loc) · 2.15 KB
/
bigquery_eventtracking_regular_report.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from bigquery_eventtracking_regular_report_module import api
from bigquery_eventtracking_regular_report_module import storage
import logging
import time
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
def get_bigquery_eventtracking():
    """Download the daily event-tracking reports and store them.

    Two report dates are processed on every call: today and yesterday.
    For each report date the four reports exposed by ``api`` are requested
    for the window that runs from 16:00:00 on the previous calendar day to
    16:00:00 on the report date, and every result is passed to the matching
    ``storage.save_*`` function together with the report date.
    """
    # Read the clock exactly once. Calling datetime.now() separately for each
    # value lets a run that crosses midnight mix two different calendar days
    # between eventTime/startTime/endTime, or process the same date twice.
    now = datetime.now()

    # for loading past data
    for days_back in range(2):
        # convert time zone from utc+0 to utc+8
        # NOTE(review): datetime.now() is naive, so the dates follow the
        # worker's local clock -- confirm which time zone the worker runs in.
        report_day = now - timedelta(days=days_back)
        eventTime = report_day.strftime('%Y-%m-%d')
        startTime = (report_day - timedelta(days=1)).strftime('%Y-%m-%d') + 'T16:00:00'
        endTime = eventTime + 'T16:00:00'
        logging.info('loading data by date:%s', eventTime)

        # download data (Extract+Transform)
        transaction_rate = api.get_daily_transaction_rate(startTime, endTime, eventTime)
        transaction_error = api.get_daily_transaction_error(startTime, endTime, eventTime)
        sport_transaction_result = api.get_daily_sport_transaction(startTime, endTime, eventTime)
        sport_transaction_error = api.get_daily_sport_error(startTime, endTime, eventTime)

        # storage data (Load)
        storage.save_transaction_rate(transaction_rate, eventTime)
        storage.save_transaction_error(transaction_error, eventTime)
        storage.save_daily_sport_transaction(sport_transaction_result, eventTime)
        storage.save_daily_sport_error(sport_transaction_error, eventTime)

        # Short pause before the next report date.
        # NOTE(review): nesting was reconstructed from a flattened source;
        # confirm this sleep belongs inside the loop.
        time.sleep(0.5)
# Arguments handed to the DAG below through its ``default_args`` parameter.
default_args = {
    # Ownership / scheduling dependencies
    "owner": "angel",
    "depends_on_past": False,
    # E-mail settings: a recipient is configured, but both the failure and
    # the retry notification flags are switched off.
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    # Retry policy: 3 retries, 5 minutes apart.
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}
# DAG definition: a single PythonOperator task that calls
# get_bigquery_eventtracking, scheduled at a 10-minute interval with
# catchup disabled.
with DAG(
    dag_id="bigquery_eventtracking_regular_report",
    default_args=default_args,
    description="download eventtracking from bigquery server to make a regular report",
    schedule_interval=timedelta(minutes=10),
    start_date=datetime(2022, 7, 1),
    catchup=False,
    tags=["report"],
) as dag:
    p1 = PythonOperator(
        task_id="bigquery_eventtracking_report",
        python_callable=get_bigquery_eventtracking,
    )

    # Only one task, so there is no dependency chain to declare; the bare
    # reference is kept as in the original file.
    p1