Skip to content

Commit

Permalink
Add a new hook for google datastore
Browse files Browse the repository at this point in the history
Also included, adding a new connection type for google datastore,
and slightly correcting the gc base hook to reflect that SignedJwtAssertionCredentials initializer accepts a string or an iterable of strings for scope(s).
  • Loading branch information
mtagle committed Feb 24, 2016
1 parent 6729a9b commit 2fe8f18
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 1 deletion.
104 changes: 104 additions & 0 deletions airflow/contrib/hooks/datastore_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from apiclient.discovery import build
from airflow.contrib.hooks.gc_base_hook import GoogleCloudBaseHook

class DatastoreHook(GoogleCloudBaseHook):
"""
Interact with Google Cloud Datastore. Connections must be defined with an
extras JSON field containing:
{
"project": "<google project ID>",
"service_account": "<google service account email>",
"key_path": "<p12 key path>"
}
If you have used ``gcloud auth`` to authenticate on the machine that's
running Airflow, you can exclude the service_account and key_path
parameters.
"""

conn_name_attr = 'datastore_conn_id'

def __init__(self,
scope=['https://www.googleapis.com/auth/datastore',
'https://www.googleapis.com/auth/userinfo.email'],
datastore_conn_id='google_cloud_datastore_default',
delegate_to=None):
super(DatastoreHook, self).__init__(scope, datastore_conn_id, delegate_to)
# datasetId is the same as the project name
self.datasetId = self._extras_dejson().get('project')

def get_conn(self):
"""
Returns a Google Cloud Storage service object.
"""
http_authorized = self._authorize()
return build('datastore', 'v1beta2', http=http_authorized)

def allocate_ids(self, partialKeys):
"""
Allocate IDs for incomplete keys.
see https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/allocateIds
:param partialKeys: a list of partial keys
:return: a list of full keys.
"""
resp = self.get_conn().datasets().allocateIds(datasetId=self.datasetId, body={'keys': partialKeys}).execute()
return resp['keys']

def begin_transaction(self):
"""
Get a new transaction handle
see https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/beginTransaction
:return: a transaction handle
"""
resp = self.get_conn().datasets().beginTransaction(datasetId=self.datasetId, body={}).execute()
return resp['transaction']

def commit(self, body):
"""
Commit a transaction, optionally creating, deleting or modifying some entities.
see https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/commit
:param body: the body of the commit request
:return: the response body of the commit request
"""
resp = self.get_conn().datasets().commit(datasetId=self.datasetId, body=body).execute()
return resp

def lookup(self, keys, read_consistency=None, transaction=None):
"""
Lookup some entities by key
see https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/lookup
:param keys: the keys to lookup
:param read_consistency: the read consistency to use. default, strong or eventual.
Cannot be used with a transaction.
:param transaction: the transaction to use, if any.
:return: the response body of the lookup request.
"""
body = {'keys': keys}
if read_consistency:
body['readConsistency'] = read_consistency
if transaction:
body['transaction'] = transaction
return self.get_conn().datasets().lookup(datasetId=self.datasetId, body=body).execute()

def rollback(self, transaction):
"""
Roll back a transaction
see https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/rollback
:param transaction: the transaction to roll back
"""
self.get_conn().datasets().rollback(datasetId=self.datasetId, body={'transaction': transaction})\
.execute()

def run_query(self, body):
"""
Run a query for entities.
see https://cloud.google.com/datastore/docs/apis/v1beta2/datasets/runQuery
:param body: the body of the query request
:return: the batch of query results.
"""
resp = self.get_conn().datasets().runQuery(datasetId=self.datasetId, body=body).execute()
return resp['batch']
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/gc_base_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class GoogleCloudBaseHook(BaseHook):
def __init__(self, scope, conn_id, delegate_to=None):
"""
:param scope: The scope of the hook.
:type scope: string
:type scope: string or an iterable of strings.
:param conn_id: The connection ID to use when fetching connection info.
:type conn_id: string
:param delegate_to: The account to impersonate, if any.
Expand Down
1 change: 1 addition & 0 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2089,6 +2089,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
form_choices = {
'conn_type': [
('bigquery', 'BigQuery',),
('datastore', 'Google Datastore'),
('ftp', 'FTP',),
('google_cloud_storage', 'Google Cloud Storage'),
('hdfs', 'HDFS',),
Expand Down

0 comments on commit 2fe8f18

Please sign in to comment.