This repository has been archived by the owner on Feb 23, 2022. It is now read-only.

Merge branch 'main' into test
fpereiro committed Jun 7, 2021
2 parents 16dcdd7 + a015544 commit 23593b5
Showing 16 changed files with 168 additions and 36 deletions.
13 changes: 4 additions & 9 deletions app.py
@@ -8,7 +8,6 @@
 import collections
 import hedy
 import json
-import jsonbin
 import logging
 import os
 import csv
@@ -21,7 +20,7 @@
 from flask_commonmark import Commonmark
 from werkzeug.urls import url_encode
 from config import config
-from auth import auth_templates, current_user, requires_login, is_admin, is_teacher
+from website.auth import auth_templates, current_user, requires_login, is_admin, is_teacher
 from utils import db_get, db_get_many, db_create, db_update, timems, type_check, object_check, db_del, load_yaml, load_yaml_rt, dump_yaml_rt, version
 import utils
@@ -33,11 +32,7 @@
 # Hedy-specific modules
 import courses
 import hedyweb
-import translating
-import querylog
-import aws_helpers
-import ab_proxying
-import cdn
+from website import querylog, aws_helpers, jsonbin, translating, ab_proxying, cdn

 # Set the current directory to the root Hedy folder
 os.chdir(os.path.join (os.getcwd (), __file__.replace (os.path.basename (__file__), '')))
@@ -715,7 +710,7 @@ def gradual_error():
 def internal_error(exception):
     import traceback
     print(traceback.format_exc())
-    return "<h1>500 Internal Server Error</h1>"
+    return "<h1>500 Internal Server Error</h1>", 500

 @app.route('/index.html')
 @app.route('/')
@@ -992,7 +987,7 @@ def update_yaml():

 # *** AUTH ***

-import auth
+from website import auth
 auth.routes (app, requested_lang)

 # *** START SERVER ***
2 changes: 1 addition & 1 deletion flask_helpers.py
@@ -1,5 +1,5 @@
 import flask
-import querylog
+from website import querylog

 @querylog.timed
 def render_template(filename, **kwargs):
8 changes: 8 additions & 0 deletions gunicorn.conf.py
@@ -0,0 +1,8 @@
+# This file is used to configure gunicorn,
+# which is used on Heroku.
+
+def worker_exit(server, worker):
+    # When a worker is exiting (perhaps because of a timeout),
+    # give the querylog module a chance to flush its records to disk.
+    from website import querylog
+    querylog.emergency_shutdown()
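
Note: worker_exit is a standard gunicorn server hook, called with (server, worker) whenever a worker process exits. A minimal sketch of the crash-recovery round trip this enables, using only names added in this commit (the driver code below is illustrative, not part of the diff, and the example attribute is made up):

from website import querylog

# In the dying worker: a global log record is open during normal request
# handling; emergency_shutdown() marks it terminated, finishes it, and
# dumps the untransmitted queue to a querylog_dump.<pid>.<timestamp>.jsonl file.
querylog.begin_global_log_record(endpoint='/example')  # hypothetical attribute
querylog.emergency_shutdown()

# In the next (restarted) process: a fresh queue re-claims the dump files
# and retries transmission. Importing website.querylog already does the
# re-claiming, via LOG_QUEUE.try_load_emergency_saves() at module load.
queue = querylog.LogQueue(batch_window_s=300)
queue.try_load_emergency_saves()
queue.set_transmitter(lambda ts, records: True)  # stand-in: pretend sending succeeded
queue.transmit_now()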
2 changes: 1 addition & 1 deletion hedyweb.py
@@ -7,7 +7,7 @@
 from flask_helpers import render_template

 import courses
-from auth import current_user
+from website.auth import current_user
 from utils import type_check
 import re
 import utils
2 changes: 1 addition & 1 deletion tests/test_abproxying.py
@@ -1,4 +1,4 @@
-import ab_proxying
+from website import ab_proxying
 import unittest

 class TestAbTesting(unittest.TestCase):
38 changes: 38 additions & 0 deletions tests/test_querylog.py
@@ -0,0 +1,38 @@
+from website import querylog
+import unittest
+
+class TestQueryLog(unittest.TestCase):
+    def setUp(self):
+        self.records = []
+        querylog.LOG_QUEUE.set_transmitter(self._fake_transmitter)
+
+    def _fake_transmitter(self, ts, records):
+        self.records.extend(records)
+
+    def test_regular_xmit(self):
+        with querylog.LogRecord(banaan='geel') as record:
+            record.set(bloem='rood')
+
+        querylog.LOG_QUEUE.transmit_now()
+
+        self.assertEqual(len(self.records), 1)
+        self.assertEqual(self.records[0]['banaan'], 'geel')
+        self.assertEqual(self.records[0]['bloem'], 'rood')
+
+    def test_emergency_recovery(self):
+        querylog.begin_global_log_record(banaan='geel')
+        querylog.log_value(bloem='rood')
+
+        querylog.emergency_shutdown()
+
+        recovered_queue = querylog.LogQueue(batch_window_s=300)
+        recovered_queue.try_load_emergency_saves()
+        recovered_queue.set_transmitter(self._fake_transmitter)
+
+        self.assertEqual(self.records, [])
+
+        recovered_queue.transmit_now()
+
+        self.assertEqual(self.records[0]['banaan'], 'geel')
+        self.assertEqual(self.records[0]['bloem'], 'rood')
+        self.assertEqual(self.records[0]['terminated'], True)
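
These tests also document the transmitter contract: per the set_transmitter docstring fragment later in this diff, a transmitter is a callable (timestamp, records) returning a boolean, and transmit_now keeps a batch queued if it returns False or raises. A hedged sketch of a conforming transmitter (the file-based destination is our assumption, not the project's):

import json

def jsonl_transmitter(timestamp, records):
    try:
        with open('querylog_batches.jsonl', 'a') as f:
            for record in records:
                f.write(json.dumps(record) + '\n')
        return True
    except OSError:
        return False  # batch stays in the queue and is retried on the next window

querylog.LOG_QUEUE.set_transmitter(jsonl_transmitter)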
2 changes: 1 addition & 1 deletion tests/test_translating.py
@@ -1,6 +1,6 @@
 import unittest
 import utils
-import translating
+from website import translating

 class TestTranslating(unittest.TestCase):
     def test_change_nested_records(self):
2 changes: 1 addition & 1 deletion utils.py
@@ -6,7 +6,7 @@
 import os
 import re
 from ruamel import yaml
-import querylog
+from website import querylog


 class Timer:
Empty file added website/__init__.py
Empty file.
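
That empty __init__.py is the load-bearing change: it turns website/ into an importable package, which every import rewrite in this commit relies on. The three import forms used across the diff:

# From top-level modules such as app.py, utils.py, and flask_helpers.py:
from website import querylog, aws_helpers, jsonbin, translating, ab_proxying, cdn
from website.auth import current_user

# From inside the package (website/ab_proxying.py), the relative form
# (only valid within the package, hence shown commented out here):
# from .auth import current_user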
2 changes: 1 addition & 1 deletion ab_proxying.py → website/ab_proxying.py
@@ -9,7 +9,7 @@
 import flask.sessions
 import itsdangerous

-from auth import current_user
+from .auth import current_user
 import utils

 class ABProxying:
2 changes: 1 addition & 1 deletion auth.py → website/auth.py
@@ -12,7 +12,7 @@
 from botocore.exceptions import ClientError as email_error
 import json
 import requests
-import querylog
+from website import querylog

 cookie_name = config ['session'] ['cookie_name']
 session_length = config ['session'] ['session_length'] * 60
File renamed without changes.
File renamed without changes.
File renamed without changes.
131 changes: 111 additions & 20 deletions querylog.py → website/querylog.py
@@ -4,6 +4,8 @@
 import threading
 import logging
 import os
+import glob
+import json
 import datetime
 import resource
 import traceback
@@ -17,8 +19,11 @@ def __init__(self, **kwargs):
         self.start_time = time.time()
         self.start_rusage = resource.getrusage(resource.RUSAGE_SELF)
         self.attributes = kwargs
+        self.running_timers = set([])
         self.set(
             start_time=dtfmt(self.start_time),
+            pid=os.getpid(),
+            loadavg=os.getloadavg()[0],
             fault=0)

         dyno = os.getenv('DYNO')
@@ -34,12 +39,17 @@ def finish(self):
             sys_ms=ms_from_fsec(end_rusage.ru_stime - self.start_rusage.ru_stime),
             duration_ms=ms_from_fsec(end_time - self.start_time))

+        # There should be 0, but who knows
+        self._terminate_running_timers()

         LOG_QUEUE.add(self)

     def set(self, **kwargs):
         """Set keys based on keyword arguments."""
         self.attributes.update(kwargs)

+    def update(self, dict):
+        """Set keys based on a dictionary."""
+        self.attributes.update(dict)

     def timer(self, name):
@@ -61,6 +71,17 @@ def record_exception(self, exc):
     def as_data(self):
         return self.attributes

+    def _remember_timer(self, timer):
+        self.running_timers.add(timer)
+
+    def _forget_timer(self, timer):
+        if timer in self.running_timers:
+            self.running_timers.remove(timer)
+
+    def _terminate_running_timers(self):
+        for timer in list(self.running_timers):
+            timer.finish()
+
     def __enter__(self):
         return self
@@ -121,6 +142,13 @@ def wrapped(*args, **kwargs):
     return wrapped


+def emergency_shutdown():
+    """The process is being killed. Do whatever needs to be done to save the logs."""
+    THREAD_LOCAL.current_log_record.set(terminated=True)
+    THREAD_LOCAL.current_log_record.finish()
+    LOG_QUEUE.emergency_save_to_disk()
+
+
 def dtfmt(timestamp):
     dt = datetime.datetime.utcfromtimestamp(timestamp)
     return dt.isoformat() + 'Z'
@@ -131,13 +159,22 @@ class LogTimer:
     def __init__(self, record, name):
         self.record = record
         self.name = name
+        self.running = False
+
+    def finish(self):
+        if self.running:
+            delta = ms_from_fsec(time.time() - self.start)
+            self.record.inc_timer(self.name, delta)
+            self.record._forget_timer(self)
+            self.running = False

     def __enter__(self):
+        self.record._remember_timer(self)
         self.start = time.time()
+        self.running = True

     def __exit__(self, type, value, tb):
-        delta = ms_from_fsec(time.time() - self.start)
-        self.record.inc_timer(self.name, delta)
+        self.finish()


 class LogQueue:
@@ -189,6 +226,76 @@ def transmitter(timestamp, records) -> Boolean:
         """
         self.transmitter = transmitter

+    def emergency_save_to_disk(self):
+        """Save all untransmitted records to disk.
+        They will be picked up and attempted to be transmitted by a future
+        (restarted) process.
+        """
+        all_records = []
+        with self.mutex:
+            for records in self.records_queue.values():
+                all_records.extend(records)
+            self.records_queue.clear()
+
+        if not all_records:
+            return
+
+        filename = f'querylog_dump.{os.getpid()}.{time.time()}.jsonl'
+        with open(filename, 'w') as f:
+            json.dump(all_records, f)
+
+    def try_load_emergency_saves(self):
+        """Try to load emergency saves from disk, if found.
+        There may be multiple LogQueues trying to load the same files at the
+        same time, so we need to be careful that only one of them actually
+        loads a single file (in order to avoid record duplication).
+        We use the atomicity of renaming the file as a way of claiming ownership of it.
+        """
+        candidates = glob.glob('querylog_dump.*.jsonl')
+        for candidate in candidates:
+            try:
+                claim_name = candidate + '.claimed'
+                os.rename(candidate, claim_name)
+
+                # If this succeeded, we're guaranteed to be able to read this file (and because
+                # we renamed it to something not matching the glob pattern, no one else is going to
+                # try to pick it up later)
+                with open(claim_name, 'r') as f:
+                    all_records = json.load(f)
+
+                bucket = div_clip(time.time(), self.batch_window_s)
+                with self.mutex:
+                    self.records_queue[bucket].extend(all_records)
+                os.unlink(claim_name)
+            except OSError:
+                pass
+
+    def transmit_now(self, max_time=None):
+        """(Try to) transmit all pending records with recording timestamps smaller than the given time (default: now)."""
+        with self.mutex:
+            keys = list(self.records_queue.keys())
+            keys.sort()
+
+        max_time = max_time or time.time()
+        buckets_to_send = [k for k in keys if k < max_time]
+
+        for bucket_ts in buckets_to_send:
+            # Get the records out of the queue
+            with self.mutex:
+                bucket_records = self.records_queue[bucket_ts]
+
+            # Try to send the records (to signal failure, this can either
+            # throw or return False, depending on how loud it wants to be).
+            success = self._save_records(bucket_ts, bucket_records)
+
+            # Only remove them from the queue if sending didn't fail
+            if success != False:
+                with self.mutex:
+                    del self.records_queue[bucket_ts]
+
     def _save_records(self, timestamp, records):
         if self.transmitter:
             return self.transmitter(timestamp, records)
@@ -206,23 +313,7 @@ def _write_thread(self):

                 # Once woken, see what buckets we have left to push (all buckets
                 # with numbers lower than next_wake)
-                with self.mutex:
-                    keys = list(self.records_queue.keys())
-                    keys.sort()
-                    buckets_to_send = [k for k in keys if k < next_wake]
-
-                for bucket_ts in buckets_to_send:
-                    # Get the records out of the queue
-                    with self.mutex:
-                        bucket_records = self.records_queue[bucket_ts]
-
-                    # Try to send the records
-                    success = self._save_records(bucket_ts, bucket_records)
-
-                    # Only remove them from the queue if sending didn't fail
-                    if success != False:
-                        with self.mutex:
-                            del self.records_queue[bucket_ts]
+                self.transmit_now(next_wake)
             except Exception as e:
                 traceback.print_exc(e)
             next_wake += self.batch_window_s
@@ -238,5 +329,5 @@ def ms_from_fsec(x):
     return int(x * 1000)


-
 LOG_QUEUE = LogQueue(batch_window_s=300)
+LOG_QUEUE.try_load_emergency_saves()
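
The concurrency story in try_load_emergency_saves rests on one trick worth isolating: claiming a file by renaming it. A sketch of just that pattern, reusing the commit's querylog_dump.*.jsonl naming (the helper name is ours):

import glob
import os

def claim_dump_files(pattern='querylog_dump.*.jsonl'):
    """Return the dump files this process now exclusively owns."""
    claimed = []
    for candidate in glob.glob(pattern):
        try:
            # os.rename is atomic on POSIX: if several restarted workers race
            # on the same candidate, exactly one rename succeeds; the losers
            # get OSError and simply move on. The new suffix also stops the
            # file from ever matching the glob pattern again.
            os.rename(candidate, candidate + '.claimed')
            claimed.append(candidate + '.claimed')
        except OSError:
            pass
    return claimed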
File renamed without changes.
