Add flag to enable generation of non-agg features (feathr-ai#719)
* Add flag to enable generation of non-agg features

* Typo

* Resolve comments
windoze authored Oct 20, 2022
1 parent c0e8bc8 commit 143ff89
Showing 6 changed files with 56 additions and 8 deletions.
19 changes: 17 additions & 2 deletions feathr_project/feathr/client.py
@@ -6,6 +6,7 @@
from typing import Dict, List, Union

from azure.identity import DefaultAzureCredential
from feathr.definition.transformation import WindowAggTransformation
from jinja2 import Template
from pyhocon import ConfigFactory
import redis
@@ -608,17 +609,31 @@ def _valid_materialize_keys(self, features: List[str], allow_empty_key=False):
self.logger.error(f"Inconsistent feature keys. Current keys are {str(keys)}")
return False
return True

- def materialize_features(self, settings: MaterializationSettings, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False):
+ def materialize_features(self, settings: MaterializationSettings, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, allow_materialize_non_agg_feature: bool = False):
"""Materialize feature data
Args:
settings: Feature materialization settings
execution_configurations: A dict passed to the Spark job when it starts, i.e. the "spark configurations". Note that not all configurations will be honored, since some are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of Spark configurations.
allow_materialize_non_agg_feature: Materializing non-aggregated features (features without a WindowAggTransformation) does not produce meaningful results, so this defaults to False. Set it to True if you really want to materialize non-aggregated features.
"""
feature_list = settings.feature_names
if len(feature_list) > 0 and not self._valid_materialize_keys(feature_list):
raise RuntimeError(f"Invalid materialization features: {feature_list}, since they have different keys. Currently Feathr only supports materializing features with the same keys.")

if not allow_materialize_non_agg_feature:
# Check if there are non-aggregation features in the list
for fn in feature_list:
# Check over anchor features
for anchor in self.anchor_list:
for feature in anchor.features:
if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation):
raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features. If you want to materialize {fn}, please set allow_materialize_non_agg_feature to True.")
# Check over derived features
for feature in self.derived_feature_list:
if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation):
raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features. If you want to materialize {fn}, please set allow_materialize_non_agg_feature to True.")

# Collect secrets from sinks
secrets = []
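A minimal usage sketch of the new flag (the client, feature name, and Redis table below are hypothetical, not from this commit):

from datetime import datetime, timedelta
from feathr import BackfillTime, MaterializationSettings, RedisSink

# Assumes `client` is an already-configured FeathrClient whose anchors define
# a non-aggregated feature named "f_day_of_week" (hypothetical name).
backfill_time = BackfillTime(start=datetime(2020, 5, 20),
                             end=datetime(2020, 5, 20),
                             step=timedelta(days=1))
settings = MaterializationSettings(name="nonAggFeatureJob",
                                   sinks=[RedisSink(table_name="nonAggFeatureTable")],
                                   feature_names=["f_day_of_week"],
                                   backfill_time=backfill_time)

# Without allow_materialize_non_agg_feature=True this raises RuntimeError,
# because "f_day_of_week" has no WindowAggTransformation.
client.materialize_features(settings, allow_materialize_non_agg_feature=True)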
2 changes: 1 addition & 1 deletion feathr_project/test/test_azure_snowflake_e2e.py
@@ -30,7 +30,7 @@ def test_feathr_online_store_agg_features():
feature_names=['f_snowflake_call_center_division_name',
'f_snowflake_call_center_zipcode'],
backfill_time=backfill_time)
- client.materialize_features(settings)
+ client.materialize_features(settings, allow_materialize_non_agg_feature=True)
# just assume the job is successful without validating the actual result in Redis. Might need to consolidate
# this part with the test_feathr_online_store test case
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
2 changes: 1 addition & 1 deletion feathr_project/test/test_azure_spark_e2e.py
@@ -153,7 +153,7 @@ def test_feathr_online_store_non_agg_features():
feature_names=["f_gen_trip_distance", "f_gen_is_long_trip_distance", "f1", "f2", "f3", "f4", "f5", "f6"],
backfill_time=backfill_time)

- client.materialize_features(settings)
+ client.materialize_features(settings, allow_materialize_non_agg_feature=True)
# just assume the job is successful without validating the actual result in Redis. Might need to consolidate
# this part with the test_feathr_online_store test case
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
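For contrast, a feature defined with a WindowAggTransformation passes the new check without the flag. A minimal definition sketch, assuming the standard feathr feature-definition API (the key, expression, and window below are illustrative):

from feathr import Feature, FLOAT, TypedKey, ValueType, WindowAggTransformation

location_id = TypedKey(key_column="DOLocationID",
                       key_column_type=ValueType.INT32,
                       description="location id in NYC")

# A 90-day average fare keyed by location: a window-aggregated feature,
# so materialize_features() accepts it with the flag left at its default.
f_location_avg_fare = Feature(name="f_location_avg_fare",
                              key=location_id,
                              feature_type=FLOAT,
                              transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                                agg_func="AVG",
                                                                window="90d"))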
37 changes: 35 additions & 2 deletions feathr_project/test/test_azure_spark_maven_e2e.py
@@ -3,8 +3,12 @@
from pathlib import Path

from feathr import (BackfillTime, MaterializationSettings)
from feathr import RedisSink
# from feathr import *
from feathr.client import FeathrClient
from feathr.definition.dtype import ValueType
from feathr.definition.query_feature_list import FeatureQuery
from feathr.definition.settings import ObservationSettings
from feathr.definition.typed_key import TypedKey
from test_fixture import (basic_test_setup, get_online_test_table_name)
from test_utils.constants import Constants

@@ -22,6 +26,35 @@ def test_feathr_online_store_agg_features():
# Maven package as the dependency and `noop.jar` as the main file
client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config_maven.yaml"))



location_id = TypedKey(key_column="DOLocationID",
key_column_type=ValueType.INT32,
description="location id in NYC",
full_name="nyc_taxi.location_id")

feature_query = FeatureQuery(
feature_list=["f_location_avg_fare"], key=location_id)
settings = ObservationSettings(
observation_path="wasbs://[email protected]/sample_data/green_tripdata_2020-04.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")

now = datetime.now()
# set output folder based on different runtime
if client.spark_runtime == 'databricks':
output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".avro"])
else:
output_path = ''.join(['abfss://[email protected]/demo_data/output','_', str(now.minute), '_', str(now.second), ".avro"])


client.get_offline_features(observation_settings=settings,
feature_query=feature_query,
output_path=output_path)

# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
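# The early return below skips the online-store materialization and Redis
# validation that follow, so this maven test exercises only the offline path.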
return
backfill_time = BackfillTime(start=datetime(
2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))
redisSink = RedisSink(table_name=online_test_table)
@@ -51,4 +84,4 @@
assert res['239'][0] != None
assert res['239'][1] != None
assert res['265'][0] != None
assert res['265'][1] != None
2 changes: 1 addition & 1 deletion feathr_project/test/test_feature_materialization.py
@@ -236,7 +236,7 @@ def test_delete_feature_from_redis():
"f_day_of_week"
],
backfill_time=backfill_time)
- client.materialize_features(settings)
+ client.materialize_features(settings, allow_materialize_non_agg_feature=True)

client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

2 changes: 1 addition & 1 deletion feathr_project/test/test_pyduf_preprocessing_e2e.py
@@ -103,7 +103,7 @@ def test_non_swa_feature_gen_with_offline_preprocessing():
"f_day_of_week"
],
backfill_time=backfill_time)
- client.materialize_features(settings)
+ client.materialize_features(settings, allow_materialize_non_agg_feature=True)
# just assume the job is successful without validating the actual result in Redis. Might need to consolidate
# this part with the test_feathr_online_store test case
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
