diff --git a/.gitignore b/.gitignore
index 853bad70e1a..0fe92a9e826 100644
--- a/.gitignore
+++ b/.gitignore
@@ -146,3 +146,6 @@ logs.txt
 *.swo
 .local
 node_modules
+
+# Ignore things that start with underscores
+_*
diff --git a/app.py b/app.py
index 2c650754ef9..42434b2950d 100644
--- a/app.py
+++ b/app.py
@@ -187,8 +187,10 @@ def before_request_https():
 Compress(app)
 Commonmark(app)
 
-logger = jsonbin.JsonBinLogger.from_env_vars()
-querylog.LOG_QUEUE.set_transmitter(aws_helpers.s3_transmitter_from_env())
+parse_logger = jsonbin.MultiParseLogger(
+    jsonbin.JsonBinLogger.from_env_vars(),
+    jsonbin.S3ParseLogger.from_env_vars())
+querylog.LOG_QUEUE.set_transmitter(aws_helpers.s3_querylog_transmitter_from_env())
 
 # Check that requested language is supported, otherwise return 404
 @app.before_request
@@ -309,7 +311,7 @@ def parse():
         session ['code'] = code
 
     querylog.log_value(server_error=response.get('Error'))
-    logger.log ({
+    parse_logger.log ({
         'session': session_id(),
         'date': str(datetime.datetime.now()),
         'level': level,
@@ -472,7 +474,7 @@ def initialize_gfi_session(language):
 def report_error():
     post_body = request.json
 
-    logger.log ({
+    parse_logger.log ({
         'session': session_id(),
         'date': str(datetime.datetime.now()),
         'level': post_body.get('level'),
diff --git a/config.py b/config.py
index 9205d480fa5..6e4c827974e 100644
--- a/config.py
+++ b/config.py
@@ -27,4 +27,11 @@
         'postfix': ('-' + dyno if dyno else '') + '-' + str(os.getpid()),
         'region': 'eu-west-1'
     },
+    's3-parse-logs': {
+        'bucket': 'hedy-parse-logs',
+        'prefix': app_name + '/',
+        # Make logs from different instances/processes unique
+        'postfix': ('-' + dyno if dyno else '') + '-' + str(os.getpid()),
+        'region': 'eu-west-1'
+    },
 }
diff --git a/coursedata/texts/de.yaml b/coursedata/texts/de.yaml
index ba13022bda5..e306b878c47 100644
--- a/coursedata/texts/de.yaml
+++ b/coursedata/texts/de.yaml
@@ -127,6 +127,9 @@ Auth:
   unsaved_changes: "Du hast ein nicht gespeichertes Programm. Möchtest du es verwerfen, ohne zu speichern?"
   save_success: "Erfolgreich"
   save_success_detail: "Programm erfolgreich gespeichert"
+  share_success_detail: "Program shared successfully"
+  unshare_success_detail: "Program unshared successfully"
+  copy_clipboard: "Successfully copied to clipboard"
   answer_question: "Du kannst das Programm erst ausführen, nachdem du die Frage beantwortet hast."
 Programs:
   recent: "Zuletzt verwendete Programme"
@@ -139,5 +142,10 @@ Programs:
   open: "Öffnen"
   delete: "Löschen"
   delete_confirm: "Bist du sicher, dass du das Programm löschen möchtest?"
+  share: "Share"
+  share_confirm: "Are you sure you want to make the program public?"
+  unshare: "Unshare"
+  unshare_confirm: "Are you sure you want to make the program private?"
+  copy_link_to_share: "Copy link to share"
   no_programs: "Du hast noch kein Programm."
   write_first: "Schreibe dein erstes Programm!"
diff --git a/coursedata/texts/el.yaml b/coursedata/texts/el.yaml
index 1a0a4e2b406..ead017fb1cf 100644
--- a/coursedata/texts/el.yaml
+++ b/coursedata/texts/el.yaml
@@ -125,6 +125,9 @@ Auth:
   unsaved_changes: 'Έχεις ένα μη αποθηκευμένο πρόγραμμα. Θέλεις να αποχωρήσεις χωρίς να το αποθηκεύσεις;'
   save_success: 'Επιτυχία'
   save_success_detail: 'Το πρόγραμμα αποθηκεύτηκε επιτυχώς'
+  share_success_detail: "Program shared successfully"
+  unshare_success_detail: "Program unshared successfully"
+  copy_clipboard: "Successfully copied to clipboard"
   answer_question: 'Δεν μπορείς να εκτελέσεις το πρόγραμμα πριν να απαντήσεις την ερώτηση'
   login_long: Συνδέσου στο λογαριασμό σου
 Programs:
@@ -138,5 +141,10 @@ Programs:
   open: 'Άνοιγμα'
   delete: 'Διαγραφή'
   delete_confirm: 'Είσαι σίγουρος/η ότι θέλεις να διαγράψεις το πρόγραμμα;'
+  share: "Share"
+  share_confirm: "Are you sure you want to make the program public?"
+  unshare: "Unshare"
+  unshare_confirm: "Are you sure you want to make the program private?"
+  copy_link_to_share: "Copy link to share"
   no_programs: 'Δεν έχεις προγράμματα ακόμα.'
   write_first: 'Γράψε το πρώτο σου πρόγραμμα!'
diff --git a/coursedata/texts/en.yaml b/coursedata/texts/en.yaml
index 22a964d54e5..0c83983a8a1 100644
--- a/coursedata/texts/en.yaml
+++ b/coursedata/texts/en.yaml
@@ -172,6 +172,8 @@ Auth:
   save_success: "Success"
   save_success_detail: "Program saved successfully"
   share_success_detail: "Program shared successfully"
+  unshare_success_detail: "Program unshared successfully"
+  copy_clipboard: "Successfully copied to clipboard"
   answer_question: "You can't run the program until you answer the question first"
 Programs:
   recent: "My recent programs"
@@ -188,5 +190,6 @@ Programs:
   share_confirm: "Are you sure you want to make the program public?"
   unshare: "Unshare"
   unshare_confirm: "Are you sure you want to make the program private?"
+  copy_link_to_share: "Copy link to share"
   no_programs: "You have no programs yet."
   write_first: "Write your first program!"
diff --git a/coursedata/texts/es.yaml b/coursedata/texts/es.yaml
index 4e1b2e80bcf..ce350c2ff4d 100644
--- a/coursedata/texts/es.yaml
+++ b/coursedata/texts/es.yaml
@@ -166,6 +166,9 @@ Auth:
   unsaved_changes: "Tu programa no se ha salvado. ¿Deseas irte sin salvarlo?"
   save_success: "Éxito"
   save_success_detail: "Tu programa se ha salvado exitosamente"
+  share_success_detail: "Tu programa es ahora público"
+  unshare_success_detail: "Tu programa es ahora privado"
+  copy_clipboard: "Copiado al portapapeles"
   answer_question: "No puedes ejecutar el programa hasta que hayas contestado la pregunta"
 Programs:
   recent: "Mis programas recientes"
@@ -178,5 +181,10 @@ Programs:
   open: "Abrir"
   delete: "Borrar"
   delete_confirm: "Estás seguro de querer borrar este programa?"
+  share: "Compartir públicamente"
+  share_confirm: "Estás seguro/a de querer volver público tu programa?"
+  unshare: "Dejar de compartir"
+  unshare_confirm: "Estás seguro/a de querer volver privado tu programa?"
+  copy_link_to_share: "Copiar vínculo para compartir"
   no_programs: "Todavía no tienes programas guardados."
   write_first: "¡Escribe tu primer programa!"
diff --git a/coursedata/texts/fr.yaml b/coursedata/texts/fr.yaml
index bd39b3eb3e3..d3b88f07474 100644
--- a/coursedata/texts/fr.yaml
+++ b/coursedata/texts/fr.yaml
@@ -166,6 +166,9 @@ Auth:
   unsaved_changes: "Le programme en cours n'a pas été sauvegardé. Souhaites-tu vraiment quitter sans le sauvegarder ?"
   save_success: "Succès"
   save_success_detail: "Le programme a été sauvegardé avec succès"
+  share_success_detail: "Program shared successfully"
+  unshare_success_detail: "Program unshared successfully"
+  copy_clipboard: "Successfully copied to clipboard"
   answer_question: "Tu ne peux pas exécuter le programme avant d'avoir répondu à la question."
 Programs:
   recent: "Mes programmes récents"
@@ -178,5 +181,10 @@ Programs:
   open: "Ouvrir"
   delete: "Supprimer"
   delete_confirm: "Es-tu sûr de vouloir supprimer ce programme ?"
+  share: "Share"
+  share_confirm: "Are you sure you want to make the program public?"
+  unshare: "Unshare"
+  unshare_confirm: "Are you sure you want to make the program private?"
+  copy_link_to_share: "Copy link to share"
   no_programs: "Tu n'as pas encore de programme."
   write_first: "Écris ton premier programme !"
diff --git a/coursedata/texts/hu.yaml b/coursedata/texts/hu.yaml
index b27772251cc..c57c918d2b3 100644
--- a/coursedata/texts/hu.yaml
+++ b/coursedata/texts/hu.yaml
@@ -122,6 +122,9 @@ Auth:
   unsaved_changes: "Nem mentett programod van. El akarsz menni mentés nélkül?"
   save_success: "Siker"
   save_success_detail: "A programot sikeresen mentetted"
+  share_success_detail: "Program shared successfully"
+  unshare_success_detail: "Program unshared successfully"
+  copy_clipboard: "Successfully copied to clipboard"
   answer_question: "Addig nem futtathatod a programot, amíg nem válaszolsz a kérdésre "
 Programs:
   recent: "Legutóbbi programjaim "
@@ -134,5 +137,10 @@ Programs:
   open: "Nyitva"
   delete: "Törlés"
   delete_confirm: "Biztosan törlöd a programot?"
+  share: "Share"
+  share_confirm: "Are you sure you want to make the program public?"
+  unshare: "Unshare"
+  unshare_confirm: "Are you sure you want to make the program private?"
+  copy_link_to_share: "Copy link to share"
   no_programs: "Még nincs programod."
   write_first: "Írd meg az első programot!"
diff --git a/coursedata/texts/it.yaml b/coursedata/texts/it.yaml
index 172bf3ef314..02d846146c4 100644
--- a/coursedata/texts/it.yaml
+++ b/coursedata/texts/it.yaml
@@ -126,6 +126,9 @@ Auth:
   unsaved_changes: "Hai un programma non salvato. Vuoi uscire senza salvare?"
   save_success: "Successo"
   save_success_detail: "Programma salvato con successo"
+  share_success_detail: "Program shared successfully"
+  unshare_success_detail: "Program unshared successfully"
+  copy_clipboard: "Successfully copied to clipboard"
   answer_question: "Non puoi eseguire il programma prima di rispondere alla domanda"
 Programs:
   recent: "I miei programmi recenti"
@@ -138,5 +141,10 @@ Programs:
   open: "Apri"
   delete: "Elimina"
   delete_confirm: "Sei sicura\\o di voler eliminare questo programma?"
+  share: "Share"
+  share_confirm: "Are you sure you want to make the program public?"
+  unshare: "Unshare"
+  unshare_confirm: "Are you sure you want to make the program private?"
+  copy_link_to_share: "Copy link to share"
   no_programs: "Non hai ancora nessun programma."
   write_first: "Scrivi il tuo primo programma!"
diff --git a/coursedata/texts/nl.yaml b/coursedata/texts/nl.yaml
index 95b7c621718..df2e3ca748a 100644
--- a/coursedata/texts/nl.yaml
+++ b/coursedata/texts/nl.yaml
@@ -171,6 +171,9 @@ Auth:
   unsaved_changes: "Jouw programma is niet opgeslagen. Wil je weggaan zonder het op te slaan?"
   save_success: "Gelukt!"
   save_success_detail: "Je programma is opgeslagen"
+  share_success_detail: "Program shared successfully"
+  unshare_success_detail: "Program unshared successfully"
+  copy_clipboard: "Successfully copied to clipboard"
   answer_question: "Je kunt het programma pas uitvoeren als je de vraag hebt beantwoord."
 Programs:
   recent: "Mijn programma's"
@@ -183,5 +186,10 @@ Programs:
   open: "Openen"
   delete: "Verwijderen"
   delete_confirm: "Weet je zeker dat je het programma wil verwijderen?"
+  share: "Share"
+  share_confirm: "Are you sure you want to make the program public?"
+ unshare: "Unshare" + unshare_confirm: "Are you sure you want to make the program private?" + copy_link_to_share: "Copy link to share" no_programs: "Nog geen programma's." write_first: "Schrijf je eerste programma!" diff --git a/coursedata/texts/pt_br.yaml b/coursedata/texts/pt_br.yaml index 65f3f07f41e..898c13721e1 100644 --- a/coursedata/texts/pt_br.yaml +++ b/coursedata/texts/pt_br.yaml @@ -166,6 +166,9 @@ Auth: unsaved_changes: "You have an unsaved program. Do you want to leave without saving it?" save_success: "Success" save_success_detail: "Program saved successfully" + share_success_detail: "Program shared successfully" + unshare_success_detail: "Program unshared successfully" + copy_clipboard: "Successfully copied to clipboard" answer_question: "You can't run the program until you answer the question first" Programs: recent: "My recent programs" @@ -178,5 +181,10 @@ Programs: open: "Open" delete: "Delete" delete_confirm: "Are you sure you want to delete the program?" + share: "Share" + share_confirm: "Are you sure you want to make the program public?" + unshare: "Unshare" + unshare_confirm: "Are you sure you want to make the program private?" + copy_link_to_share: "Copy link to share" no_programs: "You have no programs yet." write_first: "Write your first program!" diff --git a/coursedata/texts/sw.yaml b/coursedata/texts/sw.yaml index 007682c953d..2c92ee68867 100644 --- a/coursedata/texts/sw.yaml +++ b/coursedata/texts/sw.yaml @@ -114,6 +114,9 @@ Auth: unsaved_changes: "Una programu ambao haujahifadhiwa. Je! Unataka kuondoka bila kuiokoa?" save_success: "Fanikio" save_success_detail: "Programu imehifadhiwa kwa mafanikio" + share_success_detail: "Program shared successfully" + unshare_success_detail: "Program unshared successfully" + copy_clipboard: "Successfully copied to clipboard" answer_question: "Huwezi kuendesha programu hadi ujibu swali kwanza" Programs: recent: "Programu zangu za hivi karibuni" @@ -126,5 +129,10 @@ Programs: open: "Fungua" delete: "Futa" delete_confirm: "Je! Una uhakika unataka kufuta programu?" + share: "Share" + share_confirm: "Are you sure you want to make the program public?" + unshare: "Unshare" + unshare_confirm: "Are you sure you want to make the program private?" + copy_link_to_share: "Copy link to share" no_programs: "Bado hauna programu" - write_first: "Andika programu yako ya kwanza." \ No newline at end of file + write_first: "Andika programu yako ya kwanza." diff --git a/coursedata/texts/zh.yaml b/coursedata/texts/zh.yaml index 51a6884bdfe..3946f58514a 100644 --- a/coursedata/texts/zh.yaml +++ b/coursedata/texts/zh.yaml @@ -162,6 +162,9 @@ Auth: unsaved_changes: "你有一个未保存的程序. 你想不保存就离开吗?" save_success: "成功" save_success_detail: "程序保存成功" + share_success_detail: "Program shared successfully" + unshare_success_detail: "Program unshared successfully" + copy_clipboard: "Successfully copied to clipboard" answer_question: "你要先回答问题,才能运行程序." Programs: recent: "我最近的程序" @@ -174,5 +177,10 @@ Programs: open: "打开" delete: "删除" delete_confirm: "你确定要删除这个程序吗?" + share: "Share" + share_confirm: "Are you sure you want to make the program public?" + unshare: "Unshare" + unshare_confirm: "Are you sure you want to make the program private?" + copy_link_to_share: "Copy link to share" no_programs: "你还没有程序." write_first: "编写你的第一个程序!" 
diff --git a/gunicorn.conf.py b/gunicorn.conf.py
index 2c9b95f11d1..18da0717ed4 100644
--- a/gunicorn.conf.py
+++ b/gunicorn.conf.py
@@ -4,5 +4,6 @@
 def worker_exit(server, worker):
     # When the worker is being exited (perhaps because of a timeout),
     # give the query_log handler a chance to flush to disk.
-    from website import querylog
+    from website import querylog, jsonbin
     querylog.emergency_shutdown()
+    jsonbin.emergency_shutdown()
diff --git a/requirements.txt b/requirements.txt
index 9aa979edd7a..a1d8bcbc2cb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,8 @@ PyYAML==5.4
 attrs==19.3.0
 Flask-Commonmark==0.8
 bcrypt==3.2.0
-boto3==1.16.50
 levenshtein==0.12.0
+boto3>=1.16.50
 ruamel.yaml==0.17.4
 pylint==2.8.2
+awscli>=1.19.88
diff --git a/static/js/app.js b/static/js/app.js
index 7a9743086c0..f8e62071d90 100644
--- a/static/js/app.js
+++ b/static/js/app.js
@@ -376,16 +376,40 @@ window.share_program = function share_program (id, Public, reload) {
     contentType: 'application/json',
     dataType: 'json'
   }).done(function(response) {
-    $ ('#okbox').show ();
-    $ ('#okbox .caption').html (window.auth.texts.save_success);
-    $ ('#okbox .details').html (window.auth.texts.share_success_detail);
-    if (reload) location.reload ();
+    if ($ ('#okbox') && $ ('#okbox').length) {
+      $ ('#okbox').show ();
+      $ ('#okbox .caption').html (window.auth.texts.save_success);
+      $ ('#okbox .details').html (Public ? window.auth.texts.share_success_detail : window.auth.texts.unshare_success_detail);
+    }
+    else {
+      alert (Public ? window.auth.texts.share_success_detail : window.auth.texts.unshare_success_detail);
+    }
+    if (reload) setTimeout (function () {location.reload ()}, 1000);
   }).fail(function(err) {
     console.error(err);
     error.show(ErrorMessages.Connection_error, JSON.stringify(err));
   });
 }
 
+window.copy_to_clipboard = function copy_to_clipboard (string) {
+  // https://hackernoon.com/copying-text-to-clipboard-with-javascript-df4d4988697f
+  var el = document.createElement ('textarea');
+  el.value = string;
+  el.setAttribute ('readonly', '');
+  el.style.position = 'absolute';
+  el.style.left = '-9999px';
+  document.body.appendChild (el);
+  var selected = document.getSelection ().rangeCount > 0 ? document.getSelection ().getRangeAt (0) : false;
+  el.select ();
+  document.execCommand ('copy');
+  document.body.removeChild (el);
+  if (selected) {
+    document.getSelection ().removeAllRanges ();
+    document.getSelection ().addRange (selected);
+  }
+  alert (window.auth.texts.copy_clipboard);
+}
+
 /**
  * Do a POST with the error to the server so we can log it
  */
diff --git a/templates/programs.html b/templates/programs.html
index bfe0593a946..b59fcb6100e 100644
--- a/templates/programs.html
+++ b/templates/programs.html
@@ -20,6 +20,7 @@
       {{ texts.delete }}
       {% if program.public %}
       {{ texts.unshare}}
+      {{ texts.copy_link_to_share }}
       {% else %}
       {{ texts.share}}
       {% endif %}
diff --git a/tests/test_querylog.py b/tests/test_querylog.py
index eb0088fb812..f30c69be1d2 100644
--- a/tests/test_querylog.py
+++ b/tests/test_querylog.py
@@ -1,4 +1,4 @@
-from website import querylog
+from website import querylog, log_queue
 import unittest
 
 class TestQueryLog(unittest.TestCase):
@@ -25,7 +25,7 @@ def test_emergency_recovery(self):
 
         querylog.emergency_shutdown()
 
-        recovered_queue = querylog.LogQueue(batch_window_s=300)
+        recovered_queue = log_queue.LogQueue('querylog', batch_window_s=300)
         recovered_queue.try_load_emergency_saves()
         recovered_queue.set_transmitter(self._fake_transmitter)
diff --git a/tools/view-logs b/tools/download-logs
similarity index 79%
rename from tools/view-logs
rename to tools/download-logs
index 7b70363483f..01417890350 100755
--- a/tools/view-logs
+++ b/tools/download-logs
@@ -26,4 +26,4 @@
 sed -e '$a\' $cache_dir/* > $finalfile
 echo "Now run a command like:"
 echo ""
-echo "    cat $finalfile | recs grep '{{duration_ms}} > 10000' | recs totable -k start_time,duration_ms,method,path,'!_ms\$!' | less -S"
+echo "    cat $finalfile | recs grep '{{duration_ms}} > 1000' | tee _lastquery.jsonl | recs totable -k start_time,duration_ms,method,path,'!_ms\$!' | less -S"
diff --git a/tools/download-programs b/tools/download-programs
new file mode 100755
index 00000000000..892609fe994
--- /dev/null
+++ b/tools/download-programs
@@ -0,0 +1,60 @@
+#!/bin/bash
+set -eu
+scriptdir=$(cd $(dirname $0) && pwd)
+
+if ! type aws > /dev/null; then
+    echo "Install the AWS CLI before running this script." >&2
+    exit 1
+fi
+
+if ! grep '\[hedy-logs-viewer\]' ~/.aws/credentials > /dev/null; then
+    echo "Add the following block to your ~/.aws/credentials file:" >&2
+    echo "" >&2
+    echo "[hedy-logs-viewer]" >&2
+    echo "aws_access_key_id = AKIA***********" >&2
+    echo "aws_secret_access_key = **************" >&2
+    echo ""
+    echo "(Ask someone from the team for the actual keys)" >&2
+    exit 1
+fi
+
+#----------------------------------------------------------------------
+hedy_env=""
+prefix=""
+
+usage() {
+    echo "download-programs -e <hedy_env> [-d <subdir>] <target_dir>" >&2
+}
+
+while getopts "he:d:" OPTION
+do
+    case $OPTION in
+        e)
+            hedy_env="$OPTARG"
+            ;;
+        d)
+            prefix="/$OPTARG"
+            ;;
+        h)
+            usage
+            exit 0
+            ;;
+    esac
+done
+shift $((OPTIND -1))
+
+dir="${1:-}"
+if [[ "${dir}" == "" ]]; then
+    usage
+    exit 1
+fi
+
+#----------------------------------------------------------------------
+
+mkdir -p "$dir"
+
+export AWS_DEFAULT_REGION=eu-west-1
+export AWS_PROFILE=hedy-logs-viewer
+bucket=hedy-parse-logs
+
+aws s3 sync s3://${bucket}/${hedy_env}/${prefix} $dir
diff --git a/website/aws_helpers.py b/website/aws_helpers.py
index da337a44c7e..df84f52e9fa 100644
--- a/website/aws_helpers.py
+++ b/website/aws_helpers.py
@@ -6,7 +6,8 @@
 import config
 import utils
 
-def s3_transmitter_from_env():
+
+def s3_querylog_transmitter_from_env():
     """Return an S3 transmitter, or return None."""
     have_aws_creds = os.getenv('AWS_ACCESS_KEY_ID') and os.getenv('AWS_SECRET_ACCESS_KEY')
 
@@ -14,26 +15,39 @@ def s3_querylog_transmitter_from_env():
         logging.warning('Unable to initialize S3 querylogger (missing AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY)')
         return None
 
-    return transmit_to_s3
+    return make_s3_transmitter(config.config['s3-query-logs'])
 
-def transmit_to_s3(timestamp, records):
-    """Transmit logfiles to S3 with default config."""
-    s3config = config.config['s3-query-logs']
+def s3_parselog_transmitter_from_env():
+    """Return an S3 transmitter, or return None."""
+    have_aws_creds = os.getenv('AWS_ACCESS_KEY_ID') and os.getenv('AWS_SECRET_ACCESS_KEY')
 
-    # No need to configure credentials, we've already confirmed they are in the environment.
-    s3 = boto3.client('s3', region_name=s3config['region'])
+    if not have_aws_creds:
+        logging.warning('Unable to initialize S3 parse logger (missing AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY)')
+        return None
 
-    # Grouping in the key is important, we need this to zoom into an interesting
-    # log period.
-    key = s3config['prefix'] + utils.isoformat(timestamp).replace('T', '/') + s3config['postfix'] + '.jsonl'
+    return make_s3_transmitter(config.config['s3-parse-logs'])
 
-    # Store as json-lines format
-    body = '\n'.join(json.dumps(r) for r in records)
 
-    s3.put_object(
-        Bucket=s3config['bucket'],
-        Key=key,
-        StorageClass='STANDARD_IA',  # Cheaper, applicable for logs
-        Body=body)
-    logging.debug(f'Wrote {len(records)} query logs to s3://{s3config["bucket"]}/{key}')
\ No newline at end of file
+def make_s3_transmitter(s3config):
+    """Make a transmitter function (for use with a LogQueue) which will save records to S3."""
+    def transmit_to_s3(timestamp, records):
+        """Transmit logfiles to S3 with default config."""
+
+        # No need to configure credentials, we've already confirmed they are in the environment.
+        s3 = boto3.client('s3', region_name=s3config['region'])
+
+        # Grouping in the key is important, we need this to zoom into an interesting
+        # log period.
+        key = s3config.get('prefix', '') + utils.isoformat(timestamp).replace('T', '/') + s3config.get('postfix', '') + '.jsonl'
+
+        # Store as json-lines format
+        body = '\n'.join(json.dumps(r) for r in records)
+
+        s3.put_object(
+            Bucket=s3config['bucket'],
+            Key=key,
+            StorageClass='STANDARD_IA',  # Cheaper, applicable for logs
+            Body=body)
+        logging.debug(f'Wrote {len(records)} query logs to s3://{s3config["bucket"]}/{key}')
+    return transmit_to_s3
diff --git a/website/jsonbin.py b/website/jsonbin.py
index a8ba5ed0972..7c70fcd78c6 100644
--- a/website/jsonbin.py
+++ b/website/jsonbin.py
@@ -5,6 +5,9 @@
 import requests
 import logging
 
+from . import log_queue
+from . import aws_helpers
+
 logger = logging.getLogger('jsonbin')
 
 class JsonBinLogger:
@@ -69,7 +72,46 @@ def _run(self):
             except Exception:
                 logger.exception(f'Error posting to jsonbin.')
 
+
 class NullJsonbinLogger():
     """A jsonbin logger that doesn't actually do anything."""
     def log(self, obj):
-        pass
\ No newline at end of file
+        pass
+
+
+class MultiParseLogger():
+    """A logger that forwards to other loggers."""
+    def __init__(self, *loggers):
+        self.loggers = loggers
+
+    def log(self, obj):
+        for logger in self.loggers:
+            logger.log(obj)
+
+
+class S3ParseLogger():
+    """A logger that logs to S3.
+
+    - Well then why is it in a file called 'jsonbin.py'?
+
+    - Legacy, young grasshopper. Legacy.
+    """
+    @staticmethod
+    def from_env_vars():
+        transmitter = aws_helpers.s3_parselog_transmitter_from_env()
+        if not transmitter:
+            return NullJsonbinLogger()
+
+        S3_LOG_QUEUE.set_transmitter(transmitter)
+        return S3ParseLogger()
+
+    def log(self, obj):
+        S3_LOG_QUEUE.add(obj)
+
+
+S3_LOG_QUEUE = log_queue.LogQueue('parse', batch_window_s=300)
+S3_LOG_QUEUE.try_load_emergency_saves()
+
+def emergency_shutdown():
+    """The process is being killed. Do whatever needs to be done to save the logs."""
+    S3_LOG_QUEUE.emergency_save_to_disk()
\ No newline at end of file
diff --git a/website/log_queue.py b/website/log_queue.py
new file mode 100644
index 00000000000..8076b46f80d
--- /dev/null
+++ b/website/log_queue.py
@@ -0,0 +1,155 @@
+import collections
+import glob
+import json
+import logging
+import os
+import threading
+import time
+import traceback
+
+class LogQueue:
+    """A queue of records that still need to be written out.
+
+    For efficiency's sake, records are grouped into time windows of
+    'batch_window_s' seconds (say, 5 minutes). x'es below indicate
+    log record events:
+
+        300               600               900
+         |   x    x     x |        x        |
+       --+-----------------+-----------------+---------
+         ^                 ^                 ^
+        wake              wake              wake
+
+    Upon 'wake' events (every batch window seconds), a background thread
+    wakes up and collects all events from previous time windows.
+
+    We need to use a mutex since the dict we keep the queue in is not
+    thread-safe. We do as little work as possible every time we hold the mutex
+    to allow for maximum parallelism.
+ """ + def __init__(self, name, batch_window_s, do_print=False): + self.name = name + self.records_queue = collections.defaultdict(list) + self.batch_window_s = batch_window_s + self.transmitter = None + self.do_print = do_print + self.mutex = threading.Lock() + self.thread = threading.Thread(target=self._write_thread, name=f'{name}Writer', daemon=True) + self.thread.start() + + def add(self, data): + bucket = div_clip(time.time(), self.batch_window_s) + + if self.do_print: + logging.debug(repr(data)) + + with self.mutex: + self.records_queue[bucket].append(data) + + def set_transmitter(self, transmitter): + """Configure a function that will be called for every set of records. + + The function looks like: + + def transmitter(timestamp, records) -> Boolean: + ... + """ + self.transmitter = transmitter + + def emergency_save_to_disk(self): + """Save all untransmitted records to disk. + + They will be picked up and attempted to be transmitted by a future + (restarted) process. + """ + all_records = [] + with self.mutex: + for records in self.records_queue.values(): + all_records.extend(records) + self.records_queue.clear() + + if not all_records: + return + + filename = f'{self.name}_dump.{os.getpid()}.{time.time()}.jsonl' + with open(filename, 'w') as f: + json.dump(all_records, f) + + def try_load_emergency_saves(self): + """Try to load emergency saves from disk, if found. + + There may be multiple LogQueues trying to load the same files at the + same time, so we need to be careful that only one of them actually + loads a single file (in order to avoid record duplication). + + We use the atomicity of renaming the file as a way of claiming ownership of it. + """ + candidates = glob.glob(f'{self.name}_dump.*.jsonl') + for candidate in candidates: + try: + claim_name = candidate + '.claimed' + os.rename(candidate, claim_name) + + # If this succeeded, we're guaranteed to be able to read this file (and because + # we renamed it to something not matching the glob pattern, no one else is going to + # try to pick it up later) + with open(claim_name, 'r') as f: + all_records = json.load(f) + + bucket = div_clip(time.time(), self.batch_window_s) + with self.mutex: + self.records_queue[bucket].extend(all_records) + os.unlink(claim_name) + except OSError: + pass + + def transmit_now(self, max_time=None): + """(Try to) transmit all pending records with recording timestamps smaller than the given time now.""" + with self.mutex: + keys = list(self.records_queue.keys()) + keys.sort() + + max_time = max_time or time.time() + buckets_to_send = [k for k in keys if k < max_time] + + for bucket_ts in buckets_to_send: + # Get the records out of the queue + with self.mutex: + bucket_records = self.records_queue[bucket_ts] + + # Try to send the records (to signal failure, this can either + # throw or return False, depending on how loud it wants to be). 
+            success = self._save_records(bucket_ts, bucket_records)
+
+            # Only remove them from the queue if sending didn't fail
+            if success != False:
+                with self.mutex:
+                    del self.records_queue[bucket_ts]
+
+    def _save_records(self, timestamp, records):
+        if self.transmitter:
+            return self.transmitter(timestamp, records)
+        else:
+            count = len(records)
+            logging.warning(f'No transmitter configured, {count} records dropped')
+
+    def _write_thread(self):
+        """Background thread which will wake up every batch_window_s seconds to emit records from the queue."""
+        next_wake = div_clip(time.time(), self.batch_window_s) + self.batch_window_s
+        while True:
+            try:
+                # Wait for next wake time
+                time.sleep(max(0, next_wake - time.time()))
+
+                # Once woken, see what buckets we have left to push (all buckets
+                # with numbers lower than next_wake)
+                self.transmit_now(next_wake)
+            except Exception:
+                traceback.print_exc()
+            next_wake += self.batch_window_s
+
+
+def div_clip(x, y):
+    """Return the highest multiple of y that is <= x."""
+    return int(x // y) * y
+
diff --git a/website/querylog.py b/website/querylog.py
index 6c35d697d57..bff0c66d4bb 100644
--- a/website/querylog.py
+++ b/website/querylog.py
@@ -1,16 +1,14 @@
 import time
-import collections
 import functools
 import threading
 import logging
 import os
-import glob
-import json
 import datetime
 import resource
-import traceback
 import logging
 
+from . import log_queue
+
 logger = logging.getLogger('querylog')
 
 class LogRecord:
@@ -42,7 +40,7 @@ def finish(self):
         # There should be 0, but who knows
         self._terminate_running_timers()
 
-        LOG_QUEUE.add(self)
+        LOG_QUEUE.add(self.as_data())
 
     def set(self, **kwargs):
         """Set keys based on keyword arguments."""
@@ -177,157 +175,10 @@ def __exit__(self, type, value, tb):
         self.finish()
 
 
-class LogQueue:
-    """A queue of records that still need to be written out.
-
-    For efficiency's sake, records are grouped into time windows of
-    'batch_window_s' seconds (say, 5 minutes). x'es below indicate
-    log record events:
-
-        300               600               900
-         |   x    x     x |        x        |
-       --+-----------------+-----------------+---------
-         ^                 ^                 ^
-        wake              wake              wake
-
-    Upon 'wake' events (every batch window seconds), a background thread
-    wakes up and collects all events from previous time windows.
-
-    We need to use a mutex since the dict we keep the queue in is not
-    thread-safe. We do as little work as possible every time we hold the mutex
-    to allow for maximum parallelism.
-    """
-    def __init__(self, batch_window_s, do_print=False):
-        self.records_queue = collections.defaultdict(list)
-        self.batch_window_s = batch_window_s
-        self.transmitter = None
-        self.do_print = do_print
-        self.mutex = threading.Lock()
-        self.thread = threading.Thread(target=self._write_thread, name='QueryLogWriter', daemon=True)
-        self.thread.start()
-
-    def add(self, record):
-        bucket = div_clip(time.time(), self.batch_window_s)
-        data = record.as_data()
-
-        if self.do_print:
-            logging.debug(repr(data))
-
-        with self.mutex:
-            self.records_queue[bucket].append(data)
-
-    def set_transmitter(self, transmitter):
-        """Configure a function that will be called for every set of records.
-
-        The function looks like:
-
-            def transmitter(timestamp, records) -> Boolean:
-                ...
-        """
-        self.transmitter = transmitter
-
-    def emergency_save_to_disk(self):
-        """Save all untransmitted records to disk.
-
-        They will be picked up and attempted to be transmitted by a future
-        (restarted) process.
- """ - all_records = [] - with self.mutex: - for records in self.records_queue.values(): - all_records.extend(records) - self.records_queue.clear() - - if not all_records: - return - - filename = f'querylog_dump.{os.getpid()}.{time.time()}.jsonl' - with open(filename, 'w') as f: - json.dump(all_records, f) - - def try_load_emergency_saves(self): - """Try to load emergency saves from disk, if found. - - There may be multiple LogQueues trying to load the same files at the - same time, so we need to be careful that only one of them actually - loads a single file (in order to avoid record duplication). - - We use the atomicity of renaming the file as a way of claiming ownership of it. - """ - candidates = glob.glob('querylog_dump.*.jsonl') - for candidate in candidates: - try: - claim_name = candidate + '.claimed' - os.rename(candidate, claim_name) - - # If this succeeded, we're guaranteed to be able to read this file (and because - # we renamed it to something not matching the glob pattern, no one else is going to - # try to pick it up later) - with open(claim_name, 'r') as f: - all_records = json.load(f) - - bucket = div_clip(time.time(), self.batch_window_s) - with self.mutex: - self.records_queue[bucket].extend(all_records) - os.unlink(claim_name) - except OSError: - pass - - def transmit_now(self, max_time=None): - """(Try to) transmit all pending records with recording timestamps smaller than the given time now.""" - with self.mutex: - keys = list(self.records_queue.keys()) - keys.sort() - - max_time = max_time or time.time() - buckets_to_send = [k for k in keys if k < max_time] - - for bucket_ts in buckets_to_send: - # Get the records out of the queue - with self.mutex: - bucket_records = self.records_queue[bucket_ts] - - # Try to send the records (to signal failure, this can either - # throw or return False, depending on how loud it wants to be). - success = self._save_records(bucket_ts, bucket_records) - - # Only remove them from the queue if sending didn't fail - if success != False: - with self.mutex: - del self.records_queue[bucket_ts] - - def _save_records(self, timestamp, records): - if self.transmitter: - return self.transmitter(timestamp, records) - else: - count = len(records) - logging.warn(f'No querylog transmitter configured, {count} records dropped') - - def _write_thread(self): - """Background thread which will wake up every batch_window_s seconds to emit records from the queue.""" - next_wake = div_clip(time.time(), self.batch_window_s) + self.batch_window_s - while True: - try: - # Wait for next wake time - time.sleep(max(0, next_wake - time.time())) - - # Once woken, see what buckets we have left to push (all buckets - # with numbers lower than next_wake) - self.transmit_now(next_wake) - except Exception as e: - traceback.print_exc(e) - next_wake += self.batch_window_s - - -def div_clip(x, y): - """Return the highest value < x that's a multiple of y.""" - return int(x // y) * y - - def ms_from_fsec(x): """Milliseconds from fractional seconds.""" return int(x * 1000) -LOG_QUEUE = LogQueue(batch_window_s=300) +LOG_QUEUE = log_queue.LogQueue('querylog', batch_window_s=300) LOG_QUEUE.try_load_emergency_saves()