Spark config passing bug fix for local spark submission (feathr-ai#729)
* Fix local spark output file-format bug

Signed-off-by: Jun Ki Min <[email protected]>

* Add dev dependencies. Add unit-test for local spark job launcher

Signed-off-by: Jun Ki Min <[email protected]>

* Fix local spark submission unused param error

Signed-off-by: Jun Ki Min <[email protected]>

Signed-off-by: Jun Ki Min <[email protected]>
loomlike authored Oct 12, 2022
1 parent 39c14ca commit c075dc2
Showing 6 changed files with 204 additions and 128 deletions.
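
The second commit bullet adds a unit test for the local launcher. As a hedged sketch only (not the test shipped in this commit), a pytest-style check might exercise the renamed class directly, assuming the constructor arguments used in client.py below and assuming that a local launcher returns local paths unchanged from upload_or_get_cloud_path:

from feathr.spark_provider._localspark_submission import _FeathrLocalSparkJobLauncher


def test_local_spark_job_launcher_round_trips_local_paths(tmp_path):
    # Constructor arguments mirror the client.py call below; values are illustrative.
    launcher = _FeathrLocalSparkJobLauncher(
        workspace_path=str(tmp_path),  # scratch dir for generated job configs
        master="local[*]",             # run Spark in-process for the test
    )
    # Assumption: with no cloud storage involved, a local path should
    # come back unchanged from upload_or_get_cloud_path.
    local_file = tmp_path / "observations.csv"
    local_file.write_text("id,value\n1,2\n")
    assert launcher.upload_or_get_cloud_path(str(local_file)) == str(local_file)
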
53 changes: 25 additions & 28 deletions feathr_project/feathr/client.py
@@ -1,39 +1,36 @@
 import base64
 import copy
 import logging
 import os
 import tempfile
 from typing import Dict, List, Union
-from feathr.definition.feature import FeatureBase
-import copy
 
 import redis
 from azure.identity import DefaultAzureCredential
 from jinja2 import Template
 from pyhocon import ConfigFactory
-from feathr.definition.sink import Sink
-from feathr.registry.feature_registry import default_registry_client
 
-from feathr.spark_provider._databricks_submission import _FeathrDatabricksJobLauncher
-from feathr.spark_provider._synapse_submission import _FeathrSynapseJobLauncher
-from feathr.spark_provider._localspark_submission import _FeathrDLocalSparkJobLauncher
-import redis
-
-from feathr.definition._materialization_utils import _to_materialization_config
-from feathr.udf._preprocessing_pyudf_manager import _PreprocessingPyudfManager
 from feathr.constants import *
-from feathr.spark_provider.feathr_configurations import SparkExecutionConfiguration
+from feathr.definition._materialization_utils import _to_materialization_config
+from feathr.definition.anchor import FeatureAnchor
+from feathr.definition.feature import FeatureBase
+from feathr.definition.feature_derivations import DerivedFeature
 from feathr.definition.materialization_settings import MaterializationSettings
 from feathr.definition.monitoring_settings import MonitoringSettings
-from feathr.protobuf.featureValue_pb2 import FeatureValue
 from feathr.definition.query_feature_list import FeatureQuery
 from feathr.definition.settings import ObservationSettings
-from feathr.definition.feature_derivations import DerivedFeature
-from feathr.definition.anchor import FeatureAnchor
+from feathr.definition.sink import Sink
+from feathr.protobuf.featureValue_pb2 import FeatureValue
+from feathr.registry.feature_registry import default_registry_client
+from feathr.spark_provider._databricks_submission import _FeathrDatabricksJobLauncher
+from feathr.spark_provider._localspark_submission import _FeathrLocalSparkJobLauncher
+from feathr.spark_provider._synapse_submission import _FeathrSynapseJobLauncher
+from feathr.spark_provider.feathr_configurations import SparkExecutionConfiguration
+from feathr.udf._preprocessing_pyudf_manager import _PreprocessingPyudfManager
 from feathr.utils._envvariableutil import _EnvVaraibleUtil
 from feathr.utils._file_utils import write_to_file
 from feathr.utils.feature_printer import FeaturePrinter
-from feathr.utils.spark_job_params import FeatureJoinJobParams, FeatureGenerationJobParams
+from feathr.utils.spark_job_params import FeatureGenerationJobParams, FeatureJoinJobParams
 
 
 class FeathrClient(object):
@@ -161,7 +158,7 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir
             self._FEATHR_JOB_JAR_PATH = \
                 self.envutils.get_environment_variable_with_default(
                     'spark_config', 'local', 'feathr_runtime_location')
-            self.feathr_spark_launcher = _FeathrDLocalSparkJobLauncher(
+            self.feathr_spark_launcher = _FeathrLocalSparkJobLauncher(
                 workspace_path = self.envutils.get_environment_variable_with_default('spark_config', 'local', 'workspace'),
                 master = self.envutils.get_environment_variable_with_default('spark_config', 'local', 'master')
             )
@@ -354,7 +351,7 @@ def _decode_proto(self, feature_list):
             else:
                 typed_result.append(raw_feature)
         return typed_result
-
+
     def delete_feature_from_redis(self, feature_table, key, feature_name) -> None:
         """
         Delete feature from Redis
@@ -364,7 +361,7 @@ def delete_feature_from_redis(self, feature_table, key, feature_name) -> None:
             key: the key of the entity
             feature_name: feature name to be deleted
         """
-
+
         redis_key = self._construct_redis_key(feature_table, key)
         if self.redis_client.hexists(redis_key, feature_name):
             self.redis_client.delete(redis_key, feature_name)
@@ -575,20 +572,20 @@ def monitor_features(self, settings: MonitoringSettings, execution_configuration
     def _get_feature_key(self, feature_name: str):
         features = []
         if 'derived_feature_list' in dir(self):
-            features += self.derived_feature_list
+            features += self.derived_feature_list
         if 'anchor_list' in dir(self):
             for anchor in self.anchor_list:
-                features += anchor.features
+                features += anchor.features
         for feature in features:
             if feature.name == feature_name:
                 keys = feature.key
-                return set(key.key_column for key in keys)
+                return set(key.key_column for key in keys)
         self.logger.warning(f"Invalid feature name: {feature_name}. Please call FeathrClient.build_features() first in order to materialize the features.")
         return None
 
     # Validation on feature keys:
     # Features within a set of aggregation or planned to be merged should have same keys
-    # The param "allow_empty_key" shows if empty keys are acceptable
+    # The param "allow_empty_key" shows if empty keys are acceptable
     def _valid_materialize_keys(self, features: List[str], allow_empty_key=False):
         keys = None
         for feature in features:
@@ -611,7 +608,7 @@ def _valid_materialize_keys(self, features: List[str], allow_empty_key=False):
                 self.logger.error(f"Inconsistent feature keys. Current keys are {str(keys)}")
                 return False
         return True
-
+
     def materialize_features(self, settings: MaterializationSettings, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False):
         """Materialize feature data
 
@@ -622,7 +619,7 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
         feature_list = settings.feature_names
         if len(feature_list) > 0 and not self._valid_materialize_keys(feature_list):
             raise RuntimeError(f"Invalid materialization features: {feature_list}, since they have different keys. Currently Feathr only supports materializing features of the same keys.")
-
+
         # Collect secrets from sinks
         secrets = []
         for sink in settings.sinks:
@@ -632,7 +629,7 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
         # produce materialization config
         for end in settings.get_backfill_cutoff_time():
             settings.backfill_time.end = end
-            config = _to_materialization_config(settings)
+            config = _to_materialization_config(settings)
             config_file_name = "feature_gen_conf/auto_gen_config_{}.conf".format(end.timestamp())
             config_file_path = os.path.join(self.local_workspace_dir, config_file_name)
             write_to_file(content=config, full_file_name=config_file_path)
@@ -854,7 +851,7 @@ def get_features_from_registry(self, project_name: str) -> Dict[str, FeatureBase
             feature_dict[feature.name] = feature
         for feature in registry_derived_feature_list:
             feature_dict[feature.name] = feature
-        return feature_dict
+        return feature_dict
 
     def _reshape_config_str(self, config_str:str):
         if self.spark_runtime == 'local':
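
For reference, the 'spark_config', 'local' keys read in the __init__ hunk above come from feathr_config.yaml. A minimal sketch of that section, assuming PyYAML and inventing every value; only the three key paths are taken from the diff, and the spark_cluster switch is an assumption about how the local launcher is selected:

import yaml  # PyYAML

local_section = yaml.safe_load("""
spark_config:
  spark_cluster: local  # assumed switch that routes jobs to _FeathrLocalSparkJobLauncher
  local:
    master: local[*]                                # ('spark_config', 'local', 'master')
    workspace: ./debug                              # ('spark_config', 'local', 'workspace')
    feathr_runtime_location: ./feathr-assembly.jar  # ('spark_config', 'local', 'feathr_runtime_location')
""")

assert local_section["spark_config"]["local"]["master"] == "local[*]"
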
4 changes: 2 additions & 2 deletions feathr_project/feathr/spark_provider/_abc.py
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
-from typing import Dict, List, Optional, Tuple
 
+from typing import Any, Dict, List, Optional, Tuple
 
 class SparkJobLauncher(ABC):
     """This is the abstract class for all the spark launchers. All the Spark launcher should implement those interfaces
@@ -15,7 +15,6 @@ def upload_or_get_cloud_path(self, local_path_or_http_path: str):
         """
         pass
 
-
     @abstractmethod
     def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name: str, arguments: List[str],
                           reference_files_path: List[str], job_tags: Dict[str, str] = None,
@@ -33,6 +32,7 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name:
             properties (Dict[str, str]): Additional System Properties for the spark job
         """
         pass
+
     @abstractmethod
     def wait_for_completion(self, timeout_seconds: Optional[float]) -> bool:
         """Returns true if the job completed successfully
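
The interface above is what every backend (Databricks, Synapse, and now local Spark) implements. A minimal sketch of a conforming subclass, assuming the three methods shown are the whole abstract surface (anything hidden by the truncated view would also need overriding) and collapsing the unshown tail of submit_feathr_job's signature into **kwargs:

from typing import Any, Dict, List, Optional

from feathr.spark_provider._abc import SparkJobLauncher


class RecordingSparkJobLauncher(SparkJobLauncher):
    """Toy launcher that records submissions instead of running Spark."""

    def __init__(self):
        self.submitted: List[Dict[str, Any]] = []

    def upload_or_get_cloud_path(self, local_path_or_http_path: str):
        # Nothing to upload anywhere, so hand the path back unchanged.
        return local_path_or_http_path

    def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name: str,
                          arguments: List[str], reference_files_path: List[str],
                          job_tags: Dict[str, str] = None, **kwargs):
        # Record the submission; a real launcher would start spark-submit here.
        self.submitted.append({"job_name": job_name, "arguments": arguments})

    def wait_for_completion(self, timeout_seconds: Optional[float]) -> bool:
        # Nothing runs asynchronously, so report success immediately.
        return True
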