Skip to content

Commit

Permalink
Python: Add configuration (apache#5488)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko authored Aug 19, 2022
1 parent a060bec commit 460124e
Show file tree
Hide file tree
Showing 13 changed files with 536 additions and 230 deletions.
101 changes: 98 additions & 3 deletions python/pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,92 @@

from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Callable

from pyiceberg.exceptions import NotInstalledError
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.typedef import (
EMPTY_DICT,
Identifier,
Properties,
RecursiveDict,
)
from pyiceberg.utils.config import Config, merge_config

logger = logging.getLogger(__name__)

_ENV_CONFIG = Config()

TYPE = "type"


class CatalogType(Enum):
REST = "rest"
HIVE = "hive"


def load_rest(name: str, conf: Properties) -> Catalog:
from pyiceberg.catalog.rest import RestCatalog

return RestCatalog(name, **conf)


def load_hive(name: str, conf: Properties) -> Catalog:
try:
from pyiceberg.catalog.hive import HiveCatalog

return HiveCatalog(name, **conf)
except ImportError as exc:
raise NotInstalledError("Apache Hive support not installed: pip install 'pyiceberg[hive]'") from exc


AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
CatalogType.REST: load_rest,
CatalogType.HIVE: load_hive,
}


def infer_catalog_type(catalog_properties: RecursiveDict) -> CatalogType | None:
"""Tries to infer the type based on the dict
Args:
catalog_properties: Catalog properties
Returns:
The inferred type based on the provided properties
"""
if uri := catalog_properties.get("uri"):
if isinstance(uri, str):
if uri.startswith("http"):
return CatalogType.REST
elif uri.startswith("thrift"):
return CatalogType.HIVE
return None


def load_catalog(name: str, **properties: str | None) -> Catalog:
env = _ENV_CONFIG.get_catalog_config(name)
conf = merge_config(env or {}, properties)

if provided_catalog_type := conf.get(TYPE):
catalog_type = CatalogType[provided_catalog_type.upper()]
else:
if inferred_catalog_type := infer_catalog_type(conf):
catalog_type = inferred_catalog_type
else:
raise ValueError(f"Invalid configuration. Could not determine the catalog type: {properties}")

if catalog_type:
return AVAILABLE_CATALOGS[catalog_type](name, conf)

raise ValueError(f"Could not initialize catalog with the following properties: {properties}")


@dataclass
Expand All @@ -48,13 +126,30 @@ class Catalog(ABC):
properties (Properties): Catalog properties
"""

name: str | None
name: str
properties: Properties

def __init__(self, name: str | None, **properties: str):
def __init__(self, name: str, **properties: str):
self.name = name
self.properties = properties

def property(self, key: str) -> str:
"""Returns a property from the properties variable. If it doesn't exist, it will raise an error.
Args:
key: The key of the property
Returns: The value of the property
Raises:
ValueError: When the property cannot be found, with a pointer on how to set the property.
"""
if key not in self.properties:
raise ValueError(
f"{type(self).__name__} expects an {key} property. Please set in config or using environment variable PYICEBERG_CATALOG__{self.name.upper()}__{key.upper()}"
)
return self.properties[key]

@abstractmethod
def create_table(
self,
Expand Down
4 changes: 2 additions & 2 deletions python/pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ def identifier_to_database_and_table(

return tuple_identifier[0], tuple_identifier[1]

def __init__(self, name: str, uri: str, **properties: str):
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
self._client = _HiveClient(uri)
self._client = _HiveClient(self.property("uri"))

def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
properties: Dict[str, str] = table.parameters
Expand Down
29 changes: 9 additions & 20 deletions python/pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,11 @@ class OAuthErrorResponse(IcebergBaseModel):

class RestCatalog(Catalog):
token: Optional[str]
config: Properties

uri: str

def __init__(
self,
name: Optional[str],
uri: str,
credential: Optional[str] = None,
token: Optional[str] = None,
name: str,
**properties: str,
):
"""Rest Catalog
Expand All @@ -182,21 +177,15 @@ def __init__(
Args:
name: Name to identify the catalog
uri: The base-url of the REST Catalog endpoint
credential: The credentials for authentication against the client
token: The bearer token
properties: Properties that are passed along to the configuration
"""
self.uri = uri
if credential:
self.token = self._fetch_access_token(credential)
elif token:
self.token = token
else:
self.token = None
self.config = self._fetch_config(properties)
super().__init__(name, **properties)

self.uri = self.property("uri")
if credential := self.properties.get("credential"):
properties["token"] = self._fetch_access_token(credential)
super().__init__(name, **self._fetch_config(properties))

def _check_valid_namespace_identifier(self, identifier: Union[str, Identifier]) -> Identifier:
"""The identifier should have at least one element"""
identifier_tuple = Catalog.identifier_to_tuple(identifier)
Expand All @@ -210,8 +199,8 @@ def headers(self) -> Properties:
"Content-type": "application/json",
"X-Client-Version": __version__,
}
if self.token:
headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {self.token}"
if token := self.properties.get("token"):
headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {token}"
return headers

def url(self, endpoint: str, prefixed: bool = True, **kwargs) -> str:
Expand All @@ -229,7 +218,7 @@ def url(self, endpoint: str, prefixed: bool = True, **kwargs) -> str:
url = url + "v1/" if url.endswith("/") else url + "/v1/"

if prefixed:
url += self.config.get(PREFIX, "")
url += self.properties.get(PREFIX, "")
url = url if url.endswith("/") else url + "/"

return url + endpoint.format(**kwargs)
Expand Down
40 changes: 17 additions & 23 deletions python/pyiceberg/cli/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=broad-except,redefined-builtin,redefined-outer-name
import os
from functools import wraps
from typing import (
Dict,
Expand All @@ -28,7 +27,7 @@
import click
from click import Context

from pyiceberg.catalog import Catalog
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
Expand All @@ -55,41 +54,36 @@ def wrapper(*args, **kwargs):


@click.group()
@click.option("--catalog", default=None)
@click.option("--catalog", default="default")
@click.option("--output", type=click.Choice(["text", "json"]), default="text")
@click.option("--uri")
@click.option("--credential")
@click.pass_context
def run(ctx: Context, catalog: Optional[str], output: str, uri: Optional[str], credential: Optional[str]):
uri_env_var = "PYICEBERG_URI"
credential_env_var = "PYICEBERG_CREDENTIAL"

if not uri:
uri = os.environ.get(uri_env_var)
if not credential:
credential = os.environ.get(credential_env_var)
def run(ctx: Context, catalog: str, output: str, uri: Optional[str], credential: Optional[str]):
properties = {}
if uri:
properties["uri"] = uri
if credential:
properties["credential"] = credential

ctx.ensure_object(dict)
if output == "text":
ctx.obj["output"] = ConsoleOutput()
else:
ctx.obj["output"] = JsonOutput()

if not uri:
ctx.obj["output"].exception(
ValueError(f"Missing uri. Please provide using --uri or using environment variable {uri_env_var}")
)
try:
try:
ctx.obj["catalog"] = load_catalog(catalog, **properties)
except ValueError as exc:
raise ValueError(
f"URI missing, please provide using --uri, the config or environment variable PYICEBERG_CATALOG__{catalog.upper()}__URI"
) from exc
except Exception as e:
ctx.obj["output"].exception(e)
ctx.exit(1)

assert uri # for mypy

for scheme, catalog_type in SUPPORTED_CATALOGS.items():
if uri.startswith(scheme):
ctx.obj["catalog"] = catalog_type(catalog, uri=uri, credential=credential) # type: ignore
break

if not isinstance(ctx.obj["catalog"], Catalog):

ctx.obj["output"].exception(
ValueError("Could not determine catalog type from uri. REST (http/https) and Hive (thrift) is supported")
)
Expand Down
4 changes: 4 additions & 0 deletions python/pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@ class OAuthError(RESTError):

class NoSuchPropertyException(Exception):
"""When a property is missing"""


class NotInstalledError(Exception):
"""When an optional dependency is not installed"""
8 changes: 7 additions & 1 deletion python/pyiceberg/typedef.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
# specific language governing permissions and limitations
# under the License.

from typing import Any, Dict, Tuple
from typing import (
Any,
Dict,
Tuple,
Union,
)


class FrozenDict(Dict):
Expand All @@ -30,3 +35,4 @@ def update(self, *args: Any, **kwargs: Any) -> None:

Identifier = Tuple[str, ...]
Properties = Dict[str, str]
RecursiveDict = Dict[str, Union[str, "RecursiveDict"]] # type: ignore
Loading

0 comments on commit 460124e

Please sign in to comment.