Skip to content

Commit

Permalink
AIRFLOW-21 upgrade GCP client lib
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvanboxel committed May 5, 2016
1 parent aeb5a07 commit b7f0245
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 61 deletions.
16 changes: 3 additions & 13 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import logging
import time

from airflow.contrib.hooks.gc_base_hook import GoogleCloudBaseHook
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.hooks.dbapi_hook import DbApiHook
from apiclient.discovery import build
from pandas.io.gbq import GbqConnector, \
Expand All @@ -35,18 +35,8 @@

class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
"""
Interact with BigQuery. Connections must be defined with an extras JSON
field containing:
{
"project": "<google project ID>",
"service_account": "<google service account email>",
"key_path": "<p12 key path>"
}
If you have used ``gcloud auth`` to authenticate on the machine that's
running Airflow, you can exclude the service_account and key_path
parameters.
Interact with BigQuery. This hook uses the Google Cloud Platform
connection.
"""
conn_name_attr = 'bigquery_conn_id'

Expand Down
16 changes: 3 additions & 13 deletions airflow/contrib/hooks/datastore_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,12 @@
#

from apiclient.discovery import build
from airflow.contrib.hooks.gc_base_hook import GoogleCloudBaseHook
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook

class DatastoreHook(GoogleCloudBaseHook):
"""
Interact with Google Cloud Datastore. Connections must be defined with an
extras JSON field containing:
{
"project": "<google project ID>",
"service_account": "<google service account email>",
"key_path": "<p12 key path>"
}
If you have used ``gcloud auth`` to authenticate on the machine that's
running Airflow, you can exclude the service_account and key_path
parameters.
Interact with Google Cloud Datastore. This hook uses the Google Cloud Platform
connection.
This object is not thread safe. If you want to make multiple requests
simultaneously, you will need to create a hook per thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
# limitations under the License.
#

import httplib2
import logging

from airflow.hooks.base_hook import BaseHook
import httplib2
from oauth2client.client import GoogleCredentials
from oauth2client.service_account import ServiceAccountCredentials

from airflow.exceptions import AirflowException
from oauth2client.client import SignedJwtAssertionCredentials, GoogleCredentials
from airflow.hooks.base_hook import BaseHook


class GoogleCloudBaseHook(BaseHook):
"""
Expand All @@ -29,14 +32,25 @@ class GoogleCloudBaseHook(BaseHook):
for a Google cloud service.
The class also contains some miscellaneous helper functions.
All hook derived from this base hook use the 'Google Cloud Platform' connection
type. Two ways of authentication are supported:
Default credentials: Only specify 'Project Id'. Then you need to have executed
``gcloud auth`` on the Airflow worker machine.
JSON key file: Specify 'Project Id', 'Key Path' and 'Scope'.
Legacy P12 key files are not supported.
"""

def __init__(self, conn_id, delegate_to=None):
"""
:param conn_id: The connection ID to use when fetching connection info.
:type conn_id: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have domain-wide delegation enabled.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: string
"""
self.conn_id = conn_id
Expand All @@ -48,27 +62,30 @@ def _authorize(self):
Returns an authorized HTTP object to be used to build a Google cloud
service hook connection.
"""
service_account = self._get_field('service_account', False)
key_path = self._get_field('key_path', False)
scope = self._get_field('scope', False)

kwargs = {}
if self.delegate_to:
kwargs['sub'] = self.delegate_to

if not key_path or not service_account:
logging.info('Getting connection using `gcloud auth` user, since no service_account/key_path are defined for hook.')
if not key_path:
logging.info('Getting connection using `gcloud auth` user, since no key file '
'is defined for hook.')
credentials = GoogleCredentials.get_application_default()
elif self.scope:
with open(key_path, 'rb') as key_file:
key = key_file.read()
credentials = SignedJwtAssertionCredentials(
service_account,
key,
scope=self.scope,
**kwargs)
else:
raise AirflowException('Scope undefined, or either key_path/service_account config was missing.')
if not scope:
raise AirflowException('Scope should be defined when using a key file.')
scopes = [s.strip() for s in scope.split(',')]
if key_path.endswith('.json'):
logging.info('Getting connection using a JSON key file.')
credentials = ServiceAccountCredentials\
.from_json_keyfile_name(key_path, scopes)
elif key_path.endswith('.p12'):
raise AirflowException('Legacy P12 key file are not supported, '
'use a JSON key file.')
else:
raise AirflowException('Unrecognised extension for key file.')

http = httplib2.Http()
return credentials.authorize(http)
Expand All @@ -85,3 +102,7 @@ def _get_field(self, f, default=None):
return self.extras[long_f]
else:
return default

@property
def project_id(self):
return self._get_field('project')
17 changes: 3 additions & 14 deletions airflow/contrib/hooks/gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging

from airflow.contrib.hooks.gc_base_hook import GoogleCloudBaseHook
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from apiclient.discovery import build
from apiclient.http import MediaFileUpload

Expand All @@ -24,19 +24,8 @@

class GoogleCloudStorageHook(GoogleCloudBaseHook):
"""
Interact with Google Cloud Storage. Connections must be defined with an
extras JSON field containing:
::
{
"project": "<google project ID>",
"service_account": "<google service account email>",
"key_path": "<p12 key path>"
}
If you have used ``gcloud auth`` to authenticate on the machine that's
running Airflow, you can exclude the service_account and key_path
parameters.
Interact with Google Cloud Storage. This hook uses the Google Cloud Platform
connection.
"""

def __init__(self,
Expand Down
4 changes: 1 addition & 3 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2193,7 +2193,6 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
'extra__jdbc__drv_clsname',
'extra__google_cloud_platform__project',
'extra__google_cloud_platform__key_path',
'extra__google_cloud_platform__service_account',
'extra__google_cloud_platform__scope',
)
verbose_name = "Connection"
Expand All @@ -2213,9 +2212,8 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
form_extra_fields = {
'extra__jdbc__drv_path' : StringField('Driver Path'),
'extra__jdbc__drv_clsname': StringField('Driver Class'),
'extra__google_cloud_platform__project': StringField('Project'),
'extra__google_cloud_platform__project': StringField('Project Id'),
'extra__google_cloud_platform__key_path': StringField('Keyfile Path'),
'extra__google_cloud_platform__service_account': StringField('Service Account'),
'extra__google_cloud_platform__scope': StringField('Scopes (comma seperated)'),

}
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def run(self):
druid = ['pydruid>=0.2.1']
gcp_api = [
'httplib2',
'google-api-python-client<=1.4.2',
'oauth2client>=1.5.2, <2.0.0',
'google-api-python-client>=1.5.0, <1.6.0',
'oauth2client>=2.0.2, <2.1.0',
'PyOpenSSL',
]
hdfs = ['snakebite>=2.7.8']
Expand Down

0 comments on commit b7f0245

Please sign in to comment.