run create queries asynchronously, use start timestamp as last_created
razumau committed Sep 20, 2019
1 parent 4c0d8b9 commit 3d8fd1f
Showing 7 changed files with 48 additions and 23 deletions.
36 changes: 28 additions & 8 deletions duro/create/create_table.py
@@ -1,3 +1,6 @@
import asyncio
from async_timeout import timeout

from create.data_tests import load_tests, run_tests
from create.process import process_and_upload_data
from create.redshift import (
@@ -8,17 +11,36 @@
    create_connection,
    make_snapshot,
)
from create.sqlite import update_last_created, log_timestamps, log_start
from create.sqlite import (
    update_last_created,
    log_timestamps,
    log_start,
    get_average_completion_time,
)
from create.timestamps import Timestamps
from utils.errors import TestsFailedError
from utils.errors import TestsFailedError, QueryTimeoutError
from utils.file_utils import load_processor
from utils.logger import setup_logger
from utils.table import Table


def run_create_table(table: Table, db_path: str, views_path: str):
    asyncio.run(run_with_timeout(table, db_path, views_path))


async def run_with_timeout(table: Table, db_path: str, views_path: str):
    timeout_length = 5 * get_average_completion_time(db_path, table.name)
    print(f"timeout is {timeout_length}")
    try:
        async with timeout(timeout_length):
            await create_table(table, db_path, views_path)
    except asyncio.TimeoutError:
        raise QueryTimeoutError(table, timeout_length)


# pylint: disable=no-member
# noinspection PyUnresolvedReferences
def create_table(table: Table, db_path: str, views_path: str):
async def create_table(table: Table, db_path: str, views_path: str):
    logger = setup_logger(table.name)
    ts = Timestamps()
    ts.log("start")
@@ -31,11 +53,9 @@ def create_table(table: Table, db_path: str, views_path: str):

    processor = load_processor(views_path, table.name)
    if processor:
        creation_timestamp = process_and_upload_data(
            table, processor, connection, ts, views_path
        )
        process_and_upload_data(table, processor, connection, ts, views_path)
    else:
        creation_timestamp = create_temp_table(table, connection)
        create_temp_table(table, connection)
    ts.log("create_temp")

    tests = load_tests(table.name, views_path)
@@ -59,5 +79,5 @@ def create_table(table: Table, db_path: str, views_path: str):

    connection.close()

    update_last_created(db_path, table.name, creation_timestamp, ts.duration)
    update_last_created(db_path, table.name, ts.start, ts.duration)
    log_timestamps(db_path, table.name, ts)
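
The new entry point runs the whole creation coroutine under async_timeout, with a limit of five times the table's average completion time. A minimal standalone sketch of the same pattern (the coroutine and numbers here are hypothetical, not part of this commit):

import asyncio
from async_timeout import timeout


async def build_table():
    # stands in for the actual table-creation work
    await asyncio.sleep(1)


async def build_with_timeout(timeout_length: float):
    try:
        # async_timeout raises asyncio.TimeoutError if the block
        # does not finish within timeout_length seconds
        async with timeout(timeout_length):
            await build_table()
    except asyncio.TimeoutError:
        print(f"took longer than {timeout_length} seconds")


asyncio.run(build_with_timeout(5.0))

Note that asyncio cancellation only takes effect at await points, so synchronous database calls inside the timed block are not interrupted mid-call; the TimeoutError fires once control returns to the event loop.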
11 changes: 3 additions & 8 deletions duro/create/process.py
@@ -18,7 +18,7 @@

def process_and_upload_data(
    table: Table, processor_name: str, connection, ts: Timestamps, views_path: str
) -> int:
):
    data = select_data(table.query, connection)
    ts.log("select")

@@ -40,13 +40,9 @@ def process_and_upload_data(
    os.remove(filename)

    drop_and_create_query = build_drop_and_create_query(table, views_path)
    timestamp = copy_to_redshift(
        filename, table.name, connection, drop_and_create_query
    )
    copy_to_redshift(filename, table.name, connection, drop_and_create_query)
    ts.log("insert")

    return timestamp


@log_action("select data for processing")
def select_data(query: str, connection) -> List[Dict]:
@@ -114,7 +110,7 @@ def build_drop_and_create_query(table: Table, views_path: str):
@log_action("insert processed data into Redshift table")
def copy_to_redshift(
    filename: str, table_name: str, connection, drop_and_create_query: str
) -> int:
):
    try:
        with connection.cursor() as cursor:
            cursor.execute(drop_and_create_query)
@@ -132,6 +128,5 @@ def copy_to_redshift(
            """
        )
        connection.commit()
        return arrow.now().timestamp
    except psycopg2.Error:
        raise RedshiftCopyError(table_name)
2 changes: 1 addition & 1 deletion duro/create/sqlite.py
@@ -246,7 +246,7 @@ def get_tables_to_create(db_str: str) -> List[Tuple]:
        WHERE
        (
            force = 1
            OR (strftime('%s', 'now') - started) / 60 - interval > 0
            OR (strftime('%s', 'now') - last_created) / 60 - interval > 0
            OR last_created IS NULL
        )
        AND deleted IS NULL
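
Since last_created now stores the start timestamp of the previous run, the scheduler's due-check compares elapsed minutes since that start against the table's interval. A rough Python equivalent of the SQL condition above (names assumed for illustration, not part of this commit):

import time
from typing import Optional


def is_due(last_created: Optional[int], interval_minutes: int, force: bool = False) -> bool:
    # force = 1, last_created IS NULL, or the interval has elapsed since the
    # recorded start timestamp -- the same branches as the WHERE clause above
    if force or last_created is None:
        return True
    elapsed_minutes = (time.time() - last_created) / 60
    return elapsed_minutes > interval_minutes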
4 changes: 2 additions & 2 deletions duro/create/tree.py
@@ -14,7 +14,7 @@
    mark_table_as_not_waiting,
    mark_table_as_waiting,
)
from create.create_table import create_table
from create.create_table import run_create_table
from utils.errors import (
    MaterializationError,
    TableNotFoundInGraphError,
@@ -42,7 +42,7 @@ def create_tree(root: str, global_config: GlobalConfig, interval: int = None):

    try:
        logger.info(f"Creating {table.name}")
        create_table(table, db, global_config.views_path)
        run_create_table(table, db, global_config.views_path)
    except RedshiftConnectionError as e:
        logger.error(e)
        reset_start(db, table.name)
10 changes: 6 additions & 4 deletions duro/scheduler/commits.py
@@ -23,14 +23,16 @@ def get_previous_commit(db: str) -> str:
    cursor = connection.cursor()

    cursor.execute(
        """CREATE TABLE IF NOT EXISTS commits
        (hash text, processed integer)"""
        """
        CREATE TABLE IF NOT EXISTS commits
        (hash text, processed integer)"""
    )
    connection.commit()

    cursor.execute(
        """SELECT hash FROM commits
        ORDER BY processed DESC LIMIT 1"""
        """
        SELECT hash FROM commits
        ORDER BY processed DESC LIMIT 1"""
    )
    result = cursor.fetchone()

7 changes: 7 additions & 0 deletions duro/utils/errors.py
@@ -94,3 +94,10 @@ class HistoryTableCreationError(CreationError):
    def __init__(self, table, details=None):
        message = f"""*Creation query failed for snapshots table for `{table}`* ```{details}```"""
        super().__init__(table, message)


class QueryTimeoutError(CreationError):
    """Took longer than expected (average creation time × multiplier)"""

    def __init__(self, table, timeout: float):
        super().__init__(table, f"Took longer than {int(timeout)} seconds, resetting")
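
The handler for QueryTimeoutError is not shown in this diff; one plausible shape, given that tree.py already resets a table's start on RedshiftConnectionError (hypothetical, for illustration only):

try:
    run_create_table(table, db, global_config.views_path)
except QueryTimeoutError as e:
    # assumed handling: log the timeout and clear the started flag so the
    # scheduler can pick the table up again
    logger.error(e)
    reset_start(db, table.name)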
1 change: 1 addition & 0 deletions requirements.txt
@@ -13,3 +13,4 @@ referer-parser==0.4.1
logzero==1.2.1
beautifulsoup4==4.5.0
boto3==1.9
async-timeout==3.0.1
