Separate out snowflake source (feathr-ai#836)
* Add more secret manager support

* Add abstract class

* Update feathr-configuration-and-env.md

* Update _envvariableutil.py

* add tests for aws secrets manager

* Update test_secrets_read.py

* fix tests

* Update test_secrets_read.py

* fix test

* Update pull_request_push_test.yml

* get_secrets_update

* move import statement

* update spelling

* update raise exception

* revert

* feature registry hack

* query for uppercase

* add snowflake source

* remove snowflake type

* enableDebugLogger

* add logging

* simple path snowflake fix

* snowflake-update

* fix bugs/log

* get_snowflake_path

* update get_snowflake_path

* remove log

* log

* add logs

* test with path

* update snowflake registry handling

* update source

* remove logs

* update error handling and test

* make lowercase

* remove logging

* Revert "Merge pull request #5 from aabbasi-hbo/secrets-key-test"

This reverts commit 41554b4, reversing
changes made to 6b401de.

* Revert "remove logging"

This reverts commit e01635d.

* Revert "update error handling and test"

This reverts commit e5c200f.

* Revert "query for uppercase"

This reverts commit 0531788.

* Revert "revert"

This reverts commit 87cd083.

* Revert "update raise exception"

This reverts commit 44a3ce0.

* Revert "update spelling"

This reverts commit 07a8cf0.

* Revert "move import statement"

This reverts commit 218123f.

* Revert "get_secrets_update"

This reverts commit 9cb332c.

* Revert "Update pull_request_push_test.yml"

This reverts commit e617b99.

* Revert "fix test"

This reverts commit 8be6a42.

* Revert "Update test_secrets_read.py"

This reverts commit 997a2b1.

* Revert "fix tests"

This reverts commit a6870d9.

* Revert "Update test_secrets_read.py"

This reverts commit aa5fdda.

* Revert "add tests for aws secrets manager"

This reverts commit cdcd612.

* Revert "Update _envvariableutil.py"

This reverts commit f616522.

* Revert "Update feathr-configuration-and-env.md"

This reverts commit 2d6c135.

* Revert "Add abstract class"

This reverts commit e96459a.

* Revert "Add more secret manager support"

This reverts commit c31906c.

* remove extra line

* fix formatting

* Update setup.py

* update python tests

* update scala test

* update tests

* update test

* add test

* update docs

* fix test

* add snowflake guide

* add to NonTimeBasedDataSourceAccessor

* remove registry fixes

* Update source.py

* Update source.py

* Update source.py

* remove print

* Update feathr-snowflake-guide.md

Co-authored-by: Xiaoyong Zhu <[email protected]>
aabbasi-hbo and xiaoyongzhu authored Nov 23, 2022
1 parent 26c14b4 commit c21d89d
Showing 35 changed files with 501 additions and 116 deletions.
3 changes: 2 additions & 1 deletion docs/how-to-guides/feathr-configuration-and-env.md
@@ -59,7 +59,8 @@ Feathr will get the configurations in the following order:
| OFFLINE_STORE__SNOWFLAKE__SNOWFLAKE_ENABLED | Configures whether Snowflake as offline store is enabled or not. Available value: "True" or "False". Equivalent to "False" if not set. | Required if using Snowflake as an offline store. |
| OFFLINE_STORE__SNOWFLAKE__URL | Configures the Snowflake URL. Usually it's something like `dqllago-ol19457.snowflakecomputing.com`. | Required if using Snowflake as an offline store. |
| OFFLINE_STORE__SNOWFLAKE__USER | Configures the Snowflake user. | Required if using Snowflake as an offline store. |
| OFFLINE_STORE__SNOWFLAKE__ROLE | Configures the Snowflake role. Usually it's something like `ACCOUNTADMIN`. | Required if using Snowflake as an offline store. |
| OFFLINE_STORE__SNOWFLAKE__ROLE | Configures the Snowflake role. Usually it's something like `ACCOUNTADMIN`. | Required if using Snowflake as an offline store.
| OFFLINE_STORE__SNOWFLAKE__WAREHOUSE | Configures the Snowflake Warehouse. | Required if using Snowflake as an offline store. |
| JDBC_SF_PASSWORD | Configurations for Snowflake password | Required if using Snowflake as an offline store. |
| SPARK_CONFIG__SPARK_CLUSTER | Choice for spark runtime. Currently support: `azure_synapse`, `databricks`. The `databricks` configs will be ignored if `azure_synapse` is set and vice versa. | Required |
| SPARK_CONFIG__SPARK_RESULT_OUTPUT_PARTS | Configure number of parts for the spark output for feature generation job | Required |
38 changes: 38 additions & 0 deletions docs/how-to-guides/feathr-snowflake-guide.md
@@ -0,0 +1,38 @@
---
layout: default
title: Using Snowflake with Feathr
parent: Feathr How-to Guides
---

# Using Snowflake with Feathr

Currently, Feathr supports using Snowflake as a source.

# Using Snowflake as a source

To use Snowflake as a source, create a `SnowflakeSource` in your project.

```
source = feathr.SnowflakeSource(name: str, database: str, schema: str, dbtable: Optional[str], query: Optional[str])
```

* `name` is the source name, the same as for other sources.
* `database` is the Snowflake database that stores the table of interest.
* `schema` is the Snowflake schema that stores the table of interest.
* `dbtable` is the table name in the database and `query` is a SQL `SELECT` statement; specify exactly one of the two, never both.

For more information on how Snowflake uses databases and schemas to organize data, please refer to [Snowflake Database and Schema](https://docs.snowflake.com/en/sql-reference/ddl-database.html).

Other parameters such as `preprocessing` behave the same as they do for other sources like `HdfsSource`.

After creating the `SnowflakeSource`, you can use it in the same way as other kinds of sources.
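
For example, a minimal sketch of creating the source (the database, schema, and table names below are placeholders, not real datasets):

```
import feathr

# Placeholder Snowflake coordinates -- replace with your own database/schema/table.
source = feathr.SnowflakeSource(
    name="snowflakeSampleSource",
    database="MY_DATABASE",
    schema="MY_SCHEMA",
    dbtable="MY_TABLE",  # or pass query="SELECT ..." instead -- never both
    event_timestamp_column="event_timestamp",
    timestamp_format="yyyy-MM-dd HH:mm:ss",
)
```

The resulting source can then be attached to a `FeatureAnchor` just like an `HdfsSource`.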

# Specifying Snowflake Source in Observation Settings

`ObservationSettings` requires an observation path. To generate the Snowflake path, the Feathr client exposes a method that takes the same dataset-location arguments as `SnowflakeSource`.

To generate the Snowflake path to pass into `ObservationSettings`, call `client.get_snowflake_path()`.

```
observation_path = client.get_snowflake_path(database: str, schema: str, dbtable: Optional[str], query: Optional[str])
```
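
A minimal sketch, assuming the usual `ObservationSettings` arguments (the database and table names are placeholders):

```
observation_path = client.get_snowflake_path(
    database="MY_DATABASE", schema="MY_SCHEMA", dbtable="MY_OBSERVATION_TABLE"
)
# observation_path is a URL of the form:
# snowflake://snowflake_account/?sfDatabase=MY_DATABASE&sfSchema=MY_SCHEMA&dbtable=MY_OBSERVATION_TABLE

settings = feathr.ObservationSettings(
    observation_path=observation_path,
    event_timestamp_column="event_timestamp",
    timestamp_format="yyyy-MM-dd HH:mm:ss",
)
```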
1 change: 1 addition & 0 deletions docs/samples/customer360/Customer360.ipynb
@@ -215,6 +215,7 @@
" url: \"<replace_with_your_snowflake_account>.snowflakecomputing.com\"\n",
" user: \"<replace_with_your_user>\"\n",
" role: \"<replace_with_your_user_role>\"\n",
" warehouse: \"<replace_with_your_warehouse>\"\n",
"spark_config:\n",
" spark_cluster: 'databricks'\n",
" spark_result_output_parts: '1'\n",
@@ -349,6 +349,7 @@
" url: \"<replace_with_your_snowflake_account>.snowflakecomputing.com\"\n",
" user: \"<replace_with_your_user>\"\n",
" role: \"<replace_with_your_user_role>\"\n",
" warehouse: \"<replace_with_your_warehouse>\"\n",
"spark_config:\n",
" # choice for spark runtime. Currently support: azure_synapse, databricks\n",
" # The `databricks` configs will be ignored if `azure_synapse` is set and vice versa.\n",
@@ -1417,7 +1418,7 @@
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3.8.10 ('logistics')",
"display_name": "Python 3.9.14 64-bit",
"language": "python",
"name": "python3"
},
@@ -1431,11 +1432,11 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.9.14"
},
"vscode": {
"interpreter": {
"hash": "6d25d3d1f1809ed0384c3d8e0cd4f1df57fe7bb936ead67f035c6ff1494f4e23"
"hash": "a665b5d41d17b532ea9890333293a1b812fa0b73c9c25c950b3cedf1bebd0438"
}
}
},
7 changes: 4 additions & 3 deletions docs/samples/fraud_detection_demo.ipynb
@@ -230,6 +230,7 @@
" url: \"<replace_with_your_snowflake_account>.snowflakecomputing.com\"\n",
" user: \"<replace_with_your_user>\"\n",
" role: \"<replace_with_your_user_role>\"\n",
" warehouse: \"<replace_with_your_warehouse>\"\n",
"spark_config:\n",
" spark_cluster: 'azure_synapse'\n",
" spark_result_output_parts: '1'\n",
@@ -997,7 +998,7 @@
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3.10.4 64-bit",
"display_name": "Python 3.9.14 64-bit",
"language": "python",
"name": "python3"
},
@@ -1011,12 +1012,12 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
"version": "3.9.14"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "6eea572ac5b43246b7c51fa33510c93fb6df4c34b515a6e4994c858f44841967"
"hash": "a665b5d41d17b532ea9890333293a1b812fa0b73c9c25c950b3cedf1bebd0438"
}
}
},
7 changes: 4 additions & 3 deletions docs/samples/nyc_taxi_demo.ipynb
@@ -213,6 +213,7 @@
" url: \"dqllago-ol19457.snowflakecomputing.com\"\n",
" user: \"feathrintegration\"\n",
" role: \"ACCOUNTADMIN\"\n",
" warehouse: \"COMPUTE_WH\"\n",
"spark_config:\n",
" spark_cluster: 'azure_synapse'\n",
" spark_result_output_parts: '1'\n",
@@ -693,7 +694,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.10.8 64-bit",
"display_name": "Python 3.9.14 64-bit",
"language": "python",
"name": "python3"
},
@@ -707,11 +708,11 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
"version": "3.9.14"
},
"vscode": {
"interpreter": {
"hash": "b0fa6594d8f4cbf19f97940f81e996739fb7646882a419484c72d19e05852a7e"
"hash": "a665b5d41d17b532ea9890333293a1b812fa0b73c9c25c950b3cedf1bebd0438"
}
}
},
1 change: 1 addition & 0 deletions docs/samples/product_recommendation_demo_advanced.ipynb
@@ -362,6 +362,7 @@
" url: \"<replace_with_your_snowflake_account>.snowflakecomputing.com\"\n",
" user: \"<replace_with_your_user>\"\n",
" role: \"<replace_with_your_user_role>\"\n",
" warehouse: \"<replace_with_your_warehouse>\"\n",
"spark_config:\n",
" spark_cluster: 'azure_synapse'\n",
" spark_result_output_parts: '1'\n",
1 change: 1 addition & 0 deletions feathr_project/feathr/__init__.py
@@ -56,6 +56,7 @@
'Source',
'InputContext',
'HdfsSource',
'SnowflakeSource',
'KafkaConfig',
'KafKaSource',
'ValueType',
18 changes: 17 additions & 1 deletion feathr_project/feathr/client.py
@@ -241,6 +241,20 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
# Pretty print anchor_list
if verbose and self.anchor_list:
FeaturePrinter.pretty_print_anchors(self.anchor_list)

def get_snowflake_path(self, database: str, schema: str, dbtable: str = None, query: str = None) -> str:
"""
Returns snowflake path given dataset location information.
Either dbtable or query must be specified but not both.
"""
if dbtable is not None and query is not None:
raise RuntimeError("Both dbtable and query are specified. Can only specify one..")
if dbtable is None and query is None:
raise RuntimeError("One of dbtable or query must be specified..")
if dbtable:
return f"snowflake://snowflake_account/?sfDatabase={database}&sfSchema={schema}&dbtable={dbtable}"
else:
return f"snowflake://snowflake_account/?sfDatabase={database}&sfSchema={schema}&query={query}"

def list_registered_features(self, project_name: str = None) -> List[str]:
"""List all the already registered features under the given project.
@@ -836,14 +850,16 @@ def _get_snowflake_config_str(self):
sf_url = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'url')
sf_user = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'user')
sf_role = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'role')
sf_warehouse = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'warehouse')
sf_password = self.envutils.get_environment_variable('JDBC_SF_PASSWORD')
# HOCCON format will be parsed by the Feathr job
config_str = """
JDBC_SF_URL: {JDBC_SF_URL}
JDBC_SF_USER: {JDBC_SF_USER}
JDBC_SF_ROLE: {JDBC_SF_ROLE}
JDBC_SF_WAREHOUSE: {JDBC_SF_WAREHOUSE}
JDBC_SF_PASSWORD: {JDBC_SF_PASSWORD}
""".format(JDBC_SF_URL=sf_url, JDBC_SF_USER=sf_user, JDBC_SF_PASSWORD=sf_password, JDBC_SF_ROLE=sf_role)
""".format(JDBC_SF_URL=sf_url, JDBC_SF_USER=sf_user, JDBC_SF_PASSWORD=sf_password, JDBC_SF_ROLE=sf_role, JDBC_SF_WAREHOUSE=sf_warehouse)
return self._reshape_config_str(config_str)

def _get_kafka_config_str(self):
86 changes: 86 additions & 0 deletions feathr_project/feathr/definition/source.py
@@ -6,6 +6,7 @@

from jinja2 import Template
from loguru import logger
from urllib.parse import urlparse, parse_qs
import json


@@ -150,6 +151,91 @@ def __str__(self):
def to_argument(self):
return self.path

class SnowflakeSource(Source):
"""
A data source for Snowflake
Attributes:
name (str): name of the source
database (str): Snowflake Database
schema (str): Snowflake Schema
dbtable (Optional[str]): Snowflake Table
query (Optional[str]): Query instead of snowflake table
Either one of query or dbtable must be specified but not both.
preprocessing (Optional[Callable]): A preprocessing python function that transforms the source data for further feature transformation.
event_timestamp_column (Optional[str]): The timestamp field of your record. As sliding window aggregation feature assume each record in the source data should have a timestamp column.
timestamp_format (Optional[str], optional): The format of the timestamp field. Defaults to "epoch". Possible values are:
- `epoch` (seconds since epoch), for example `1647737463`
- `epoch_millis` (milliseconds since epoch), for example `1647737517761`
- Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html).
registry_tags: A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {"deprecated": "true"} to indicate this source is deprecated, etc.
"""
def __init__(self, name: str, database: str, schema: str, dbtable: Optional[str] = None, query: Optional[str] = None, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None) -> None:
super().__init__(name, event_timestamp_column,
timestamp_format, registry_tags=registry_tags)
self.preprocessing=preprocessing
if dbtable is not None and query is not None:
raise RuntimeError("Both dbtable and query are specified. Can only specify one..")
if dbtable is None and query is None:
raise RuntimeError("One of dbtable or query must be specified..")
if dbtable is not None:
self.dbtable = dbtable
if query is not None:
self.query = query
self.database = database
self.schema = schema
self.path = self._get_snowflake_path(dbtable, query)

def _get_snowflake_path(self, dbtable: Optional[str] = None, query: Optional[str] = None) -> str:
"""
Returns snowflake path for registry.
"""
if dbtable:
return f"snowflake://snowflake_account/?sfDatabase={self.database}&sfSchema={self.schema}&dbtable={dbtable}"
else:
return f"snowflake://snowflake_account/?sfDatabase={self.database}&sfSchema={self.schema}&query={query}"

def parse_snowflake_path(url: str) -> Dict[str, str]:
"""
Parses snowflake path into dictionary of components for registry.
"""
parse_result = urlparse(url)
parsed_queries = parse_qs(parse_result.query)
updated_dict = {key: parsed_queries[key][0] for key in parsed_queries}
return updated_dict

def to_feature_config(self) -> str:
tm = Template("""
{{source.name}}: {
type: SNOWFLAKE
location: {
type: "snowflake"
{% if source.dbtable is defined %}
dbtable: "{{source.dbtable}}"
{% endif %}
{% if source.query is defined %}
query: "{{source.query}}"
{% endif %}
database: "{{source.database}}"
schema: "{{source.schema}}"
}
{% if source.event_timestamp_column %}
timeWindowParameters: {
timestampColumn: "{{source.event_timestamp_column}}"
timestampColumnFormat: "{{source.timestamp_format}}"
}
{% endif %}
}
""")
msg = tm.render(source=self)
return msg

def __str__(self):
return str(self.preprocessing) + '\n' + self.to_feature_config()

def to_argument(self):
return self.path
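
For illustration of what `to_feature_config` produces: a `SnowflakeSource` named `snowflakeSampleSource` created with `database="MY_DATABASE"`, `schema="MY_SCHEMA"`, and `dbtable="MY_TABLE"` (hypothetical values) would render a HOCON fragment roughly like:

```
snowflakeSampleSource: {
    type: SNOWFLAKE
    location: {
        type: "snowflake"
        dbtable: "MY_TABLE"
        database: "MY_DATABASE"
        schema: "MY_SCHEMA"
    }
}
```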

class JdbcSource(Source):
def __init__(self, name: str, url: str = "", dbtable: Optional[str] = None, query: Optional[str] = None, auth: Optional[str] = None, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None) -> None:
super().__init__(name, event_timestamp_column, timestamp_format, registry_tags)
23 changes: 22 additions & 1 deletion feathr_project/feathr/registry/_feathr_registry_client.py
@@ -21,7 +21,7 @@
from feathr.definition.feature import Feature, FeatureBase
from feathr.definition.feature_derivations import DerivedFeature
from feathr.definition.repo_definitions import RepoDefinitions
from feathr.definition.source import GenericSource, HdfsSource, InputContext, JdbcSource, Source
from feathr.definition.source import GenericSource, HdfsSource, InputContext, JdbcSource, SnowflakeSource, Source
from feathr.definition.transformation import ExpressionTransformation, Transformation, WindowAggTransformation
from feathr.definition.typed_key import TypedKey
from feathr.registry.feature_registry import FeathrRegistry
@@ -397,6 +397,12 @@ def source_to_def(v: Source) -> dict:
"type": urlparse(v.path).scheme,
"path": v.path,
}
elif isinstance(v, SnowflakeSource):
ret = {
"name": v.name,
"type": "SNOWFLAKE",
"path": v.path,
}
elif isinstance(v, JdbcSource):
ret = {
"name": v.name,
@@ -446,6 +452,21 @@ def dict_to_source(v: dict) -> Source:
timestamp_format=v["attributes"].get(
"timestampFormat"),
registry_tags=v["attributes"].get("tags", {}))
elif type == "SNOWFLAKE":
snowflake_path = v["attributes"]["path"]
snowflake_parameters = SnowflakeSource.parse_snowflake_path(snowflake_path)
source = SnowflakeSource(name=v["attributes"]["name"],
dbtable=snowflake_parameters.get("dbtable", None),
query=snowflake_parameters.get("query", None),
database=snowflake_parameters["sfDatabase"],
schema=snowflake_parameters["sfSchema"],
preprocessing=_correct_function_indentation(
v["attributes"].get("preprocessing")),
event_timestamp_column=v["attributes"].get(
"eventTimestampColumn"),
timestamp_format=v["attributes"].get(
"timestampFormat"),
registry_tags=v["attributes"].get("tags", {}))
elif type == "generic":
options = v["attributes"].copy()
# These are not options