From c6476590206a5e99ee7d2e77eee1d3906ef1c18a Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sat, 8 Jun 2024 18:08:19 +0200 Subject: [PATCH 01/10] Fetch results of evening voting sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit While on most plenary days, there is only one voting session around midday, on some days there is another session in the evening, usually around 17:00. The vote results of the evening sessions are appended to the same source document that also contains the results of the midday votes. Currently, we only run the `RCVListPipeline` between 12:00 and 15:00 until we’ve been able to fetch vote results successfully. Once we’ve fetched the results, we do not attempt to fetch them again. That means that so far we did not (automatically) fetch results of the evening voting session. In addition to the current behavior, this change tries to fetch vote results between 17:00 and 20:00, until we’ve been able to fetch them successfully a second time. This is only the first part of the solution, as we also need to check that we only stop scraping vote results once we’ve been able to fetch updated results (e.g. by storing a hash of the source data for every successful pipeline run). --- backend/howtheyvote/worker/__init__.py | 51 +++++++++++++++++--------- backend/howtheyvote/worker/worker.py | 19 ++++++++++ backend/tests/worker/test_worker.py | 44 +++++++++++++++++++++- 3 files changed, 95 insertions(+), 19 deletions(-) diff --git a/backend/howtheyvote/worker/__init__.py b/backend/howtheyvote/worker/__init__.py index d50718d8f..36ac5f28a 100644 --- a/backend/howtheyvote/worker/__init__.py +++ b/backend/howtheyvote/worker/__init__.py @@ -2,7 +2,7 @@ import time from prometheus_client import Gauge -from sqlalchemy import exists, func, select +from sqlalchemy import select from structlog import get_logger from .. 
import config @@ -12,12 +12,12 @@ from ..models import PipelineRun, PipelineRunResult, PlenarySession from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline from ..query import session_is_current_at -from .worker import SkipPipelineError, Weekday, Worker +from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully log = get_logger(__name__) -def op_rcv() -> None: +def op_rcv_midday() -> None: """Checks if there is a current plenary session and, if yes, fetches the latest roll-call vote results.""" today = datetime.date.today() @@ -25,7 +25,25 @@ def op_rcv() -> None: if not _is_session_day(today): raise SkipPipelineError() - if _ran_successfully(RCVListPipeline, today): + if pipeline_ran_successfully(RCVListPipeline, today): + raise SkipPipelineError() + + pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) + pipeline.run() + + +def op_rcv_evening() -> None: + """While on most plenary days, there’s only one voting session around midday, on some days + there is another sesssion in the evening, usually around 17:00. The vote results of the + evening sessions are appended to the same source document that also contains the results + of the midday votes. 
This method fetches the latest roll-call vote results, even if they + have been fetched successfully earlier on the same day.""" + today = datetime.date.today() + + if not _is_session_day(today): + raise SkipPipelineError() + + if pipeline_ran_successfully(RCVListPipeline, today, count=2): raise SkipPipelineError() pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) @@ -75,19 +93,6 @@ def _is_session_day(date: datetime.date) -> bool: return session is not None -def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: - """Check if a given pipeline has been run successfully on a given day.""" - query = ( - exists() - .where(PipelineRun.pipeline == pipeline.__name__) - .where(func.date(PipelineRun.started_at) == func.date(date)) - .where(PipelineRun.result == PipelineRunResult.SUCCESS) - .select() - ) - - return bool(Session.execute(query).scalar()) - - worker = Worker() # Mon at 04:00 @@ -110,7 +115,7 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: # Mon-Thu between 12:00 and 15:00, every 10 mins worker.schedule_pipeline( - op_rcv, + op_rcv_midday, name=RCVListPipeline.__name__, weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU}, hours=range(12, 15), @@ -118,6 +123,16 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: tz=config.TIMEZONE, ) +# Mon-Thu between 17:00 and 20:00, every 10 mins +worker.schedule_pipeline( + op_rcv_evening, + name=RCVListPipeline.__name__, + weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU}, + hours=range(17, 20), + minutes=range(0, 60, 10), + tz=config.TIMEZONE, +) + # Mon-Thu, between 13:00 and 20:00, every 30 mins worker.schedule_pipeline( op_press, diff --git a/backend/howtheyvote/worker/worker.py b/backend/howtheyvote/worker/worker.py index c17621e0f..91667c618 100644 --- a/backend/howtheyvote/worker/worker.py +++ b/backend/howtheyvote/worker/worker.py @@ -9,6 +9,7 @@ from prometheus_client import Counter, Gauge, Histogram 
from prometheus_client import start_http_server as start_metrics_server from schedule import Scheduler +from sqlalchemy import func, select from structlog import get_logger from .. import config @@ -58,6 +59,24 @@ class Weekday(enum.Enum): Handler = Callable[..., Any] +def pipeline_ran_successfully( + pipeline: type[object], + date: datetime.date, + count: int = 1, +) -> bool: + """Check if a given pipeline has been run successfully on a given day.""" + query = ( + select(func.count()) + .select_from(PipelineRun) + .where(PipelineRun.pipeline == pipeline.__name__) + .where(func.date(PipelineRun.started_at) == func.date(date)) + .where(PipelineRun.result == PipelineRunResult.SUCCESS) + ) + result = Session.execute(query).scalar() or 0 + + return result >= count + + class Worker: """Running a worker starts a long-running process that executes data pipelines in regular intervals and stores the result of the pipeline runs in the database.""" diff --git a/backend/tests/worker/test_worker.py b/backend/tests/worker/test_worker.py index 3943ae2a9..61b54319a 100644 --- a/backend/tests/worker/test_worker.py +++ b/backend/tests/worker/test_worker.py @@ -5,7 +5,7 @@ from howtheyvote.models import PipelineRun, PipelineRunResult from howtheyvote.pipelines import DataUnavailableError, PipelineError -from howtheyvote.worker.worker import Weekday, Worker +from howtheyvote.worker.worker import Weekday, Worker, pipeline_ran_successfully def get_handler(): @@ -170,3 +170,45 @@ def pipeline_error(): assert runs[1].pipeline == "pipeline_error" assert runs[1].result == PipelineRunResult.FAILURE + + +def test_pipeline_ran_successfully(db_session): + class TestPipeline: + pass + + now = datetime.datetime.now() + today = now.date() + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + result=PipelineRunResult.FAILURE, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today) is False + + run = 
PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + result=PipelineRunResult.SUCCESS, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today) is True + assert pipeline_ran_successfully(TestPipeline, today, count=2) is False + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + result=PipelineRunResult.SUCCESS, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today, count=2) is True From b49b945e2117804043fd7fe4c2fcc1b52a71baf5 Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sun, 29 Dec 2024 13:41:01 +0100 Subject: [PATCH 02/10] Move `load_fixture` helper to parent package as it's useful for other tests, not just scraper tests --- backend/tests/helpers.py | 6 +++++ backend/tests/scrapers/helpers.py | 7 ------ backend/tests/scrapers/test_members.py | 19 ++++++-------- backend/tests/scrapers/test_sessions.py | 4 +-- backend/tests/scrapers/test_votes.py | 33 ++++++++++++------------- 5 files changed, 32 insertions(+), 37 deletions(-) delete mode 100644 backend/tests/scrapers/helpers.py diff --git a/backend/tests/helpers.py b/backend/tests/helpers.py index b25e3abb3..ce9bd421b 100644 --- a/backend/tests/helpers.py +++ b/backend/tests/helpers.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import Any from sqlalchemy.orm import DeclarativeBase @@ -5,3 +6,8 @@ def record_to_dict(record: DeclarativeBase) -> dict[str, Any]: return {c.name: getattr(record, c.name) for c in record.__table__.columns} + + +def load_fixture(path: str) -> str: + base = Path(__file__).resolve().parent + return base.joinpath(path).read_text() diff --git a/backend/tests/scrapers/helpers.py b/backend/tests/scrapers/helpers.py deleted file mode 100644 index c8be91285..000000000 --- a/backend/tests/scrapers/helpers.py +++ /dev/null @@ -1,7 +0,0 @@ -from pathlib import 
Path - -FIXTURES_BASE = Path(__file__).resolve().parent / "data" - - -def load_fixture(path: str) -> str: - return FIXTURES_BASE.joinpath(path).read_text() diff --git a/backend/tests/scrapers/test_members.py b/backend/tests/scrapers/test_members.py index 1ef309149..607ea5bd1 100644 --- a/backend/tests/scrapers/test_members.py +++ b/backend/tests/scrapers/test_members.py @@ -1,17 +1,14 @@ from datetime import date -from pathlib import Path from howtheyvote.scrapers import MemberGroupsScraper, MemberInfoScraper, MembersScraper -from .helpers import load_fixture - -TEST_DATA_DIR = Path(__file__).resolve().parent / "data" +from ..helpers import load_fixture def test_members_scraper(responses): responses.get( "https://www.europarl.europa.eu/meps/en/directory/xml/?leg=9", - body=load_fixture("members/members_directory_term_9.xml"), + body=load_fixture("scrapers/data/members/members_directory_term_9.xml"), ) scraper = MembersScraper(term=9) @@ -27,7 +24,7 @@ def test_members_scraper(responses): def test_member_info_scraper(responses): responses.get( "https://www.europarl.europa.eu/meps/en/124834/NAME/home", - body=load_fixture("members/member_info_sonneborn_home.html"), + body=load_fixture("scrapers/data/members/member_info_sonneborn_home.html"), ) scraper = MemberInfoScraper(web_id=124834) @@ -48,7 +45,7 @@ def test_member_info_scraper(responses): def test_member_info_scraper_date_of_birth_without(responses): responses.get( "https://www.europarl.europa.eu/meps/en/124831/NAME/home", - body=load_fixture("members/member_info_adinolfi_home.html"), + body=load_fixture("scrapers/data/members/member_info_adinolfi_home.html"), ) scraper = MemberInfoScraper(web_id=124831) @@ -59,7 +56,7 @@ def test_member_info_scraper_date_of_birth_without(responses): def test_member_info_scraper_multiple_emails(responses): responses.get( "https://www.europarl.europa.eu/meps/en/28229/NAME/home", - body=load_fixture("members/member_info_weber_home.html"), + 
body=load_fixture("scrapers/data/members/member_info_weber_home.html"), ) scraper = MemberInfoScraper(web_id=28229) @@ -70,7 +67,7 @@ def test_member_info_scraper_multiple_emails(responses): def test_member_info_scraper_no_social_media(responses): responses.get( "https://www.europarl.europa.eu/meps/en/124831/NAME/home", - body=load_fixture("members/member_info_adinolfi_home.html"), + body=load_fixture("scrapers/data/members/member_info_adinolfi_home.html"), ) scraper = MemberInfoScraper(web_id=124831) @@ -81,7 +78,7 @@ def test_member_info_scraper_no_social_media(responses): def test_member_groups_scraper(responses): responses.get( "https://www.europarl.europa.eu/meps/en/124831/NAME/history/8", - body=load_fixture("members/member_groups_adinolfi_term_8.html"), + body=load_fixture("scrapers/data/members/member_groups_adinolfi_term_8.html"), ) scraper = MemberGroupsScraper(web_id=124831, term=8) @@ -114,7 +111,7 @@ def test_member_groups_scraper(responses): def test_member_groups_scraper_ongoing(responses): responses.get( "https://www.europarl.europa.eu/meps/en/28229/NAME/history/10", - body=load_fixture("members/member_groups_weber_term_10.html"), + body=load_fixture("scrapers/data/members/member_groups_weber_term_10.html"), ) scraper = MemberGroupsScraper(web_id=28229, term=10) diff --git a/backend/tests/scrapers/test_sessions.py b/backend/tests/scrapers/test_sessions.py index dbc5a567e..2cc305e76 100644 --- a/backend/tests/scrapers/test_sessions.py +++ b/backend/tests/scrapers/test_sessions.py @@ -3,13 +3,13 @@ from howtheyvote.models import PlenarySessionLocation from howtheyvote.scrapers import ODPSessionScraper -from .helpers import load_fixture +from ..helpers import load_fixture def test_odp_session_scraper(responses): responses.get( "https://data.europarl.europa.eu/api/v1/meetings/MTG-PL-2024-07-16", - body=load_fixture("sessions/odp_mtg-pl-2024-07-16.xml"), + body=load_fixture("scrapers/data/sessions/odp_mtg-pl-2024-07-16.xml"), ) scraper = 
ODPSessionScraper(start_date=datetime.date(2024, 7, 16)) diff --git a/backend/tests/scrapers/test_votes.py b/backend/tests/scrapers/test_votes.py index d66ba5058..c2b21b9ad 100644 --- a/backend/tests/scrapers/test_votes.py +++ b/backend/tests/scrapers/test_votes.py @@ -11,15 +11,14 @@ RCVListScraper, ) -from ..helpers import record_to_dict -from .helpers import load_fixture +from ..helpers import load_fixture, record_to_dict @pytest.mark.always_mock_requests def test_rcv_list_scraper(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2020-07-23-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2020-07-23-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2020-07-23-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -65,7 +64,7 @@ def test_rcv_list_scraper(responses): def test_rcv_list_scraper_incorrect_totals(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2020-07-23-RCV_FR.xml", - body=load_fixture("votes/rcv_list_incorrect_pv-9-2020-07-23-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_incorrect_pv-9-2020-07-23-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -88,7 +87,7 @@ def test_rcv_list_scraper_incorrect_totals(responses): def test_rcv_list_scraper_did_not_vote(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2020-07-23-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2020-07-23-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2020-07-23-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -118,7 +117,7 @@ def test_rcv_list_scraper_did_not_vote(responses): def test_rcv_list_scraper_same_name(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2020-09-15-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2020-09-15-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2020-09-15-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -144,7 +143,7 @@ def 
test_rcv_list_scraper_same_name(responses): def test_rcv_list_scraper_pers_id(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2023-12-12-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2023-12-12-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2023-12-12-rcv-fr.xml"), ) # The voting list has a different spelling ("Glueck" instead of "Glück"). Cases like this @@ -171,7 +170,7 @@ def test_rcv_list_scraper_pers_id(responses): def test_rcv_list_scraper_pers_id_unknown(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2023-12-12-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2023-12-12-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2023-12-12-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -192,7 +191,7 @@ def test_rcv_list_scraper_document_register(responses): ) register_mock = responses.get( "https://www.europarl.europa.eu/RegData/seance_pleniere/proces_verbal/2020/07-23/liste_presence/P9_PV(2020)07-23(RCV)_XC.xml", - body=load_fixture("votes/rcv_list_p9-pv(2020)07-23(rcv)_xc.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_p9-pv(2020)07-23(rcv)_xc.xml"), ) scraper = RCVListScraper( @@ -216,7 +215,7 @@ def test_rcv_list_scraper_document_register(responses): def test_rcv_list_scraper_timestamp_from_text(responses): responses.get( "https://www.europarl.europa.eu/doceo/document/PV-9-2019-07-15-RCV_FR.xml", - body=load_fixture("votes/rcv_list_pv-9-2019-07-15-rcv-fr.xml"), + body=load_fixture("scrapers/data/votes/rcv_list_pv-9-2019-07-15-rcv-fr.xml"), ) scraper = RCVListScraper( @@ -234,7 +233,7 @@ def test_rcv_list_scraper_timestamp_from_text(responses): def test_procedure_scraper(responses): responses.get( "https://oeil.secure.europarl.europa.eu/oeil/en/procedure-file?reference=2023/2019(INI)", - body=load_fixture("votes/oeil-procedure-file_2023-2019-ini.html"), + body=load_fixture("scrapers/data/votes/oeil-procedure-file_2023-2019-ini.html"), ) 
scraper = ProcedureScraper(vote_id=162214, procedure_reference="2023/2019(INI)") @@ -258,7 +257,7 @@ def test_procedure_scraper(responses): def test_procedure_scraper_geo_areas(responses): responses.get( "https://oeil.secure.europarl.europa.eu/oeil/en/procedure-file?reference=2022/2852(RSP)", - body=load_fixture("votes/oeil-procedure-file_2022-2852-rsp.html"), + body=load_fixture("scrapers/data/votes/oeil-procedure-file_2022-2852-rsp.html"), ) scraper = ProcedureScraper(vote_id=149218, procedure_reference="2022/2852(RSP)") @@ -269,7 +268,7 @@ def test_procedure_scraper_geo_areas(responses): def test_procedure_scraper_geo_areas_fuzzy(responses): responses.get( "https://oeil.secure.europarl.europa.eu/oeil/en/procedure-file?reference=2022/2201(INI)", - body=load_fixture("votes/oeil-procedure-file_2022-2201-ini.html"), + body=load_fixture("scrapers/data/votes/oeil-procedure-file_2022-2201-ini.html"), ) scraper = ProcedureScraper(vote_id=155056, procedure_reference="2022/2201(INI)") @@ -280,7 +279,7 @@ def test_procedure_scraper_geo_areas_fuzzy(responses): def test_eurlex_procedure_scraper_eurovoc_concepts(responses): responses.get( "https://eur-lex.europa.eu/procedure/EN/2021_106", - body=load_fixture("votes/eurlex-procedure_2021-106.html"), + body=load_fixture("scrapers/data/votes/eurlex-procedure_2021-106.html"), ) scraper = EurlexProcedureScraper(vote_id=166051, procedure_reference="2021/0106(COD)") @@ -304,7 +303,7 @@ def test_eurlex_procedure_scraper_eurovoc_concepts(responses): def test_eurlex_procedure_scraper_geo_areas(responses): responses.get( "https://eur-lex.europa.eu/procedure/EN/2023_102", - body=load_fixture("votes/eurlex-procedure_2023-102.html"), + body=load_fixture("scrapers/data/votes/eurlex-procedure_2023-102.html"), ) scraper = EurlexProcedureScraper(vote_id=161383, procedure_reference="2023/0102(NLE)") @@ -326,7 +325,7 @@ def test_eurlex_procedure_scraper_geo_areas(responses): def test_eurlex_document_scraper_eurovoc_concepts(responses): 
responses.get( "https://eur-lex.europa.eu/legal-content/EN/ALL/?uri=EP:P9_A(2021)0270", - body=load_fixture("votes/eurlex-document_p9-a-2021-0270.html"), + body=load_fixture("scrapers/data/votes/eurlex-document_p9-a-2021-0270.html"), ) scraper = EurlexDocumentScraper(vote_id=136238, reference="A9-0270/2021") @@ -348,7 +347,7 @@ def test_eurlex_document_scraper_eurovoc_concepts(responses): def test_eurlex_document_scraper_geo_areas(responses): responses.get( "https://eur-lex.europa.eu/legal-content/EN/ALL/?uri=EP:P9_A(2023)0369", - body=load_fixture("votes/eurlex-document_p9-a-2023-0369.html"), + body=load_fixture("scrapers/data/votes/eurlex-document_p9-a-2023-0369.html"), ) scraper = EurlexDocumentScraper(vote_id=136238, reference="A9-0369/2023") From c28a7e489bfa1341f5dc2d8f5295d69526c99a75 Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sat, 7 Dec 2024 17:59:14 +0100 Subject: [PATCH 03/10] Allow passing a checksum of a previous run to `RCVListPipeline` and exit early if data has not changed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This allows us to repeatedly run the pipeline to check if the data has been updated, while stopping the pipeline as soon as possible if it remains unchanged. In case of this pipeline that means we send one request for the RCV list in XML format, but we do not send requests to fetch pages from EUR-Lex, OEIL, etc. for each of the votes if the RCV list hasn’t changed. 
--- backend/howtheyvote/pipelines/__init__.py | 3 +- backend/howtheyvote/pipelines/common.py | 4 + backend/howtheyvote/pipelines/rcv_list.py | 24 ++++- backend/howtheyvote/scrapers/common.py | 7 ++ ...cv-list_pv-9-2024-04-24-rcv-fr-evening.xml | 92 +++++++++++++++++++ .../rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml | 83 +++++++++++++++++ backend/tests/pipelines/test_rcv_list.py | 66 ++++++++++++- 7 files changed, 274 insertions(+), 5 deletions(-) create mode 100644 backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-evening.xml create mode 100644 backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml diff --git a/backend/howtheyvote/pipelines/__init__.py b/backend/howtheyvote/pipelines/__init__.py index f4d6566c4..817c08c5b 100644 --- a/backend/howtheyvote/pipelines/__init__.py +++ b/backend/howtheyvote/pipelines/__init__.py @@ -1,4 +1,4 @@ -from .common import DataUnavailableError, PipelineError +from .common import DataUnavailableError, DataUnchangedError, PipelineError from .members import MembersPipeline from .press import PressPipeline from .rcv_list import RCVListPipeline @@ -7,6 +7,7 @@ __all__ = [ "PipelineError", "DataUnavailableError", + "DataUnchangedError", "RCVListPipeline", "PressPipeline", "MembersPipeline", diff --git a/backend/howtheyvote/pipelines/common.py b/backend/howtheyvote/pipelines/common.py index 26cf91486..ae5505b1e 100644 --- a/backend/howtheyvote/pipelines/common.py +++ b/backend/howtheyvote/pipelines/common.py @@ -4,3 +4,7 @@ class PipelineError(Exception): class DataUnavailableError(PipelineError): pass + + +class DataUnchangedError(PipelineError): + pass diff --git a/backend/howtheyvote/pipelines/rcv_list.py b/backend/howtheyvote/pipelines/rcv_list.py index 9c11ffcff..af7c0f4af 100644 --- a/backend/howtheyvote/pipelines/rcv_list.py +++ b/backend/howtheyvote/pipelines/rcv_list.py @@ -27,7 +27,7 @@ ) from ..sharepics import generate_vote_sharepic from ..store import Aggregator, BulkWriter, index_records, map_vote, 
map_vote_group -from .common import DataUnavailableError, PipelineError +from .common import DataUnavailableError, DataUnchangedError, PipelineError log = get_logger(__name__) @@ -37,9 +37,16 @@ class RCVListPipeline: extracted votes and scrapes additional information such as data about legislative procedures.""" - def __init__(self, term: int, date: datetime.date): + def __init__( + self, + term: int, + date: datetime.date, + last_run_checksum: str | None = None, + ): self.term = term self.date = date + self.last_run_checksum = last_run_checksum + self.checksum: str | None = None self._vote_ids: set[str] = set() self._vote_group_ids: set[str] = set() self._request_cache: RequestCache = LRUCache(maxsize=25) @@ -106,9 +113,20 @@ def _scrape_rcv_list(self) -> None: date=self.date, active_members=active_members, ) + fragments = scraper.run() + + if ( + self.last_run_checksum is not None + and self.last_run_checksum == scraper.response_checksum + ): + raise DataUnchangedError( + "The data source hasn't changed since the last pipeline run." 
+ ) + + self.checksum = scraper.response_checksum writer = BulkWriter() - writer.add(scraper.run()) + writer.add(fragments) writer.flush() self._vote_ids = writer.get_touched() diff --git a/backend/howtheyvote/scrapers/common.py b/backend/howtheyvote/scrapers/common.py index 3b9e9db92..82b707792 100644 --- a/backend/howtheyvote/scrapers/common.py +++ b/backend/howtheyvote/scrapers/common.py @@ -1,3 +1,4 @@ +import hashlib import html import random import time @@ -94,14 +95,17 @@ def get_url( class BaseScraper(ABC, Generic[ResourceType]): REQUEST_MAX_RETRIES: int = 0 + response_checksum: str | None def __init__(self, request_cache: RequestCache | None = None, **kwargs: Any) -> None: self._request_cache = request_cache self._log = log.bind(scraper=type(self).__name__, **kwargs) + self.response_checksum = None def run(self) -> Any: self._log.info("Running scraper") self._response = self._fetch() + self.response_checksum = self._compute_checksum(self._response) doc = self._parse(self._response) return self._extract_data(doc) @@ -164,6 +168,9 @@ def _headers(self) -> dict[str, str]: "user-agent": random.choice(USER_AGENTS), } + def _compute_checksum(self, response: Response) -> str: + return hashlib.sha256(response.content).hexdigest() + class BeautifulSoupScraper(BaseScraper[BeautifulSoup]): BS_PARSER: str = "lxml" diff --git a/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-evening.xml b/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-evening.xml new file mode 100644 index 000000000..86b6e13d6 --- /dev/null +++ b/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-evening.xml @@ -0,0 +1,92 @@ + + + + + AVERTISSEMENT + NOTICE + HINWEIS + + + Les corrections et intentions de vote sont mentionnées dans ce document sous les points de vote correspondants. Elles sont publiées pour information uniquement et ne modifient en rien le résultat de vote tel qu’annoncé en plénière. 
Pendant la session, les demandes de corrections et intentions de vote reçues avant 18h30 sont publiées le jour même. Les demandes ultérieures sont publiées à mesure des mises à jour successives de ce document, pendant une durée maximale de deux semaines. Signification des sigles: + (pour), - (contre), 0 (abstention) + + Corrections to votes and voting intentions appear below in the section relating to the vote concerned. They are published for information purposes only and do not alter the result of the vote as announced in plenary. During the part-session, requests for corrections to votes and voting intentions received before 18.30 will be published the same day. Subsequent requests will be included in this document each time it is updated in the two weeks following the part-session. Key to symbols: + (in favour), - (against), 0 (abstentions) + + In diesem Dokument sind unter den betreffenden Abstimmungspunkten die Berichtigungen des Stimmverhaltens und das beabsichtigte Stimmverhalten aufgeführt. Diese Angaben dienen ausschließlich der Information; keinesfalls wird durch sie das Abstimmungsergebnis geändert, das im Plenum bekannt gegeben wurde. Während der Tagung werden Anträge zu Berichtigungen des Stimmverhaltens und zum beabsichtigten Stimmverhalten, die bis 18.30 Uhr eingehen, am selben Tag veröffentlicht. Später eingehende Anträge werden sukzessive veröffentlicht, indem dieses Dokument während höchstens zwei Wochen regelmäßig aktualisiert wird. 
Zeichenerklärung: + (dafür), - (dagegen), 0 (Enthaltung) + + + + + ПРОТОКОЛРезултат от поименни гласувания - Приложение 2 + ZÁPISVýsledek jmenovitého hlasování - Příloha 2 + PROTOKOLResultat af afstemningerne ved navneopråb - Bilag 2 + PROTOKOLLErgebnis der namentlichen Abstimmungen - Anlage 2 + ΣΥΝΟΠΤIΚΑ ΠΡΑΚΤIΚΑΑποτέλεσμα των ψηφοφοριών με ονομαστική κλήση - Παράρτηµα 2 + MINUTESResult of roll-call votes - Annex 2 + ACTAResultados de las votaciones nominales - Anexo 2 + PROTOKOLLNimelise hääletuse tulemused - lisa 2 + PÖYTÄKIRJANimenhuutoäänestysten tulokset - Liite 2 + PROCÈS-VERBALRésultat des votes par appel nominal - Annexe 2 + MIONTUAIRISCÍTorthaí na vótála le glaoch rolla - Iarscríbhinn 2 + ZAPISNIKRezultat poimeničnog glasovanja - Prilog 2 + JEGYZŐKÖNYVA név szerinti szavazások eredménye - melléklet 2 + PROCESSO VERBALERisultato delle votazioni per appello nominale - Allegato 2 + PROTOKOLASVardinio balsavimo rezultatai - priedas 2 + PROTOKOLSRezultāti balsošanai pēc saraksta - pielikums 2 + MINUTIRiżultat tal-votazzjoni bis-sejħa tal-ismijiet - Anness 2 + NOTULENUitslag van de hoofdelijke stemmingen - Bijlage 2 + PROTOKÓŁWyniki głosowań imiennych - Załącznik 2 + ATAResultados das votações nominais - Anexo 2 + PROCES-VERBALRezultatul voturilor prin apel nominal - Anexa 2 + ZÁPISNICAVýsledok hlasovania podľa mien - Príloha 2 + ZAPISNIKIzid poimenskega glasovanja - Priloga 2 + PROTOKOLLResultat av omröstningarna med namnupprop - Bilaga 2 + + + A9-0163/2024 - Gabriele Bischoff - Article 10, § 6, alinéa 2 - Am 1 + + + Adamowicz + + + + + C9-0120/2024 - Rejet - Am 13= 23= + + + Adamowicz + + + + + Amendments to Parliament’s Rules of Procedure concerning the training on preventing conflict and harassment in the workplace and on good office management + Good agricultural and environmental condition standards, schemes for climate, environment and animal welfare + + + + BERICHTIGUNGEN DES STIMMVERHALTENS UND BEABSICHTIGTES STIMMVERHALTEN + 
RÄTTELSER/AVSIKTSFÖRKLARINGAR TILL AVGIVNA RÖSTER + ÄÄNESTYSKÄYTTÄYTYMISTÄ JA ÄÄNESTYSAIKEITA KOSKEVAT ILMOITUKSET + CORREÇÕES E INTENÇÕES DE VOTO + ПОПРАВКИ В ПОДАДЕНИТЕ ГЛАСОВЕ И НАМЕРЕНИЯ ЗА ГЛАСУВАНЕ + KORREZZJONIJIET U INTENZJONIJIET GĦALL-VOT + ΔΙΟΡΘΩΣΕΙΣ ΚΑΙ ΠΡΟΘΕΣΕΙΣ ΨΗΦΟΥ + BALSAVIMO PATAISYMAI IR KETINIMAI + CORRECTIONS TO VOTES AND VOTING INTENTIONS + BALSOJUMU LABOJUMI UN NODOMI BALSOT + IZMJENE DANIH GLASOVA I NAMJERE GLASAČA + CORREZIONI E INTENZIONI DI VOTO + CORRECTIONS ET INTENTIONS DE VOTE + SZAVAZATOK HELYESBÍTÉSEI ÉS SZAVAZÁSI SZÁNDÉKOK + CORRECCIONES E INTENCIONES DE VOTO + HÄÄLETUSE PARANDUSED JA HÄÄLETUSKAVATSUSED + OPRAVY HLASOVÁNÍ A SDĚLENÍ O ÚMYSLU HLASOVAT + OPRAVY HLASOVANIA A ZÁMERY PRI HLASOVANÍ + POPRAVKI IN NAMERE GLASOVANJA + CEARTÚCHÁIN AR AN VÓTA AGUS INTINNÍ VÓTÁLA + KOREKTY GŁOSOWANIA I ZAMIAR GŁOSOWANIA + CORECTĂRI ŞI INTENŢII DE VOT + STEMMERETTELSER OG -INTENTIONER + RECTIFICATIES STEMGEDRAG/ VOORGENOMEN STEMGEDRAG + + + diff --git a/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml b/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml new file mode 100644 index 000000000..42f4481af --- /dev/null +++ b/backend/tests/pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml @@ -0,0 +1,83 @@ + + + + + AVERTISSEMENT + NOTICE + HINWEIS + + + Les corrections et intentions de vote sont mentionnées dans ce document sous les points de vote correspondants. Elles sont publiées pour information uniquement et ne modifient en rien le résultat de vote tel qu’annoncé en plénière. Pendant la session, les demandes de corrections et intentions de vote reçues avant 18h30 sont publiées le jour même. Les demandes ultérieures sont publiées à mesure des mises à jour successives de ce document, pendant une durée maximale de deux semaines. Signification des sigles: + (pour), - (contre), 0 (abstention) + + Corrections to votes and voting intentions appear below in the section relating to the vote concerned. 
They are published for information purposes only and do not alter the result of the vote as announced in plenary. During the part-session, requests for corrections to votes and voting intentions received before 18.30 will be published the same day. Subsequent requests will be included in this document each time it is updated in the two weeks following the part-session. Key to symbols: + (in favour), - (against), 0 (abstentions) + + In diesem Dokument sind unter den betreffenden Abstimmungspunkten die Berichtigungen des Stimmverhaltens und das beabsichtigte Stimmverhalten aufgeführt. Diese Angaben dienen ausschließlich der Information; keinesfalls wird durch sie das Abstimmungsergebnis geändert, das im Plenum bekannt gegeben wurde. Während der Tagung werden Anträge zu Berichtigungen des Stimmverhaltens und zum beabsichtigten Stimmverhalten, die bis 18.30 Uhr eingehen, am selben Tag veröffentlicht. Später eingehende Anträge werden sukzessive veröffentlicht, indem dieses Dokument während höchstens zwei Wochen regelmäßig aktualisiert wird. 
Zeichenerklärung: + (dafür), - (dagegen), 0 (Enthaltung) + + + + + ПРОТОКОЛРезултат от поименни гласувания - Приложение 2 + ZÁPISVýsledek jmenovitého hlasování - Příloha 2 + PROTOKOLResultat af afstemningerne ved navneopråb - Bilag 2 + PROTOKOLLErgebnis der namentlichen Abstimmungen - Anlage 2 + ΣΥΝΟΠΤIΚΑ ΠΡΑΚΤIΚΑΑποτέλεσμα των ψηφοφοριών με ονομαστική κλήση - Παράρτηµα 2 + MINUTESResult of roll-call votes - Annex 2 + ACTAResultados de las votaciones nominales - Anexo 2 + PROTOKOLLNimelise hääletuse tulemused - lisa 2 + PÖYTÄKIRJANimenhuutoäänestysten tulokset - Liite 2 + PROCÈS-VERBALRésultat des votes par appel nominal - Annexe 2 + MIONTUAIRISCÍTorthaí na vótála le glaoch rolla - Iarscríbhinn 2 + ZAPISNIKRezultat poimeničnog glasovanja - Prilog 2 + JEGYZŐKÖNYVA név szerinti szavazások eredménye - melléklet 2 + PROCESSO VERBALERisultato delle votazioni per appello nominale - Allegato 2 + PROTOKOLASVardinio balsavimo rezultatai - priedas 2 + PROTOKOLSRezultāti balsošanai pēc saraksta - pielikums 2 + MINUTIRiżultat tal-votazzjoni bis-sejħa tal-ismijiet - Anness 2 + NOTULENUitslag van de hoofdelijke stemmingen - Bijlage 2 + PROTOKÓŁWyniki głosowań imiennych - Załącznik 2 + ATAResultados das votações nominais - Anexo 2 + PROCES-VERBALRezultatul voturilor prin apel nominal - Anexa 2 + ZÁPISNICAVýsledok hlasovania podľa mien - Príloha 2 + ZAPISNIKIzid poimenskega glasovanja - Priloga 2 + PROTOKOLLResultat av omröstningarna med namnupprop - Bilaga 2 + + + A9-0163/2024 - Gabriele Bischoff - Article 10, § 6, alinéa 2 - Am 1 + + + Adamowicz + + + + + Amendments to Parliament’s Rules of Procedure concerning the training on preventing conflict and harassment in the workplace and on good office management + + + + BERICHTIGUNGEN DES STIMMVERHALTENS UND BEABSICHTIGTES STIMMVERHALTEN + RÄTTELSER/AVSIKTSFÖRKLARINGAR TILL AVGIVNA RÖSTER + ÄÄNESTYSKÄYTTÄYTYMISTÄ JA ÄÄNESTYSAIKEITA KOSKEVAT ILMOITUKSET + CORREÇÕES E INTENÇÕES DE VOTO + ПОПРАВКИ В ПОДАДЕНИТЕ ГЛАСОВЕ И НАМЕРЕНИЯ ЗА 
ГЛАСУВАНЕ + KORREZZJONIJIET U INTENZJONIJIET GĦALL-VOT + ΔΙΟΡΘΩΣΕΙΣ ΚΑΙ ΠΡΟΘΕΣΕΙΣ ΨΗΦΟΥ + BALSAVIMO PATAISYMAI IR KETINIMAI + CORRECTIONS TO VOTES AND VOTING INTENTIONS + BALSOJUMU LABOJUMI UN NODOMI BALSOT + IZMJENE DANIH GLASOVA I NAMJERE GLASAČA + CORREZIONI E INTENZIONI DI VOTO + CORRECTIONS ET INTENTIONS DE VOTE + SZAVAZATOK HELYESBÍTÉSEI ÉS SZAVAZÁSI SZÁNDÉKOK + CORRECCIONES E INTENCIONES DE VOTO + HÄÄLETUSE PARANDUSED JA HÄÄLETUSKAVATSUSED + OPRAVY HLASOVÁNÍ A SDĚLENÍ O ÚMYSLU HLASOVAT + OPRAVY HLASOVANIA A ZÁMERY PRI HLASOVANÍ + POPRAVKI IN NAMERE GLASOVANJA + CEARTÚCHÁIN AR AN VÓTA AGUS INTINNÍ VÓTÁLA + KOREKTY GŁOSOWANIA I ZAMIAR GŁOSOWANIA + CORECTĂRI ŞI INTENŢII DE VOT + STEMMERETTELSER OG -INTENTIONER + RECTIFICATIES STEMGEDRAG/ VOORGENOMEN STEMGEDRAG + + + diff --git a/backend/tests/pipelines/test_rcv_list.py b/backend/tests/pipelines/test_rcv_list.py index 371e1fa9d..bdeb24744 100644 --- a/backend/tests/pipelines/test_rcv_list.py +++ b/backend/tests/pipelines/test_rcv_list.py @@ -1,8 +1,12 @@ import datetime import pytest +from sqlalchemy import select -from howtheyvote.pipelines import DataUnavailableError, RCVListPipeline +from howtheyvote.models import Group, GroupMembership, Member, Vote +from howtheyvote.pipelines import DataUnavailableError, DataUnchangedError, RCVListPipeline + +from ..helpers import load_fixture @pytest.mark.always_mock_requests @@ -10,3 +14,63 @@ def test_run_source_not_available(responses, db_session): with pytest.raises(DataUnavailableError): pipe = RCVListPipeline(term=9, date=datetime.date(2024, 4, 10)) pipe.run() + + +def test_run_data_unchanged(responses, db_session): + responses.get( + "https://www.europarl.europa.eu/doceo/document/PV-9-2024-04-24-RCV_FR.xml", + body=load_fixture("pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-noon.xml"), + ) + + member = Member( + id=197490, + first_name="Magdalena", + last_name="ADAMOWICZ", + group_memberships=[ + GroupMembership( + term=9, + start_date=datetime.datetime(2019, 7, 
2), + end_date=datetime.datetime(2024, 7, 15), + group=Group["EPP"], + ), + ], + ) + db_session.add(member) + db_session.commit() + + # Run the pipeline for the first time + pipe = RCVListPipeline( + term=9, + date=datetime.date(2024, 4, 24), + ) + pipe.run() + last_run_checksum = pipe.checksum + + vote_ids = list(db_session.execute(select(Vote.id)).scalars()) + assert vote_ids == [168834] + + # Run the pipeline again and provide the checksum of the first run + with pytest.raises(DataUnchangedError): + pipe = RCVListPipeline( + term=9, + date=datetime.date(2024, 4, 24), + last_run_checksum=last_run_checksum, + ) + pipe.run() + + # Simulate that the source data has been updated in the meantime + responses.get( + "https://www.europarl.europa.eu/doceo/document/PV-9-2024-04-24-RCV_FR.xml", + body=load_fixture("pipelines/data/rcv-list_pv-9-2024-04-24-rcv-fr-evening.xml"), + ) + + # Run the pipeline again and provide the checksum of the first run + pipe = RCVListPipeline( + term=9, + date=datetime.date(2024, 4, 24), + last_run_checksum=last_run_checksum, + ) + pipe.run() + + vote_ids = list(db_session.execute(select(Vote.id)).scalars()) + assert vote_ids == [168834, 168864] From 67c23540d059b2b7adaa97b9ddc83fdf3c99e934 Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sat, 7 Dec 2024 18:41:02 +0100 Subject: [PATCH 04/10] Add common base class for all pipelines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is analogous to the `BaseScraper` class for scrapers. In contrast to the scrapers, pipelines don’t share a lot of code (except for some error handling and logging), so there wasn’t really a need to add this before. However, having a common base class is also helpful for typing. 
--- backend/howtheyvote/pipelines/__init__.py | 3 ++- backend/howtheyvote/pipelines/common.py | 32 +++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/backend/howtheyvote/pipelines/__init__.py b/backend/howtheyvote/pipelines/__init__.py index 817c08c5b..68d8b3c57 100644 --- a/backend/howtheyvote/pipelines/__init__.py +++ b/backend/howtheyvote/pipelines/__init__.py @@ -1,10 +1,11 @@ -from .common import DataUnavailableError, DataUnchangedError, PipelineError +from .common import BasePipeline, DataUnavailableError, DataUnchangedError, PipelineError from .members import MembersPipeline from .press import PressPipeline from .rcv_list import RCVListPipeline from .sessions import SessionsPipeline __all__ = [ + "BasePipeline", "PipelineError", "DataUnavailableError", "DataUnchangedError", diff --git a/backend/howtheyvote/pipelines/common.py b/backend/howtheyvote/pipelines/common.py index ae5505b1e..ced947541 100644 --- a/backend/howtheyvote/pipelines/common.py +++ b/backend/howtheyvote/pipelines/common.py @@ -1,3 +1,13 @@ +from abc import ABC, abstractmethod +from typing import Any + +from structlog import get_logger + +from ..scrapers import ScrapingError + +log = get_logger(__name__) + + class PipelineError(Exception): pass @@ -8,3 +18,25 @@ class DataUnavailableError(PipelineError): class DataUnchangedError(PipelineError): pass + + +class BasePipeline(ABC): + last_run_checksum: str | None + checksum: str | None + + def __init__(self, last_run_checksum: str | None = None, **kwargs: Any) -> None: + self.last_run_checksum = last_run_checksum + self.checksum = None + self._log = log.bind(pipeline=type(self).__name__, **kwargs) + + def run(self) -> None: + self._log.info("Running pipeline") + + try: + self._run() + except ScrapingError: + self._log.exception("Failed running pipeline") + + @abstractmethod + def _run(self) -> None: + raise NotImplementedError From 6784181762641118a991ce94be9693158d4545c4 Mon Sep 17 00:00:00 2001 From: Till 
Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sat, 7 Dec 2024 18:49:10 +0100 Subject: [PATCH 05/10] Refactor pipelines to use `BasePipeline` --- backend/howtheyvote/pipelines/members.py | 29 +++------ backend/howtheyvote/pipelines/press.py | 35 ++++------- backend/howtheyvote/pipelines/rcv_list.py | 71 +++++++++-------------- backend/howtheyvote/pipelines/sessions.py | 17 +++--- 4 files changed, 53 insertions(+), 99 deletions(-) diff --git a/backend/howtheyvote/pipelines/members.py b/backend/howtheyvote/pipelines/members.py index e1c4d6f4d..5570426c6 100644 --- a/backend/howtheyvote/pipelines/members.py +++ b/backend/howtheyvote/pipelines/members.py @@ -13,34 +13,23 @@ ScrapingError, ) from ..store import Aggregator, BulkWriter, index_records, map_member +from .common import BasePipeline log = get_logger(__name__) -class MembersPipeline: +class MembersPipeline(BasePipeline): def __init__(self, term: int): + super().__init__(term=term) self.term = term self._member_ids: set[str] = set() - def run(self) -> None: - log.info( - "Running pipeline", - name=type(self).__name__, - term=self.term, - ) - - try: - self._scrape_members() - self._scrape_member_groups() - self._scrape_member_infos() - self._download_member_photos() - self._index_members() - except ScrapingError: - log.exception( - "Failed running pipeline", - name=type(self).__name__, - term=self.term, - ) + def _run(self) -> None: + self._scrape_members() + self._scrape_member_groups() + self._scrape_member_infos() + self._download_member_photos() + self._index_members() def _scrape_members(self) -> None: log.info("Scraping RCV lists", term=self.term) diff --git a/backend/howtheyvote/pipelines/press.py b/backend/howtheyvote/pipelines/press.py index e1f1dccea..ceecad203 100644 --- a/backend/howtheyvote/pipelines/press.py +++ b/backend/howtheyvote/pipelines/press.py @@ -15,11 +15,12 @@ ScrapingError, ) from ..store import Aggregator, BulkWriter, index_records, map_press_release, map_vote +from 
.common import BasePipeline log = get_logger(__name__) -class PressPipeline: +class PressPipeline(BasePipeline): # At the time we introduced this constant, the value covered roughly one term. However, # this obviously depends on the amount of press releases published and might need to be # adjusted or made configurable in the future. @@ -30,35 +31,21 @@ def __init__( date: datetime.date | None = None, with_rss: bool | None = False, ): + super().__init__(date=date, with_rss=with_rss) self.date = date self.with_rss = with_rss self._release_ids: set[str] = set() self._vote_ids: set[str] = set() - def run(self) -> None: - log.info( - "Running pipeline", - name=type(self).__name__, - date=self.date, - with_rss=self.with_rss, - ) + def _run(self) -> None: + if self.with_rss: + self._scrape_press_releases_rss() - try: - if self.with_rss: - self._scrape_press_releases_rss() - - self._scrape_press_releases_index() - self._scrape_press_releases() - self._analyze_featured_votes() - self._index_press_releases() - self._index_votes() - except ScrapingError: - log.exception( - "Failed running pipeline", - name=type(self).__name__, - date=self.date, - with_rss=self.with_rss, - ) + self._scrape_press_releases_index() + self._scrape_press_releases() + self._analyze_featured_votes() + self._index_press_releases() + self._index_votes() def _scrape_press_releases_rss(self) -> None: log.info("Fetching press releases from RSS", date=self.date) diff --git a/backend/howtheyvote/pipelines/rcv_list.py b/backend/howtheyvote/pipelines/rcv_list.py index af7c0f4af..f2e942f63 100644 --- a/backend/howtheyvote/pipelines/rcv_list.py +++ b/backend/howtheyvote/pipelines/rcv_list.py @@ -27,12 +27,12 @@ ) from ..sharepics import generate_vote_sharepic from ..store import Aggregator, BulkWriter, index_records, map_vote, map_vote_group -from .common import DataUnavailableError, DataUnchangedError, PipelineError +from .common import BasePipeline, DataUnavailableError, DataUnchangedError log = 
get_logger(__name__) -class RCVListPipeline: +class RCVListPipeline(BasePipeline): """Scrapes the RCV vote results for a single day, then runs analysis on the extracted votes and scrapes additional information such as data about legislative procedures.""" @@ -43,6 +43,7 @@ def __init__( date: datetime.date, last_run_checksum: str | None = None, ): + super().__init__(term=term, date=date, last_run_checksum=last_run_checksum) self.term = term self.date = date self.last_run_checksum = last_run_checksum @@ -51,48 +52,24 @@ def __init__( self._vote_group_ids: set[str] = set() self._request_cache: RequestCache = LRUCache(maxsize=25) - def run(self) -> None: - log.info( - "Running pipeline", - name=type(self).__name__, - term=self.term, - date=self.date, - ) - - try: - self._scrape_rcv_list() - self._scrape_documents() - self._scrape_eurlex_documents() - self._scrape_procedures() - self._scrape_eurlex_procedures() - self._analyze_main_votes() - self._analyze_vote_groups() - self._analyze_vote_data_issues() - self._index_votes() - - # Share pictures have to be generated after the votes are indexed. Otherwise, - # rendering the share pictures fails as data about new votes hasn’t yet been - # written to the database. 
- self._generate_vote_sharepics() - - self._analyze_vote_groups_data_issues() - self._index_vote_groups() - except NoWorkingUrlError as exc: - log.exception( - "Failed running pipeline", - name=type(self).__name__, - term=self.term, - date=self.date, - ) - raise DataUnavailableError("Pipeline data source is not available") from exc - except ScrapingError as exc: - log.exception( - "Failed running pipeline", - name=type(self).__name__, - term=self.term, - date=self.date, - ) - raise PipelineError("Failed running pipeline") from exc + def _run(self) -> None: + self._scrape_rcv_list() + self._scrape_documents() + self._scrape_eurlex_documents() + self._scrape_procedures() + self._scrape_eurlex_procedures() + self._analyze_main_votes() + self._analyze_vote_groups() + self._analyze_vote_data_issues() + self._index_votes() + + # Share pictures have to be generated after the votes are indexed. Otherwise, + # rendering the share pictures fails as data about new votes hasn’t yet been + # written to the database. 
+ self._generate_vote_sharepics() + + self._analyze_vote_groups_data_issues() + self._index_vote_groups() def _scrape_rcv_list(self) -> None: log.info("Fetching active members", date=self.date) @@ -113,7 +90,11 @@ def _scrape_rcv_list(self) -> None: date=self.date, active_members=active_members, ) - fragments = scraper.run() + + try: + fragments = scraper.run() + except NoWorkingUrlError as exc: + raise DataUnavailableError("Pipeline data source is not available") from exc if ( self.last_run_checksum is not None diff --git a/backend/howtheyvote/pipelines/sessions.py b/backend/howtheyvote/pipelines/sessions.py index 32459267f..d677e96a8 100644 --- a/backend/howtheyvote/pipelines/sessions.py +++ b/backend/howtheyvote/pipelines/sessions.py @@ -5,24 +5,21 @@ from ..models import PlenarySession from ..scrapers import CalendarSessionsScraper, ODPSessionScraper, ScrapingError from ..store import Aggregator, BulkWriter, index_records, map_plenary_session +from .common import BasePipeline log = get_logger(__name__) -class SessionsPipeline: +class SessionsPipeline(BasePipeline): def __init__(self, term: int): + super().__init__(term=term) self.term = term self._session_ids: set[str] = set() - def run(self) -> None: - log.info("Running pipeline", name=type(self).__name__, term=self.term) - - try: - self._scrape_sessions() - self._scrape_session_locations() - self._index_sessions() - except ScrapingError: - log.exception("Failed running pipeline", name=type(self).__name__, term=self.term) + def _run(self) -> None: + self._scrape_sessions() + self._scrape_session_locations() + self._index_sessions() def _scrape_sessions(self) -> None: log.info("Scrapping plenary sessions", term=self.term) From 31433f064297cd31208c99b8bd4da304c51f9c89 Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sun, 8 Dec 2024 13:00:02 +0100 Subject: [PATCH 06/10] Rename `PipelineRunResult` to `PipelineStatus` In preparation for changes in the next commit 
to avoid ambiguous naming --- ...6b18c4f6_rename_result_column_to_status.py | 23 +++++++++++++++++++ backend/howtheyvote/models/__init__.py | 4 ++-- backend/howtheyvote/models/common.py | 5 ++-- backend/howtheyvote/worker/__init__.py | 2 +- backend/howtheyvote/worker/worker.py | 22 +++++++++--------- backend/tests/worker/test_worker.py | 14 +++++------ 6 files changed, 47 insertions(+), 23 deletions(-) create mode 100644 backend/howtheyvote/alembic/versions/1f516b18c4f6_rename_result_column_to_status.py diff --git a/backend/howtheyvote/alembic/versions/1f516b18c4f6_rename_result_column_to_status.py b/backend/howtheyvote/alembic/versions/1f516b18c4f6_rename_result_column_to_status.py new file mode 100644 index 000000000..b4cbe85a9 --- /dev/null +++ b/backend/howtheyvote/alembic/versions/1f516b18c4f6_rename_result_column_to_status.py @@ -0,0 +1,23 @@ +"""Rename result column to status + +Revision ID: 1f516b18c4f6 +Revises: 9b35d19b64c4 +Create Date: 2024-12-08 11:25:26.051408 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "1f516b18c4f6" +down_revision = "9b35d19b64c4" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.alter_column("pipeline_runs", column_name="result", new_column_name="status") + + +def downgrade() -> None: + op.alter_column("pipeline_runs", column_name="status", new_column_name="result") diff --git a/backend/howtheyvote/models/__init__.py b/backend/howtheyvote/models/__init__.py index 0aa3a2d89..75ece9a2d 100644 --- a/backend/howtheyvote/models/__init__.py +++ b/backend/howtheyvote/models/__init__.py @@ -1,4 +1,4 @@ -from .common import Base, BaseWithId, DataIssue, Fragment, PipelineRun, PipelineRunResult +from .common import Base, BaseWithId, DataIssue, Fragment, PipelineRun, PipelineStatus from .country import Country, CountryType from .eurovoc import EurovocConcept, EurovocConceptType from .group import Group @@ -24,7 +24,7 @@ "BaseWithId", "Fragment", "PipelineRun", - "PipelineRunResult", + "PipelineStatus", "DataIssue", "Country", "CountryType", diff --git a/backend/howtheyvote/models/common.py b/backend/howtheyvote/models/common.py index ef084442d..ecc40c932 100644 --- a/backend/howtheyvote/models/common.py +++ b/backend/howtheyvote/models/common.py @@ -36,10 +36,11 @@ class DataIssue(Enum): VOTE_GROUP_NO_MAIN_VOTE = "VOTE_GROUP_NO_MAIN_VOTE" -class PipelineRunResult(Enum): +class PipelineStatus(Enum): SUCCESS = "SUCCESS" FAILURE = "FAILURE" DATA_UNAVAILABLE = "DATA_UNAVAILABLE" + DATA_UNCHANGED = "DATA_UNCHANGED" class PipelineRun(Base): @@ -49,4 +50,4 @@ class PipelineRun(Base): started_at: Mapped[sa.DateTime] = mapped_column(sa.DateTime) finished_at: Mapped[sa.DateTime] = mapped_column(sa.DateTime) pipeline: Mapped[str] = mapped_column(sa.Unicode) - result: Mapped[PipelineRunResult] = mapped_column(sa.Enum(PipelineRunResult)) + status: Mapped[PipelineStatus] = mapped_column(sa.Enum(PipelineStatus)) diff --git a/backend/howtheyvote/worker/__init__.py b/backend/howtheyvote/worker/__init__.py index 36ac5f28a..e88e3041a 
100644 --- a/backend/howtheyvote/worker/__init__.py +++ b/backend/howtheyvote/worker/__init__.py @@ -9,7 +9,7 @@ from ..db import Session from ..export import generate_export from ..files import file_path -from ..models import PipelineRun, PipelineRunResult, PlenarySession +from ..models import PipelineRun, PlenarySession from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline from ..query import session_is_current_at from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully diff --git a/backend/howtheyvote/worker/worker.py b/backend/howtheyvote/worker/worker.py index 91667c618..e093dba13 100644 --- a/backend/howtheyvote/worker/worker.py +++ b/backend/howtheyvote/worker/worker.py @@ -14,7 +14,7 @@ from .. import config from ..db import Session -from ..models import PipelineRun, PipelineRunResult +from ..models import PipelineRun, PipelineStatus from ..pipelines import DataUnavailableError log = get_logger(__name__) @@ -26,13 +26,13 @@ PIPELINE_RUN_DURATION = Histogram( "htv_worker_pipeline_run_duration_seconds", "Duration of pipeline runs executed by the worker", - ["pipeline", "result"], + ["pipeline", "status"], ) PIPELINE_RUNS = Counter( "htv_worker_pipeline_runs_total", "Total number of pipeline runs executed by the worker", - ["pipeline", "result"], + ["pipeline", "status"], ) PIPELINE_NEXT_RUN = Gauge( @@ -70,7 +70,7 @@ def pipeline_ran_successfully( .select_from(PipelineRun) .where(PipelineRun.pipeline == pipeline.__name__) .where(func.date(PipelineRun.started_at) == func.date(date)) - .where(PipelineRun.result == PipelineRunResult.SUCCESS) + .where(PipelineRun.status == PipelineStatus.SUCCESS) ) result = Session.execute(query).scalar() or 0 @@ -79,7 +79,7 @@ def pipeline_ran_successfully( class Worker: """Running a worker starts a long-running process that executes data pipelines in regular - intervals and stores the result of the pipeline runs in the database.""" + intervals and stores the status of 
the pipeline runs in the database.""" def __init__(self) -> None: self._scheduler = Scheduler() @@ -165,19 +165,19 @@ def wrapped_handler() -> None: try: handler() - result = PipelineRunResult.SUCCESS + status = PipelineStatus.SUCCESS except SkipPipelineError: # Do not log skipped pipeline runs return except DataUnavailableError: - result = PipelineRunResult.DATA_UNAVAILABLE + status = PipelineStatus.DATA_UNAVAILABLE except Exception: - result = PipelineRunResult.FAILURE + status = PipelineStatus.FAILURE duration = time.time() - start_time finished_at = datetime.datetime.now(datetime.UTC) - labels = {"pipeline": name, "result": result.value} + labels = {"pipeline": name, "status": status.value} PIPELINE_RUNS.labels(**labels).inc() PIPELINE_RUN_DURATION.labels(**labels).observe(duration) @@ -185,7 +185,7 @@ def wrapped_handler() -> None: pipeline=name, started_at=started_at, finished_at=finished_at, - result=result.value, + status=status, ) Session.add(run) @@ -198,7 +198,7 @@ def wrapped_handler() -> None: started_at=started_at.isoformat(), finished_at=finished_at.isoformat(), duration=duration, - result=result.value, + status=status.value, ) Session.remove() diff --git a/backend/tests/worker/test_worker.py b/backend/tests/worker/test_worker.py index 61b54319a..cd994760a 100644 --- a/backend/tests/worker/test_worker.py +++ b/backend/tests/worker/test_worker.py @@ -3,7 +3,7 @@ import time_machine from sqlalchemy import select -from howtheyvote.models import PipelineRun, PipelineRunResult +from howtheyvote.models import PipelineRun, PipelineStatus from howtheyvote.pipelines import DataUnavailableError, PipelineError from howtheyvote.worker.worker import Weekday, Worker, pipeline_ran_successfully @@ -133,7 +133,7 @@ def test_worker_schedule_pipeline_log_runs(db_session): run = runs[0] assert run.pipeline == "test" - assert run.result == PipelineRunResult.SUCCESS + assert run.status == PipelineStatus.SUCCESS assert run.started_at.date() == datetime.date(2024, 1, 1) 
assert run.finished_at.date() == datetime.date(2024, 1, 1) @@ -166,10 +166,10 @@ def pipeline_error(): assert len(runs) == 2 assert runs[0].pipeline == "data_unavailable_error" - assert runs[0].result == PipelineRunResult.DATA_UNAVAILABLE + assert runs[0].status == PipelineStatus.DATA_UNAVAILABLE assert runs[1].pipeline == "pipeline_error" - assert runs[1].result == PipelineRunResult.FAILURE + assert runs[1].status == PipelineStatus.FAILURE def test_pipeline_ran_successfully(db_session): @@ -183,7 +183,7 @@ class TestPipeline: started_at=now, finished_at=now, pipeline=TestPipeline.__name__, - result=PipelineRunResult.FAILURE, + status=PipelineStatus.FAILURE, ) db_session.add(run) db_session.commit() @@ -194,7 +194,7 @@ class TestPipeline: started_at=now, finished_at=now, pipeline=TestPipeline.__name__, - result=PipelineRunResult.SUCCESS, + status=PipelineStatus.SUCCESS, ) db_session.add(run) db_session.commit() @@ -206,7 +206,7 @@ class TestPipeline: started_at=now, finished_at=now, pipeline=TestPipeline.__name__, - result=PipelineRunResult.SUCCESS, + status=PipelineStatus.SUCCESS, ) db_session.add(run) db_session.commit() From 910832a83a46b36c609363f12535851d4752982b Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sun, 8 Dec 2024 14:15:34 +0100 Subject: [PATCH 07/10] Move error handling from worker to pipeline base class MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Right now, the worker’s `schedule_pipeline` method also handles all the exceptions that might be raised during pipeline execution, e.g. `DataUnavailableError`. This has worked fine so far, but it’s getting a little difficult to test now that we also want to store the checksum of a pipeline run. So instead, pipelines now return a `PipelineResult`, which includes information about the status (i.e. whether the run was successful or failed because the source data was unavailable, etc.) 
and `schedule_pipeline` doesn’t need to handle the different ways a pipeline could fail. --- backend/howtheyvote/pipelines/__init__.py | 9 +++- backend/howtheyvote/pipelines/common.py | 21 +++++++- backend/howtheyvote/worker/__init__.py | 28 ++++++----- backend/howtheyvote/worker/worker.py | 11 ++--- backend/tests/pipelines/test_rcv_list.py | 40 +++++++++------ backend/tests/worker/test_worker.py | 59 +++++++++++++++++------ 6 files changed, 117 insertions(+), 51 deletions(-) diff --git a/backend/howtheyvote/pipelines/__init__.py b/backend/howtheyvote/pipelines/__init__.py index 68d8b3c57..20f776135 100644 --- a/backend/howtheyvote/pipelines/__init__.py +++ b/backend/howtheyvote/pipelines/__init__.py @@ -1,4 +1,10 @@ -from .common import BasePipeline, DataUnavailableError, DataUnchangedError, PipelineError +from .common import ( + BasePipeline, + DataUnavailableError, + DataUnchangedError, + PipelineError, + PipelineResult, +) from .members import MembersPipeline from .press import PressPipeline from .rcv_list import RCVListPipeline @@ -6,6 +12,7 @@ __all__ = [ "BasePipeline", + "PipelineResult", "PipelineError", "DataUnavailableError", "DataUnchangedError", diff --git a/backend/howtheyvote/pipelines/common.py b/backend/howtheyvote/pipelines/common.py index ced947541..774eb1b0f 100644 --- a/backend/howtheyvote/pipelines/common.py +++ b/backend/howtheyvote/pipelines/common.py @@ -1,13 +1,21 @@ from abc import ABC, abstractmethod +from dataclasses import dataclass from typing import Any from structlog import get_logger +from ..models import PipelineStatus from ..scrapers import ScrapingError log = get_logger(__name__) +@dataclass +class PipelineResult: + status: PipelineStatus + checksum: str | None + + class PipelineError(Exception): pass @@ -29,14 +37,25 @@ def __init__(self, last_run_checksum: str | None = None, **kwargs: Any) -> None: self.checksum = None self._log = log.bind(pipeline=type(self).__name__, **kwargs) - def run(self) -> None: + def run(self) -> 
PipelineResult: self._log.info("Running pipeline") try: self._run() + status = PipelineStatus.SUCCESS + except DataUnavailableError: + status = PipelineStatus.DATA_UNAVAILABLE + except DataUnchangedError: + status = PipelineStatus.DATA_UNCHANGED except ScrapingError: + status = PipelineStatus.FAILURE self._log.exception("Failed running pipeline") + return PipelineResult( + status=status, + checksum=self.checksum, + ) + @abstractmethod def _run(self) -> None: raise NotImplementedError diff --git a/backend/howtheyvote/worker/__init__.py b/backend/howtheyvote/worker/__init__.py index e88e3041a..955219f7f 100644 --- a/backend/howtheyvote/worker/__init__.py +++ b/backend/howtheyvote/worker/__init__.py @@ -10,14 +10,20 @@ from ..export import generate_export from ..files import file_path from ..models import PipelineRun, PlenarySession -from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline +from ..pipelines import ( + MembersPipeline, + PipelineResult, + PressPipeline, + RCVListPipeline, + SessionsPipeline, +) from ..query import session_is_current_at from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully log = get_logger(__name__) -def op_rcv_midday() -> None: +def op_rcv_midday() -> PipelineResult: """Checks if there is a current plenary session and, if yes, fetches the latest roll-call vote results.""" today = datetime.date.today() @@ -29,10 +35,10 @@ def op_rcv_midday() -> None: raise SkipPipelineError() pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) - pipeline.run() + return pipeline.run() -def op_rcv_evening() -> None: +def op_rcv_evening() -> PipelineResult: """While on most plenary days, there’s only one voting session around midday, on some days there is another sesssion in the evening, usually around 17:00. 
The vote results of the evening sessions are appended to the same source document that also contains the results @@ -47,10 +53,10 @@ def op_rcv_evening() -> None: raise SkipPipelineError() pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) - pipeline.run() + return pipeline.run() -def op_press() -> None: +def op_press() -> PipelineResult: """Checks if there is a current plenary session and, if yes, fetches the latest press releases from the Parliament’s news hub.""" today = datetime.date.today() @@ -59,19 +65,19 @@ def op_press() -> None: raise SkipPipelineError() pipeline = PressPipeline(date=today, with_rss=True) - pipeline.run() + return pipeline.run() -def op_sessions() -> None: +def op_sessions() -> PipelineResult: """Fetches plenary session dates.""" pipeline = SessionsPipeline(term=config.CURRENT_TERM) - pipeline.run() + return pipeline.run() -def op_members() -> None: +def op_members() -> PipelineResult: """Fetches information about all members of the current term.""" pipeline = MembersPipeline(term=config.CURRENT_TERM) - pipeline.run() + return pipeline.run() EXPORT_LAST_RUN = Gauge( diff --git a/backend/howtheyvote/worker/worker.py b/backend/howtheyvote/worker/worker.py index e093dba13..b34fa6454 100644 --- a/backend/howtheyvote/worker/worker.py +++ b/backend/howtheyvote/worker/worker.py @@ -15,7 +15,7 @@ from .. 
import config from ..db import Session from ..models import PipelineRun, PipelineStatus -from ..pipelines import DataUnavailableError +from ..pipelines import PipelineResult log = get_logger(__name__) @@ -150,7 +150,7 @@ def schedule( def schedule_pipeline( self, - handler: Handler, + handler: Callable[..., PipelineResult], name: str, weekdays: Iterable[Weekday] = set(Weekday), hours: Iterable[int] = {0}, @@ -164,15 +164,14 @@ def wrapped_handler() -> None: started_at = datetime.datetime.now(datetime.UTC) try: - handler() - status = PipelineStatus.SUCCESS + result = handler() + status = result.status except SkipPipelineError: # Do not log skipped pipeline runs return - except DataUnavailableError: - status = PipelineStatus.DATA_UNAVAILABLE except Exception: status = PipelineStatus.FAILURE + log.exception("Unhandled exception during pipeline run", pipeline=name) duration = time.time() - start_time finished_at = datetime.datetime.now(datetime.UTC) diff --git a/backend/tests/pipelines/test_rcv_list.py b/backend/tests/pipelines/test_rcv_list.py index bdeb24744..55e78358a 100644 --- a/backend/tests/pipelines/test_rcv_list.py +++ b/backend/tests/pipelines/test_rcv_list.py @@ -3,17 +3,17 @@ import pytest from sqlalchemy import select -from howtheyvote.models import Group, GroupMembership, Member, Vote -from howtheyvote.pipelines import DataUnavailableError, DataUnchangedError, RCVListPipeline +from howtheyvote.models import Group, GroupMembership, Member, PipelineStatus, Vote +from howtheyvote.pipelines import RCVListPipeline from ..helpers import load_fixture @pytest.mark.always_mock_requests def test_run_source_not_available(responses, db_session): - with pytest.raises(DataUnavailableError): - pipe = RCVListPipeline(term=9, date=datetime.date(2024, 4, 10)) - pipe.run() + pipe = RCVListPipeline(term=9, date=datetime.date(2024, 4, 10)) + result = pipe.run() + assert result.status == PipelineStatus.DATA_UNAVAILABLE def test_run_data_unchanged(responses, db_session): @@ 
-43,20 +43,24 @@ def test_run_data_unchanged(responses, db_session): term=9, date=datetime.date(2024, 4, 24), ) - pipe.run() - last_run_checksum = pipe.checksum + result = pipe.run() + assert result.status == PipelineStatus.SUCCESS + assert ( + result.checksum == "c01379e8e00e9d8e60c71eebf90941c3318be9751a911d4f08b24aa9d0be26af" + ) vote_ids = list(db_session.execute(select(Vote.id)).scalars()) assert vote_ids == [168834] # Run the pipeline again and provide the checksum of the first run - with pytest.raises(DataUnchangedError): - pipe = RCVListPipeline( - term=9, - date=datetime.date(2024, 4, 24), - last_run_checksum=last_run_checksum, - ) - pipe.run() + pipe = RCVListPipeline( + term=9, + date=datetime.date(2024, 4, 24), + last_run_checksum="c01379e8e00e9d8e60c71eebf90941c3318be9751a911d4f08b24aa9d0be26af", + ) + result = pipe.run() + assert result.status == PipelineStatus.DATA_UNCHANGED + assert result.checksum is None # Simulate that the source data has been updated in the meantime responses.get( @@ -68,9 +72,13 @@ def test_run_data_unchanged(responses, db_session): pipe = RCVListPipeline( term=9, date=datetime.date(2024, 4, 24), - last_run_checksum=last_run_checksum, + last_run_checksum="c01379e8e00e9d8e60c71eebf90941c3318be9751a911d4f08b24aa9d0be26af", + ) + result = pipe.run() + assert result.status == PipelineStatus.SUCCESS + assert ( + result.checksum == "743bf734045d7c797afea9e8c1127c047a4924bcd5090883ff8a74421376d511" ) - pipe.run() vote_ids = list(db_session.execute(select(Vote.id)).scalars()) assert vote_ids == [168834, 168864] diff --git a/backend/tests/worker/test_worker.py b/backend/tests/worker/test_worker.py index cd994760a..2d833b43b 100644 --- a/backend/tests/worker/test_worker.py +++ b/backend/tests/worker/test_worker.py @@ -4,7 +4,7 @@ from sqlalchemy import select from howtheyvote.models import PipelineRun, PipelineStatus -from howtheyvote.pipelines import DataUnavailableError, PipelineError +from howtheyvote.pipelines import PipelineResult 
from howtheyvote.worker.worker import Weekday, Worker, pipeline_ran_successfully @@ -114,19 +114,22 @@ def test_worker_schedule_timezone_dst(db_session): def test_worker_schedule_pipeline_log_runs(db_session): worker = Worker() - handler = get_handler() + + def pipeline_handler(): + return PipelineResult( + status=PipelineStatus.SUCCESS, + checksum=None, + ) with time_machine.travel(datetime.datetime(2024, 1, 1, 0, 0)): - worker.schedule_pipeline(handler, name="test", hours={10}) + worker.schedule_pipeline(pipeline_handler, name="test", hours={10}) worker.run_pending() - assert handler.calls == 0 runs = list(db_session.execute(select(PipelineRun)).scalars()) assert len(runs) == 0 with time_machine.travel(datetime.datetime(2024, 1, 1, 10, 0)): worker.run_pending() - assert handler.calls == 1 runs = list(db_session.execute(select(PipelineRun)).scalars()) assert len(runs) == 1 @@ -138,24 +141,30 @@ def test_worker_schedule_pipeline_log_runs(db_session): assert run.finished_at.date() == datetime.date(2024, 1, 1) -def test_worker_schedule_pipeline_log_runs_exceptions(db_session): +def test_worker_schedule_pipeline_log_runs_status(db_session): worker = Worker() - def data_unavailable_error(): - raise DataUnavailableError() + def data_unavailable(): + return PipelineResult( + status=PipelineStatus.DATA_UNAVAILABLE, + checksum=None, + ) - def pipeline_error(): - raise PipelineError() + def failure(): + return PipelineResult( + status=PipelineStatus.FAILURE, + checksum=None, + ) with time_machine.travel(datetime.datetime(2024, 1, 1, 0, 0)): worker.schedule_pipeline( - data_unavailable_error, - name="data_unavailable_error", + data_unavailable, + name="data_unavailable", hours={10}, ) worker.schedule_pipeline( - pipeline_error, - name="pipeline_error", + failure, + name="failure", hours={10}, ) @@ -165,13 +174,31 @@ def pipeline_error(): runs = list(db_session.execute(select(PipelineRun)).scalars()) assert len(runs) == 2 - assert runs[0].pipeline == "data_unavailable_error" 
+ assert runs[0].pipeline == "data_unavailable" assert runs[0].status == PipelineStatus.DATA_UNAVAILABLE - assert runs[1].pipeline == "pipeline_error" + assert runs[1].pipeline == "failure" assert runs[1].status == PipelineStatus.FAILURE +def test_worker_schedule_pipeline_unhandled_exceptions(db_session): + worker = Worker() + + def woops(): + raise Exception() + + with time_machine.travel(datetime.datetime(2024, 1, 1, 0, 0)): + worker.schedule_pipeline(woops, name="woops", hours={10}) + + with time_machine.travel(datetime.datetime(2024, 1, 1, 10, 0)): + worker.run_pending() + + runs = list(db_session.execute(select(PipelineRun)).scalars()) + assert len(runs) == 1 + assert runs[0].pipeline == "woops" + assert runs[0].status == PipelineStatus.FAILURE + + def test_pipeline_ran_successfully(db_session): class TestPipeline: pass From c93fa8e1322b8038810bd22171baf486a8078288 Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sun, 8 Dec 2024 15:21:38 +0100 Subject: [PATCH 08/10] Store pipeline checksums and provide them to subsequent runs on the same day --- ...d_add_checksum_column_to_pipeline_runs_.py | 24 +++++++ backend/howtheyvote/models/common.py | 1 + backend/howtheyvote/worker/__init__.py | 18 +++++- backend/howtheyvote/worker/worker.py | 15 +++++ backend/tests/worker/test_worker.py | 62 ++++++++++++++++++- 5 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py diff --git a/backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py b/backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py new file mode 100644 index 000000000..efbc165ca --- /dev/null +++ b/backend/howtheyvote/alembic/versions/2f958a6f147d_add_checksum_column_to_pipeline_runs_.py @@ -0,0 +1,24 @@ +"""Add checksum column to pipeline_runs table + +Revision ID: 2f958a6f147d 
+Revises: 1f516b18c4f6 +Create Date: 2024-12-07 17:12:10.792707 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "2f958a6f147d" +down_revision = "1f516b18c4f6" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column("pipeline_runs", sa.Column("checksum", sa.Unicode)) + + +def downgrade() -> None: + op.drop_column("pipeline_runs", "checksum") diff --git a/backend/howtheyvote/models/common.py b/backend/howtheyvote/models/common.py index ecc40c932..d5c2d2502 100644 --- a/backend/howtheyvote/models/common.py +++ b/backend/howtheyvote/models/common.py @@ -51,3 +51,4 @@ class PipelineRun(Base): finished_at: Mapped[sa.DateTime] = mapped_column(sa.DateTime) pipeline: Mapped[str] = mapped_column(sa.Unicode) status: Mapped[PipelineStatus] = mapped_column(sa.Enum(PipelineStatus)) + checksum: Mapped[str] = mapped_column(sa.Unicode) diff --git a/backend/howtheyvote/worker/__init__.py b/backend/howtheyvote/worker/__init__.py index 955219f7f..dadaf6d86 100644 --- a/backend/howtheyvote/worker/__init__.py +++ b/backend/howtheyvote/worker/__init__.py @@ -18,7 +18,13 @@ SessionsPipeline, ) from ..query import session_is_current_at -from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully +from .worker import ( + SkipPipelineError, + Weekday, + Worker, + last_pipeline_run_checksum, + pipeline_ran_successfully, +) log = get_logger(__name__) @@ -52,7 +58,15 @@ def op_rcv_evening() -> PipelineResult: if pipeline_ran_successfully(RCVListPipeline, today, count=2): raise SkipPipelineError() - pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) + last_run_checksum = last_pipeline_run_checksum( + pipeline=RCVListPipeline, + date=today, + ) + pipeline = RCVListPipeline( + term=config.CURRENT_TERM, + date=today, + last_run_checksum=last_run_checksum, + ) return pipeline.run() diff --git a/backend/howtheyvote/worker/worker.py b/backend/howtheyvote/worker/worker.py 
index b34fa6454..6aec5548c 100644 --- a/backend/howtheyvote/worker/worker.py +++ b/backend/howtheyvote/worker/worker.py @@ -77,6 +77,18 @@ def pipeline_ran_successfully( return result >= count +def last_pipeline_run_checksum(pipeline: type[object], date: datetime.date) -> str | None: + """Returns the checksum of the most recent pipeline run on a given day.""" + query = ( + select(PipelineRun.checksum) + .where(PipelineRun.pipeline == pipeline.__name__) + .where(func.date(PipelineRun.started_at) == func.date(date)) + .where(PipelineRun.status == PipelineStatus.SUCCESS) + .order_by(PipelineRun.finished_at.desc()) + ) + return Session.execute(query).scalar() + + class Worker: """Running a worker starts a long-running process that executes data pipelines in regular intervals and stores the status of the pipeline runs in the database.""" @@ -166,11 +178,13 @@ def wrapped_handler() -> None: try: result = handler() status = result.status + checksum = result.checksum except SkipPipelineError: # Do not log skipped pipeline runs return except Exception: status = PipelineStatus.FAILURE + checksum = None log.exception("Unhandled exception during pipeline run", pipeline=name) duration = time.time() - start_time @@ -185,6 +199,7 @@ def wrapped_handler() -> None: started_at=started_at, finished_at=finished_at, status=status, + checksum=checksum, ) Session.add(run) diff --git a/backend/tests/worker/test_worker.py b/backend/tests/worker/test_worker.py index 2d833b43b..e6cc59009 100644 --- a/backend/tests/worker/test_worker.py +++ b/backend/tests/worker/test_worker.py @@ -5,7 +5,12 @@ from howtheyvote.models import PipelineRun, PipelineStatus from howtheyvote.pipelines import PipelineResult -from howtheyvote.worker.worker import Weekday, Worker, pipeline_ran_successfully +from howtheyvote.worker.worker import ( + Weekday, + Worker, + last_pipeline_run_checksum, + pipeline_ran_successfully, +) def get_handler(): @@ -118,7 +123,7 @@ def 
test_worker_schedule_pipeline_log_runs(db_session): def pipeline_handler(): return PipelineResult( status=PipelineStatus.SUCCESS, - checksum=None, + checksum="123abc", ) with time_machine.travel(datetime.datetime(2024, 1, 1, 0, 0)): @@ -137,6 +142,7 @@ def pipeline_handler(): run = runs[0] assert run.pipeline == "test" assert run.status == PipelineStatus.SUCCESS + assert run.checksum == "123abc" assert run.started_at.date() == datetime.date(2024, 1, 1) assert run.finished_at.date() == datetime.date(2024, 1, 1) @@ -239,3 +245,55 @@ class TestPipeline: db_session.commit() assert pipeline_ran_successfully(TestPipeline, today, count=2) is True + + +def test_last_pipeline_run_checksum(db_session): + class TestPipeline: + pass + + with time_machine.travel(datetime.datetime(2024, 1, 1, 0, 0)): + checksum = last_pipeline_run_checksum( + pipeline=TestPipeline, + date=datetime.date(2024, 1, 1), + ) + assert checksum is None + + run = PipelineRun( + started_at=datetime.datetime(2024, 1, 1, 0, 0, 0), + finished_at=datetime.datetime(2024, 1, 1, 0, 0, 0), + pipeline=TestPipeline.__name__, + status=PipelineStatus.SUCCESS, + checksum="123abc", + ) + db_session.add(run) + db_session.commit() + + with time_machine.travel(datetime.datetime(2024, 1, 1, 1, 0, 0)): + checksum = last_pipeline_run_checksum( + pipeline=TestPipeline, + date=datetime.date(2024, 1, 1), + ) + assert checksum == "123abc" + + checksum = last_pipeline_run_checksum( + pipeline=TestPipeline, + date=datetime.date(2024, 1, 2), + ) + assert checksum is None + + run = PipelineRun( + started_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + finished_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + pipeline=TestPipeline.__name__, + status=PipelineStatus.SUCCESS, + checksum="456def", + ) + db_session.add(run) + db_session.commit() + + with time_machine.travel(datetime.datetime(2024, 1, 1, 13, 0, 0)): + checksum = last_pipeline_run_checksum( + pipeline=TestPipeline, + date=datetime.date(2024, 1, 1), + ) + assert checksum == 
"456def" From 1660a373a60dc453e1b947aaa7da5480ee64a312 Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sun, 8 Dec 2024 16:09:27 +0100 Subject: [PATCH 09/10] =?UTF-8?q?Refactor:=20Don=E2=80=99t=20compute=20che?= =?UTF-8?q?cksums=20in=20scraper?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I initially added some code to `BaseScraper` that automatically computes the response checksum and exposes it as a property. This is only ever used by pipelines, so in order to remove some indirection, I’m moving it to a pipelines helper. --- backend/howtheyvote/pipelines/common.py | 7 +++++++ backend/howtheyvote/pipelines/rcv_list.py | 11 ++++++++--- backend/howtheyvote/scrapers/common.py | 13 +++---------- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/backend/howtheyvote/pipelines/common.py b/backend/howtheyvote/pipelines/common.py index 774eb1b0f..91dc8c15e 100644 --- a/backend/howtheyvote/pipelines/common.py +++ b/backend/howtheyvote/pipelines/common.py @@ -1,7 +1,9 @@ +import hashlib from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any +from requests import Response from structlog import get_logger from ..models import PipelineStatus @@ -59,3 +61,8 @@ def run(self) -> PipelineResult: @abstractmethod def _run(self) -> None: raise NotImplementedError + + +def compute_response_checksum(response: Response) -> str: + """Compute the SHA256 hash of the response contents.""" + return hashlib.sha256(response.content).hexdigest() diff --git a/backend/howtheyvote/pipelines/rcv_list.py b/backend/howtheyvote/pipelines/rcv_list.py index f2e942f63..cc6bb06d6 100644 --- a/backend/howtheyvote/pipelines/rcv_list.py +++ b/backend/howtheyvote/pipelines/rcv_list.py @@ -27,7 +27,12 @@ ) from ..sharepics import generate_vote_sharepic from ..store import Aggregator, BulkWriter, index_records, map_vote, map_vote_group -from .common import 
BasePipeline, DataUnavailableError, DataUnchangedError +from .common import ( + BasePipeline, + DataUnavailableError, + DataUnchangedError, + compute_response_checksum, +) log = get_logger(__name__) @@ -98,13 +103,13 @@ def _scrape_rcv_list(self) -> None: if ( self.last_run_checksum is not None - and self.last_run_checksum == scraper.response_checksum + and self.last_run_checksum == compute_response_checksum(scraper.response) ): raise DataUnchangedError( "The data source hasn't changed since the last pipeline run." ) - self.checksum = scraper.response_checksum + self.checksum = compute_response_checksum(scraper.response) writer = BulkWriter() writer.add(fragments) diff --git a/backend/howtheyvote/scrapers/common.py b/backend/howtheyvote/scrapers/common.py index 82b707792..28ff3babd 100644 --- a/backend/howtheyvote/scrapers/common.py +++ b/backend/howtheyvote/scrapers/common.py @@ -1,4 +1,3 @@ -import hashlib import html import random import time @@ -95,18 +94,15 @@ def get_url( class BaseScraper(ABC, Generic[ResourceType]): REQUEST_MAX_RETRIES: int = 0 - response_checksum: str | None def __init__(self, request_cache: RequestCache | None = None, **kwargs: Any) -> None: self._request_cache = request_cache self._log = log.bind(scraper=type(self).__name__, **kwargs) - self.response_checksum = None def run(self) -> Any: self._log.info("Running scraper") - self._response = self._fetch() - self.response_checksum = self._compute_checksum(self._response) - doc = self._parse(self._response) + self.response = self._fetch() + doc = self._parse(self.response) return self._extract_data(doc) @abstractmethod @@ -132,7 +128,7 @@ def _fragment( model=model.__name__, source_id=source_id, source_name=type(self).__name__, - source_url=self._response.request.url, + source_url=self.response.request.url, group_key=group_key, data=data, ) @@ -168,9 +164,6 @@ def _headers(self) -> dict[str, str]: "user-agent": random.choice(USER_AGENTS), } - def _compute_checksum(self, response: Response) 
-> str: - return hashlib.sha256(response.content).hexdigest() - class BeautifulSoupScraper(BaseScraper[BeautifulSoup]): BS_PARSER: str = "lxml" From c1dc7c470f011992556e3b8820a873f0ba6a9b64 Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sun, 8 Dec 2024 16:16:15 +0100 Subject: [PATCH 10/10] Refactor: Remove unnecessary package exports All three are only used by other modules from the same package --- backend/howtheyvote/pipelines/__init__.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/backend/howtheyvote/pipelines/__init__.py b/backend/howtheyvote/pipelines/__init__.py index 20f776135..04990465c 100644 --- a/backend/howtheyvote/pipelines/__init__.py +++ b/backend/howtheyvote/pipelines/__init__.py @@ -1,21 +1,11 @@ -from .common import ( - BasePipeline, - DataUnavailableError, - DataUnchangedError, - PipelineError, - PipelineResult, -) +from .common import PipelineResult from .members import MembersPipeline from .press import PressPipeline from .rcv_list import RCVListPipeline from .sessions import SessionsPipeline __all__ = [ - "BasePipeline", "PipelineResult", - "PipelineError", - "DataUnavailableError", - "DataUnchangedError", "RCVListPipeline", "PressPipeline", "MembersPipeline",