Skip to content

Commit

Permalink
Migrate where/when we filter for the freshest XBRL data (catalyst-coo…
Browse files Browse the repository at this point in the history
…perative#3861)

* migrate the comparing of the freshest data methodologies into tests and the filtering into the transform step

* omigosh more moving of stuff

* why did it let me commit that when the unit test failed? bc its marked slow?

* rejigger the filtering so we can run it in the tests

* re-work how we compare the filtering methodologies

* transition fully to new compare diffs

* clean up the arg names for this filter_for_freshest_data_xbrl
  • Loading branch information
cmgosnell authored Oct 1, 2024
1 parent cb4a739 commit c37dffb
Show file tree
Hide file tree
Showing 11 changed files with 349 additions and 301 deletions.
2 changes: 1 addition & 1 deletion src/pudl/extract/ferc1.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@
"sales_of_electricity_by_rate_schedules_account_445_other_sales_to_public_authorities_304",
"sales_of_electricity_by_rate_schedules_account_446_sales_to_railroads_and_railways_304",
"sales_of_electricity_by_rate_schedules_account_448_interdepartmental_sales_304",
"sales_of_electricity_by_rate_schedules_account_4491_provision_for_rate_refunds_304",
"sales_of_electricity_by_rate_schedules_account_449_1_provision_for_rate_refunds_304",
"sales_of_electricity_by_rate_schedules_account_totals_304",
],
},
Expand Down
131 changes: 3 additions & 128 deletions src/pudl/io_managers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Dagster IO Managers."""

import json
import re
from pathlib import Path
from sqlite3 import sqlite_version
Expand Down Expand Up @@ -688,108 +687,6 @@ class FercXBRLSQLiteIOManager(FercSQLiteIOManager):
metadata.
"""

@staticmethod
def filter_for_freshest_data(
table: pd.DataFrame, primary_key: list[str]
) -> pd.DataFrame:
"""Get most updated values for each XBRL context.
An XBRL context includes an entity ID, the time period the data applies to, and
other dimensions such as utility type. Each context has its own ID, but they are
frequently redefined with the same contents but different IDs - so we identify
them by their actual content.
Each row in our SQLite database includes all the facts for one context/filing
pair.
If one context is represented in multiple filings, we take the most
recently-reported non-null value.
This means that if a utility reports a non-null value, then later
either reports a null value for it or simply omits it from the report,
we keep the old non-null value, which may be erroneous. This appears to
be fairly rare, affecting < 0.005% of reported values.
"""

def __apply_diffs(
duped_groups: pd.core.groupby.DataFrameGroupBy,
) -> pd.DataFrame:
"""Take the latest reported non-null value for each group."""
return duped_groups.last()

def __best_snapshot(
duped_groups: pd.core.groupby.DataFrameGroupBy,
) -> pd.DataFrame:
"""Take the row that has most non-null values out of each group."""
# Ignore errors when dropping the "count" column since empty
# groupby won't have this column.
return duped_groups.apply(
lambda df: df.assign(count=df.count(axis="columns"))
.sort_values(by="count", ascending=True)
.tail(1)
).drop(columns="count", errors="ignore")

def __compare_dedupe_methodologies(
apply_diffs: pd.DataFrame, best_snapshot: pd.DataFrame
):
"""Compare deduplication methodologies.
By cross-referencing these we can make sure that the apply-diff
methodology isn't doing something unexpected.
The main thing we want to keep tabs on is apply-diff adding new
non-null values compared to best-snapshot, because some of those
are instances of a value correctly being reported as `null`.
Instead of stacking the two datasets, merging by context, and then
looking for left_only or right_only values, we just count non-null
values. This is because we would want to use the report_year as a
merge key, but that isn't available until after we pipe the
dataframe through `refine_report_year`.
"""
n_diffs = apply_diffs.count().sum()
n_best = best_snapshot.count().sum()

if n_diffs < n_best:
raise ValueError(
f"Found {n_diffs} non-null values with apply-diffs"
f"methodology, and {n_best} with best-snapshot. "
"apply-diffs should be >= best-snapshot."
)

# 2024-04-10: this threshold set by looking at existing values for FERC
# <=2022. It was updated from .3 to .44 during the 2023 update.
threshold_ratio = 1.0044
if (found_ratio := n_diffs / n_best) > threshold_ratio:
raise ValueError(
"Found more than expected excess non-null values using the "
f"currently implemented apply_diffs methodology (#{n_diffs}) as "
f"compared to the best_snapshot methodology (#{n_best}). We expected"
" the apply_diffs methodology to result in no more than "
f"{threshold_ratio:.2%} non-null records but found {found_ratio:.2%}.\n\n"
"We are concerned about excess non-null values because apply-diffs "
"grabs the most recent non-null values. If this error is raised, "
"investigate filter_for_freshest_data."
)

filing_metadata_cols = {"publication_time", "filing_name"}
xbrl_context_cols = [c for c in primary_key if c not in filing_metadata_cols]
original = table.sort_values("publication_time")
dupe_mask = original.duplicated(subset=xbrl_context_cols, keep=False)
duped_groups = original.loc[dupe_mask].groupby(
xbrl_context_cols, as_index=False, dropna=True
)
never_duped = original.loc[~dupe_mask]
apply_diffs = __apply_diffs(duped_groups)
# TODO: MAKE THIS FASTER AND TURN IT BACK ON!!!
# best_snapshot = __best_snapshot(duped_groups)
# __compare_dedupe_methodologies(
# apply_diffs=apply_diffs, best_snapshot=best_snapshot
# )

deduped = pd.concat([never_duped, apply_diffs], ignore_index=True)
return deduped

@staticmethod
def refine_report_year(df: pd.DataFrame, xbrl_years: list[int]) -> pd.DataFrame:
"""Set a fact's report year by its actual dates.
Expand Down Expand Up @@ -832,19 +729,6 @@ def get_year(df: pd.DataFrame, col: str) -> pd.Series:
.reset_index(drop=True)
)

def _get_primary_key(self, sched_table_name: str) -> list[str]:
# TODO (daz): as of 2023-10-13, our datapackage.json is merely
# "frictionless-like" so we manually parse it as JSON. once we make our
# datapackage.json conformant, we will need to at least update the
# "primary_key" to "primaryKey", but maybe there will be other changes
# as well.
with (self.base_dir / f"{self.db_name}_datapackage.json").open() as f:
datapackage = json.loads(f.read())
[table_resource] = [
tr for tr in datapackage["resources"] if tr["name"] == sched_table_name
]
return table_resource["schema"]["primary_key"]

def handle_output(self, context: OutputContext, obj: pd.DataFrame | str):
"""Handle an op or asset output."""
raise NotImplementedError("FercXBRLSQLiteIOManager can't write outputs yet.")
Expand Down Expand Up @@ -881,18 +765,9 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
con=con,
).assign(sched_table_name=sched_table_name)

primary_key = self._get_primary_key(table_name)

return (
df.pipe(
FercXBRLSQLiteIOManager.filter_for_freshest_data,
primary_key=primary_key,
)
.pipe(
FercXBRLSQLiteIOManager.refine_report_year,
xbrl_years=ferc_settings.xbrl_years,
)
.drop(columns=["publication_time"])
return df.pipe(
FercXBRLSQLiteIOManager.refine_report_year,
xbrl_years=ferc_settings.xbrl_years,
)


Expand Down
1 change: 1 addition & 0 deletions src/pudl/transform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
eia_bulk_elec,
eiaaeo,
epacems,
ferc,
ferc1,
ferc714,
gridpathratoolkit,
Expand Down
145 changes: 145 additions & 0 deletions src/pudl/transform/ferc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""Module for shared helpers for FERC Form transforms."""

import json
from typing import Literal

import pandas as pd

import pudl
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)


def __apply_diffs(
duped_groups: pd.core.groupby.DataFrameGroupBy,
) -> pd.DataFrame:
"""Take the latest reported non-null value for each group."""
return duped_groups.last()


def __best_snapshot(
duped_groups: pd.core.groupby.DataFrameGroupBy,
) -> pd.DataFrame:
"""Take the row that has most non-null values out of each group."""
# Ignore errors when dropping the "count" column since empty
# groupby won't have this column.
return duped_groups.apply(
lambda df: df.assign(count=df.count(axis="columns"))
.sort_values(by="count", ascending=True)
.tail(1)
).drop(columns="count", errors="ignore")


def __compare_dedupe_methodologies(
applied_diffs: pd.DataFrame,
best_snapshot: pd.DataFrame,
xbrl_context_cols: list[str],
):
"""Compare deduplication methodologies.
By cross-referencing these we can make sure that the apply-diff
methodology isn't doing something unexpected.
The main things we want to keep tabs on are: whether apply-diff is
adding more than expected differences compared to best-snapshot and
whether or not apply-diff is giving us more values than best-snapshot.
"""

def _stack_pre_merge(df):
filing_metadata_cols = {"publication_time", "filing_name"}
return pd.DataFrame(
df.set_index(xbrl_context_cols + ["report_year"])
.drop(columns=filing_metadata_cols)
.rename_axis("xbrl_factoid", axis=1)
.stack(0),
columns=["value"],
).reset_index()

test_filters = pd.merge(
_stack_pre_merge(applied_diffs),
_stack_pre_merge(best_snapshot),
on=xbrl_context_cols + ["report_year", "value", "xbrl_factoid"],
how="outer",
indicator=True,
)
merge_counts = test_filters._merge.value_counts()
if (n_diffs := merge_counts.left_only) < (n_best := merge_counts.right_only):
raise AssertionError(
"We expected to find more values with the apply_diffs methodology, "
f"but we found {n_diffs} unique apply_diffs values and {n_best}"
f"unique best_snapshot values."
)
difference_ratio = sum(merge_counts.loc[["left_only", "right_only"]]) / sum(
merge_counts
)
threshold_ratio = 0.025
if difference_ratio > threshold_ratio:
raise AssertionError(
"We expected the currently implemented apply_diffs methodology and the "
"best_snapshot methodology to result in no more than "
f"{threshold_ratio:.2%} of records with differing values but "
f"found {difference_ratio:.2%}."
)


def filter_for_freshest_data_xbrl(
xbrl_table: pd.DataFrame, primary_keys, compare_methods: bool = False
) -> pd.DataFrame:
"""Get most updated values for each XBRL context.
An XBRL context includes an entity ID, the time period the data applies to, and
other dimensions such as utility type. Each context has its own ID, but they are
frequently redefined with the same contents but different IDs - so we identify
them by their actual content.
Each row in our SQLite database includes all the facts for one context/filing
pair.
If one context is represented in multiple filings, we take the most
recently-reported non-null value.
This means that if a utility reports a non-null value, then later
either reports a null value for it or simply omits it from the report,
we keep the old non-null value, which may be erroneous. This appears to
be fairly rare, affecting < 0.005% of reported values.
"""
if not xbrl_table.empty:
filing_metadata_cols = {"publication_time", "filing_name"}
xbrl_context_cols = [c for c in primary_keys if c not in filing_metadata_cols]
original = xbrl_table.sort_values("publication_time")
dupe_mask = original.duplicated(subset=xbrl_context_cols, keep=False)
duped_groups = original.loc[dupe_mask].groupby(
xbrl_context_cols, as_index=False, dropna=True
)
never_duped = original.loc[~dupe_mask]
applied_diffs = __apply_diffs(duped_groups)
if compare_methods:
best_snapshot = __best_snapshot(duped_groups)
__compare_dedupe_methodologies(
applied_diffs=applied_diffs,
best_snapshot=best_snapshot,
xbrl_context_cols=xbrl_context_cols,
)

xbrl_table = pd.concat([never_duped, applied_diffs], ignore_index=True).drop(
columns=["publication_time"]
)
return xbrl_table


def get_primary_key_raw_xbrl(
sched_table_name: str, ferc_form: Literal["ferc1", "ferc714"]
) -> list[str]:
"""Get the primary key for a raw XBRL table from the XBRL datapackage."""
# TODO (daz): as of 2023-10-13, our datapackage.json is merely
# "frictionless-like" so we manually parse it as JSON. once we make our
# datapackage.json conformant, we will need to at least update the
# "primary_key" to "primaryKey", but maybe there will be other changes
# as well.
with (PudlPaths().output_dir / f"{ferc_form}_xbrl_datapackage.json").open() as f:
datapackage = json.loads(f.read())
[table_resource] = [
tr for tr in datapackage["resources"] if tr["name"] == sched_table_name
]
return table_resource["schema"]["primary_key"]
36 changes: 29 additions & 7 deletions src/pudl/transform/ferc1.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
cache_df,
enforce_snake_case,
)
from pudl.transform.ferc import filter_for_freshest_data_xbrl, get_primary_key_raw_xbrl

logger = pudl.logging_helpers.get_logger(__name__)

Expand Down Expand Up @@ -305,7 +306,7 @@ def wide_to_tidy(df: pd.DataFrame, params: WideToTidy) -> pd.DataFrame:
)
df_out.columns = new_cols
df_out = (
df_out.stack(params.stacked_column_name, dropna=False)
df_out.stack(params.stacked_column_name, future_stack=True)
.loc[:, params.value_types]
.reset_index()
)
Expand Down Expand Up @@ -6123,13 +6124,13 @@ def ferc1_transform_asset_factory(
dbf_tables = listify(TABLE_NAME_MAP_FERC1[table_name]["dbf"])
xbrl_tables = listify(TABLE_NAME_MAP_FERC1[table_name]["xbrl"])

ins = {f"raw_dbf__{tn}": AssetIn(f"raw_ferc1_dbf__{tn}") for tn in dbf_tables}
ins = {f"raw_ferc1_dbf__{tn}": AssetIn(f"raw_ferc1_dbf__{tn}") for tn in dbf_tables}
ins |= {
f"raw_xbrl_instant__{tn}": AssetIn(f"raw_ferc1_xbrl__{tn}_instant")
f"raw_ferc1_xbrl__{tn}_instant": AssetIn(f"raw_ferc1_xbrl__{tn}_instant")
for tn in xbrl_tables
}
ins |= {
f"raw_xbrl_duration__{tn}": AssetIn(f"raw_ferc1_xbrl__{tn}_duration")
f"raw_ferc1_xbrl__{tn}_duration": AssetIn(f"raw_ferc1_xbrl__{tn}_duration")
for tn in xbrl_tables
}
ins["_core_ferc1_xbrl__metadata_json"] = AssetIn("_core_ferc1_xbrl__metadata_json")
Expand Down Expand Up @@ -6162,13 +6163,30 @@ def ferc1_transform_asset(**kwargs: dict[str, pd.DataFrame]) -> pd.DataFrame:
)

raw_dbf = pd.concat(
[df for key, df in kwargs.items() if key.startswith("raw_dbf__")]
[df for key, df in kwargs.items() if key.startswith("raw_ferc1_dbf__")]
)
raw_xbrls = {
tn: filter_for_freshest_data_xbrl(
df,
get_primary_key_raw_xbrl(tn.removeprefix("raw_ferc1_xbrl__"), "ferc1"),
)
for tn, df in kwargs.items()
if tn.startswith("raw_ferc1_xbrl__")
}
for raw_xbrl_table_name in listify(TABLE_NAME_MAP_FERC1[table_name]["xbrl"]):
if (
raw_xbrls[f"raw_ferc1_xbrl__{raw_xbrl_table_name}_instant"].empty
and raw_xbrls[f"raw_ferc1_xbrl__{raw_xbrl_table_name}_duration"].empty
):
raise AssertionError(
f"{raw_xbrl_table_name} has neither instant nor duration tables. "
"Is it spelled correctly in pudl.extract.ferc1.TABLE_NAME_MAP_FERC1"
)
raw_xbrl_instant = pd.concat(
[df for key, df in kwargs.items() if key.startswith("raw_xbrl_instant__")]
[df for key, df in raw_xbrls.items() if key.endswith("_instant")]
)
raw_xbrl_duration = pd.concat(
[df for key, df in kwargs.items() if key.startswith("raw_xbrl_duration__")]
[df for key, df in raw_xbrls.items() if key.endswith("_duration")]
)
df = transformer.transform(
raw_dbf=raw_dbf,
Expand Down Expand Up @@ -6196,6 +6214,10 @@ def create_ferc1_transform_assets() -> list[AssetsDefinition]:

ferc1_assets = create_ferc1_transform_assets()

##########################################
# Post-core tables/XBLR Calculations Stuff
##########################################


def other_dimensions(table_names: list[str]) -> list[str]:
"""Get a list of the other dimension columns across all of the transformers."""
Expand Down
Loading

0 comments on commit c37dffb

Please sign in to comment.