Skip to content

Commit

Permalink
Merge pull request #203 from InstitutoTodosPelaSaude/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
mathdesilva authored Jan 14, 2025
2 parents d989ade + f3811ef commit a194deb
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 59 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Base: slim Debian Bookworm image with Python 3.11 preinstalled.
FROM python:3.11-slim-bookworm

# Set container timezone to São Paulo so timestamps/scheduling use Brazil time.
ENV TZ=America/Sao_Paulo

# All subsequent commands run from the application directory.
WORKDIR /usr/app/arboviroses/

# Copy dependency manifest first (keeps pip-install layer cacheable).
COPY requirements.txt /usr/app/arboviroses/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
DAGSTER_SLACK_BOT_CHANNEL = os.getenv('DAGSTER_SLACK_BOT_CHANNEL')

@asset_sensor(
asset_key=AssetKey('combined_05_location'),
asset_key=AssetKey('zip_exported_file'),
job=report_epirio_assets_job,
default_status=DefaultSensorStatus.RUNNING
)
Expand Down
2 changes: 1 addition & 1 deletion dagster/matrices/matrices/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def export_matrices_to_xlsx(context):
matrices_all_assets_job = define_asset_job(name="matrices_all_assets_job")

@asset_sensor(
asset_key=AssetKey('combined_05_location'),
asset_key=AssetKey('zip_exported_file'),
job=matrices_all_assets_job,
default_status=DefaultSensorStatus.RUNNING
)
Expand Down
5 changes: 4 additions & 1 deletion dagster/save_results/save_results/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
DAGSTER_SLACK_BOT_TOKEN = os.getenv('DAGSTER_SLACK_BOT_TOKEN')
DAGSTER_SLACK_BOT_CHANNEL = os.getenv('DAGSTER_SLACK_BOT_CHANNEL')

@asset(compute_kind="python")
@asset(
compute_kind="python",
deps=[AssetKey('zip_exported_file')]
)
def create_new_folder(context):
file_system = FileSystem(root_path=REPORTS_FILES_FOLDER)

Expand Down
20 changes: 16 additions & 4 deletions dagster/save_results/save_results/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@
save_matrices_files,
save_external_reports_files
)
from .jobs import save_files_assets_job
from .jobs import (
create_new_folder_job,
save_matrices_files_job,
save_external_reports_files_job,
)
from .sensors import (
run_save_results_sensor,
run_create_new_folder_sensor,
run_save_matrices_files_sensor,
run_save_external_reports_files_sensor,
save_files_slack_success_sensor,
save_files_slack_failure_sensor
)
Expand All @@ -22,9 +28,15 @@
save_matrices_files,
save_external_reports_files
],
jobs=[save_files_assets_job],
jobs=[
create_new_folder_job,
save_matrices_files_job,
save_external_reports_files_job
],
sensors=[
run_save_results_sensor,
run_create_new_folder_sensor,
run_save_matrices_files_sensor,
run_save_external_reports_files_sensor,
save_files_slack_success_sensor,
save_files_slack_failure_sensor,
]
Expand Down
22 changes: 20 additions & 2 deletions dagster/save_results/save_results/jobs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
from dagster import define_asset_job

save_files_assets_job = define_asset_job(
name="save_files_assets_job"
# Asset job that materializes the folder-creation asset together with the
# combined-files asset.
create_new_folder_job = define_asset_job(
    name="create_new_folder_job",
    selection=["create_new_folder", "save_combined_files"],
)

# Asset job that materializes only the matrices-files asset.
save_matrices_files_job = define_asset_job(
    name="save_matrices_files_job",
    selection=["save_matrices_files"],
)

# Asset job that materializes only the external-reports-files asset.
save_external_reports_files_job = define_asset_job(
    name="save_external_reports_files_job",
    selection=["save_external_reports_files"],
)
58 changes: 41 additions & 17 deletions dagster/save_results/save_results/sensors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from dagster import (
multi_asset_sensor,
asset_sensor,
EventLogEntry,
AssetKey,
RunRequest,
DefaultSensorStatus,
Expand All @@ -18,34 +20,52 @@
from dotenv import load_dotenv
from time import sleep

from .jobs import save_files_assets_job
from .jobs import (
create_new_folder_job,
save_matrices_files_job,
save_external_reports_files_job,
)

load_dotenv()
DAGSTER_SLACK_BOT_TOKEN = os.getenv('DAGSTER_SLACK_BOT_TOKEN')
DAGSTER_SLACK_BOT_CHANNEL = os.getenv('DAGSTER_SLACK_BOT_CHANNEL')
DAGSTER_SLACK_BOT_MAIN_CHANNEL = os.getenv('DAGSTER_SLACK_BOT_MAIN_CHANNEL')
MINIO_UI_URL = os.getenv('MINIO_UI_URL')

@multi_asset_sensor(
monitored_assets=[
AssetKey("export_matrices_to_xlsx"),
AssetKey("report_epirio_export_to_tsv")
],
job=save_files_assets_job,
@asset_sensor(
    asset_key=AssetKey("zip_exported_file"),
    job=create_new_folder_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=30,
)
def run_create_new_folder_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    """Launch `create_new_folder_job` each time 'zip_exported_file' is materialized."""
    # Sanity check: the triggering event must carry a real asset materialization.
    event = asset_event.dagster_event
    assert event and event.asset_key
    return RunRequest()

@asset_sensor(
    asset_key=AssetKey("export_matrices_to_xlsx"),
    job=save_matrices_files_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=30,
)
def run_save_matrices_files_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    """Launch `save_matrices_files_job` each time 'export_matrices_to_xlsx' is materialized."""
    # Sanity check: the triggering event must carry a real asset materialization.
    event = asset_event.dagster_event
    assert event and event.asset_key
    return RunRequest()

@asset_sensor(
    asset_key=AssetKey("report_epirio_export_to_tsv"),
    job=save_external_reports_files_job,
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=30,
)
def run_save_external_reports_files_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    """Launch `save_external_reports_files_job` each time 'report_epirio_export_to_tsv' is materialized.

    NOTE(review): merge residue left the removed multi-asset callback
    `run_save_results_sensor` sitting between this decorator and this function,
    so the decorator wrapped the stale callback and this sensor was left
    undecorated. The stale function is no longer imported by definitions.py
    (the new import list has only the three per-job sensors), so it is dropped
    here and the decorator is attached to the function it belongs to.
    """
    # Sanity check: the triggering event must carry a real asset materialization.
    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
    return RunRequest()

@run_status_sensor(
monitored_jobs=[save_files_assets_job],
monitored_jobs=[
create_new_folder_job
],
run_status=DagsterRunStatus.SUCCESS,
default_status=DefaultSensorStatus.RUNNING
)
Expand Down Expand Up @@ -170,7 +190,11 @@ def save_files_slack_success_sensor(context: SensorEvaluationContext):

# Failure sensor that sends a message to slack
save_files_slack_failure_sensor = make_slack_on_run_failure_sensor(
monitored_jobs=[save_files_assets_job],
monitored_jobs=[
create_new_folder_job,
save_matrices_files_job,
save_external_reports_files_job,
],
slack_token=DAGSTER_SLACK_BOT_TOKEN,
channel=DAGSTER_SLACK_BOT_CHANNEL,
default_status=DefaultSensorStatus.RUNNING,
Expand Down
22 changes: 12 additions & 10 deletions dbt/models/matrices/11_DENV_map_pos_direct_cities.sql
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,18 @@ source_data_cumulative_sum AS (

-- Seleção final dos dados, filtrando apenas semanas com casos cumulativos maiores que zero
SELECT
"semanas epidemiologicas",
"location_ibge_code",
"location",
"state_code",
"state",
"lat",
"long",
"epiweek_cases"::INTEGER,
"cumulative_cases"::INTEGER
"semanas epidemiologicas" as "Semanas epidemiologicas",
"location_ibge_code" as "location_ibge_code",
"location" as "Cidade",
"state_code" as "state_code",
"state" as "state",
"lat" as "lat",
"long" as "long",
"epiweek_cases"::INTEGER as "Casos da última semana",
"cumulative_cases"::INTEGER as "Casos cumulativos"
FROM source_data_cumulative_sum
WHERE "cumulative_cases" > 0
WHERE
"cumulative_cases" > 0 AND
location not in ('NOT REPORTED')
ORDER BY "semanas epidemiologicas", "state_code", "location"

10 changes: 5 additions & 5 deletions dbt/models/matrices/11_DENV_map_pos_direct_states.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ source_data_cumulative_sum AS (

-- Seleção final dos dados, filtrando apenas semanas com casos cumulativos maiores que zero
SELECT
"semanas epidemiologicas",
"state_code",
"state",
"epiweek_cases"::INTEGER,
"cumulative_cases"::INTEGER
"semanas epidemiologicas" as "Semanas epidemiológicas",
"state_code" as "Código do estado",
"state" as "Estado",
"epiweek_cases"::INTEGER as "Casos na última semana",
"cumulative_cases"::INTEGER as "Casos cumulativos"
FROM source_data_cumulative_sum
WHERE
"cumulative_cases" > 0 AND
Expand Down
22 changes: 12 additions & 10 deletions dbt/models/matrices/12_CHIKV_map_pos_direct_cities.sql
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,18 @@ source_data_cumulative_sum AS (

-- Seleção final dos dados, filtrando apenas semanas com casos cumulativos maiores que zero
SELECT
"semanas epidemiologicas",
"location_ibge_code",
"location",
"state_code",
"state",
"lat",
"long",
"epiweek_cases"::INTEGER,
"cumulative_cases"::INTEGER
"semanas epidemiologicas" as "Semanas epidemiologicas",
"location_ibge_code" as "location_ibge_code",
"location" as "Cidade",
"state_code" as "state_code",
"state" as "state",
"lat" as "lat",
"long" as "long",
"epiweek_cases"::INTEGER as "Casos da última semana",
"cumulative_cases"::INTEGER as "Casos cumulativos"
FROM source_data_cumulative_sum
WHERE "cumulative_cases" > 0
WHERE
"cumulative_cases" > 0 AND
location not in ('NOT REPORTED')
ORDER BY "semanas epidemiologicas", "state_code", "location"

10 changes: 5 additions & 5 deletions dbt/models/matrices/12_CHIKV_map_pos_direct_states.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ source_data_cumulative_sum AS (

-- Seleção final dos dados, filtrando apenas semanas com casos cumulativos maiores que zero
SELECT
"semanas epidemiologicas",
"state_code",
"state",
"epiweek_cases"::INTEGER,
"cumulative_cases"::INTEGER
"semanas epidemiologicas" as "Semanas epidemiológicasc",
"state_code" as "Código do estado",
"state" as "Estado",
"epiweek_cases"::INTEGER as "Casos na última semana",
"cumulative_cases"::INTEGER as "Casos cumulativos"
FROM source_data_cumulative_sum
WHERE
"cumulative_cases" > 0 AND
Expand Down
13 changes: 10 additions & 3 deletions utils/epiweek.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from datetime import datetime
from datetime import datetime, timedelta
from epiweeks import Week

def get_epiweek_str(
datetime_: datetime = datetime.now(),
format: str = '{EPINUM}',
zfill: int = 0
zfill: int = 0,
use_epiweek_enddate: bool = True
):
""" Create a string with the epiweek number from 'datetime_'.
Use {EPINUM} to show where you want to insert the epiweek number
Expand All @@ -23,7 +24,13 @@ def get_epiweek_str(
assert type(datetime_) == datetime, f"'datetime_' must be a datetime, not {type(datetime_)}."
assert type(format) == str, f"'format' must be a str, not {type(format)}."
assert "{EPINUM}" in format, "'format' must have '{EPINUM}' keywork to be replaced by the epiweek number."
assert type(zfill) == int, f"'zfill' must be a int, not {type(zfill)}."
assert type(zfill) == int, f"'zfill' must be an int, not {type(zfill)}."
assert type(use_epiweek_enddate) == bool, f"'use_epiweek_enddate' must be a bool, not {type(use_epiweek_enddate)}."

# Move datetime_ to epiweek enddate, moving for the next saturday.
if use_epiweek_enddate:
days_until_saturday = (5 - datetime_.weekday()) % 7 # Saturday is 5
datetime_ = datetime_ + timedelta(days=days_until_saturday)

# Create string for the datetime format keeping the '{EPINUM}' keyword
datetime_str = datetime_.strftime(format)
Expand Down

0 comments on commit a194deb

Please sign in to comment.