Skip to content

Commit

Permalink
Add Google leveldb hook and operator (apache#13109) (apache#14105)
Browse files Browse the repository at this point in the history
* Add Google leveldb hook (apache#13109)

* Add write_batch, options for DB creation(comparator and other) plus fixes (apache#13109)

* Fix some static checks, add docs (apache#13109)

* Apply suggestions from code review

Co-authored-by: RosterIn <[email protected]>

* Fix some otger static checks and docs (apache#13109)

* Fix tests and some build-docs checks (apache#13109)

* Apply suggestions from code review

Co-authored-by: RosterIn <[email protected]>

* Fix build-docs checks (apache#13109)

* Fix build-docs checks (apache#13109)

* fixup! Fix build-docs checks (apache#13109)

* Rewrite example dag as in google package (apache#13109)

* Add extra options in operator, fix docstrings (apache#13109)

* Update airflow/providers/google/leveldb/operators/leveldb.py

Co-authored-by: Ephraim Anierobi <[email protected]>

* Add system testing and docstrings (apache#13109)

* Fix comparator place in spelling wordlist(apache#13109)

Co-authored-by: RosterIn <[email protected]>
Co-authored-by: Kamil Bregula <[email protected]>
Co-authored-by: Ephraim Anierobi <[email protected]>
  • Loading branch information
4 people authored Mar 1, 2021
1 parent e273366 commit 35c9a90
Show file tree
Hide file tree
Showing 22 changed files with 723 additions and 0 deletions.
16 changes: 16 additions & 0 deletions airflow/providers/google/leveldb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
17 changes: 17 additions & 0 deletions airflow/providers/google/leveldb/example_dags/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
52 changes: 52 additions & 0 deletions airflow/providers/google/leveldb/example_dags/example_leveldb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example use of LevelDB operators.
"""

from airflow import models
from airflow.providers.google.leveldb.operators.leveldb import LevelDBOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow',
}

with models.DAG(
'example_leveldb',
default_args=default_args,
start_date=days_ago(2),
schedule_interval=None,
tags=['example'],
) as dag:
# [START howto_operator_leveldb_get_key]
get_key_leveldb_task = LevelDBOperator(
task_id='get_key_leveldb', leveldb_conn_id='leveldb_default', command='get', key=b'key', dag=dag
)
# [END howto_operator_leveldb_get_key]
# [START howto_operator_leveldb_put_key]
put_key_leveldb_task = LevelDBOperator(
task_id='put_key_leveldb',
leveldb_conn_id='leveldb_default',
command='put',
key=b'another_key',
value=b'another_value',
dag=dag,
)
# [END howto_operator_leveldb_put_key]
get_key_leveldb_task >> put_key_leveldb_task
16 changes: 16 additions & 0 deletions airflow/providers/google/leveldb/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
151 changes: 151 additions & 0 deletions airflow/providers/google/leveldb/hooks/leveldb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Level DB"""
from typing import List, Optional

import plyvel
from plyvel import DB

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook


class LevelDBHookException(AirflowException):
"""Exception specific for LevelDB"""


class LevelDBHook(BaseHook):
"""
Plyvel Wrapper to Interact With LevelDB Database
`LevelDB Connection Documentation <https://plyvel.readthedocs.io/en/latest/>`__
"""

conn_name_attr = 'leveldb_conn_id'
default_conn_name = 'leveldb_default'
conn_type = 'leveldb'
hook_name = 'LevelDB'

def __init__(self, leveldb_conn_id: str = default_conn_name):
super().__init__()
self.leveldb_conn_id = leveldb_conn_id
self.connection = self.get_connection(leveldb_conn_id)
self.db = None

def get_conn(self, name: str = '/tmp/testdb/', create_if_missing: bool = False, **kwargs) -> DB:
"""
Creates `Plyvel DB <https://plyvel.readthedocs.io/en/latest/api.html#DB>`__
:param name: path to create database e.g. `/tmp/testdb/`)
:type name: str
:param create_if_missing: whether a new database should be created if needed
:type create_if_missing: bool
:param kwargs: other options of creation plyvel.DB. See more in the link above.
:type kwargs: Dict[str, Any]
:returns: DB
:rtype: plyvel.DB
"""
if self.db is not None:
return self.db
self.db = plyvel.DB(name=name, create_if_missing=create_if_missing, **kwargs)
return self.db

def close_conn(self) -> None:
"""Closes connection"""
db = self.db
if db is not None:
db.close()
self.db = None

def run(
self,
command: str,
key: bytes,
value: bytes = None,
keys: List[bytes] = None,
values: List[bytes] = None,
) -> Optional[bytes]:
"""
Execute operation with leveldb
:param command: command of plyvel(python wrap for leveldb) for DB object e.g.
``"put"``, ``"get"``, ``"delete"``, ``"write_batch"``.
:type command: str
:param key: key for command(put,get,delete) execution(, e.g. ``b'key'``, ``b'another-key'``)
:type key: bytes
:param value: value for command(put) execution(bytes, e.g. ``b'value'``, ``b'another-value'``)
:type value: bytes
:param keys: keys for command(write_batch) execution(List[bytes], e.g. ``[b'key', b'another-key'])``
:type keys: List[bytes]
:param values: values for command(write_batch) execution e.g. ``[b'value'``, ``b'another-value']``
:type values: List[bytes]
:returns: value from get or None
:rtype: Optional[bytes]
"""
if command == 'put':
return self.put(key, value)
elif command == 'get':
return self.get(key)
elif command == 'delete':
return self.delete(key)
elif command == 'write_batch':
return self.write_batch(keys, values)
else:
raise LevelDBHookException("Unknown command for LevelDB hook")

def put(self, key: bytes, value: bytes):
"""
Put a single value into a leveldb db by key
:param key: key for put execution, e.g. ``b'key'``, ``b'another-key'``
:type key: bytes
:param value: value for put execution e.g. ``b'value'``, ``b'another-value'``
:type value: bytes
"""
self.db.put(key, value)

def get(self, key: bytes) -> bytes:
"""
Get a single value into a leveldb db by key
:param key: key for get execution, e.g. ``b'key'``, ``b'another-key'``
:type key: bytes
:returns: value of key from db.get
:rtype: bytes
"""
return self.db.get(key)

def delete(self, key: bytes):
"""
Delete a single value in a leveldb db by key.
:param key: key for delete execution, e.g. ``b'key'``, ``b'another-key'``
:type key: bytes
"""
self.db.delete(key)

def write_batch(self, keys: List[bytes], values: List[bytes]):
"""
Write batch of values in a leveldb db by keys
:param keys: keys for write_batch execution e.g. ``[b'key', b'another-key']``
:type keys: List[bytes]
:param values: values for write_batch execution e.g. ``[b'value', b'another-value']``
:type values: List[bytes]
"""
with self.db.write_batch() as batch:
for i, key in enumerate(keys):
batch.put(key, values[i])
16 changes: 16 additions & 0 deletions airflow/providers/google/leveldb/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
100 changes: 100 additions & 0 deletions airflow/providers/google/leveldb/operators/leveldb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, List, Optional

from airflow.models import BaseOperator
from airflow.providers.google.leveldb.hooks.leveldb import LevelDBHook
from airflow.utils.decorators import apply_defaults


class LevelDBOperator(BaseOperator):
"""
Execute command in LevelDB
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:LevelDBOperator`
:param command: command of plyvel(python wrap for leveldb) for DB object e.g.
``"put"``, ``"get"``, ``"delete"``, ``"write_batch"``.
:type command: str
:param key: key for command(put,get,delete) execution(, e.g. ``b'key'``, ``b'another-key'``)
:type key: bytes
:param value: value for command(put) execution(bytes, e.g. ``b'value'``, ``b'another-value'``)
:type value: bytes
:param keys: keys for command(write_batch) execution(List[bytes], e.g. ``[b'key', b'another-key'])``
:type keys: List[bytes]
:param values: values for command(write_batch) execution e.g. ``[b'value'``, ``b'another-value']``
:type values: List[bytes]
:param leveldb_conn_id:
:type leveldb_conn_id: str
:param create_if_missing: whether a new database should be created if needed
:type create_if_missing: bool
:param create_db_extra_options: extra options of creation LevelDBOperator. See more in the link below
`Plyvel DB <https://plyvel.readthedocs.io/en/latest/api.html#DB>`__
:type create_db_extra_options: Optional[Dict[str, Any]]
"""

@apply_defaults
def __init__(
self,
*,
command: str,
key: bytes,
value: bytes = None,
keys: List[bytes] = None,
values: List[bytes] = None,
leveldb_conn_id: str = 'leveldb_default',
name: str = '/tmp/testdb/',
create_if_missing: bool = True,
create_db_extra_options: Optional[Dict[str, Any]] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.command = command
self.key = key
self.value = value
self.keys = keys
self.values = values
self.leveldb_conn_id = leveldb_conn_id
self.name = name
self.create_if_missing = create_if_missing
self.create_db_extra_options = create_db_extra_options or {}

def execute(self, context) -> Optional[str]:
"""
Execute command in LevelDB
:returns: value from get(str, not bytes, to prevent error in json.dumps in serialize_value in xcom.py)
or None(Optional[str])
:rtype: Optional[str]
"""
leveldb_hook = LevelDBHook(leveldb_conn_id=self.leveldb_conn_id)
leveldb_hook.get_conn(
name=self.name, create_if_missing=self.create_if_missing, **self.create_db_extra_options
)
value = leveldb_hook.run(
command=self.command,
key=self.key,
value=self.value,
keys=self.keys,
values=self.values,
)
self.log.info("Done. Returned value was: %s", str(value))
leveldb_hook.close_conn()
value = value if value is None else value.decode()
return value
Loading

0 comments on commit 35c9a90

Please sign in to comment.