Skip to content

Commit

Permalink
[AIRFLOW-588] Add Google Cloud Storage Object sensor[]
Browse files Browse the repository at this point in the history
The Cloud Storage sensor will check for the
existence if an object in
 a bucket. It will wait till the object exists
before continuing.

Closes apache#1849 from alexvanboxel/feature/airflow-588
-gcs-sensor
  • Loading branch information
alexvanboxel authored and criccomini committed Oct 25, 2016
1 parent c49d0b3 commit 61370fb
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 2 deletions.
29 changes: 27 additions & 2 deletions airflow/contrib/hooks/gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

import logging

from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from apiclient.discovery import build
from apiclient.http import MediaFileUpload
from googleapiclient import errors

from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook

logging.getLogger("google_cloud_storage").setLevel(logging.INFO)

Expand All @@ -31,7 +33,8 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
def __init__(self,
google_cloud_storage_conn_id='google_cloud_storage_default',
delegate_to=None):
super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id, delegate_to)
super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id,
delegate_to)

def get_conn(self):
"""
Expand Down Expand Up @@ -84,3 +87,25 @@ def upload(self, bucket, object, filename, mime_type='application/octet-stream')
.objects() \
.insert(bucket=bucket, name=object, media_body=media) \
.execute()

def exists(self, bucket, object):
"""
Checks for the existence of a file in Google Cloud Storage.
:param bucket: The Google cloud storage bucket where the object is.
:type bucket: string
:param object: The name of the object to check in the Google cloud
storage bucket.
:type object: string
"""
service = self.get_conn()
try:
service \
.objects() \
.get(bucket=bucket, object=object) \
.execute()
return True
except errors.HttpError as ex:
if ex.resp['status'] == '404':
return False
raise
64 changes: 64 additions & 0 deletions airflow/contrib/sensors/gcs_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class GoogleCloudStorageObjectSensor(BaseSensorOperator):
"""
Checks for the existence of a file in Google Cloud Storage.
"""
template_fields = ('bucket', 'object')
ui_color = '#f0eee4'

@apply_defaults
def __init__(
self,
bucket,
object,
google_cloud_conn_id='google_cloud_storage_default',
delegate_to=None,
*args,
**kwargs):
"""
Create a new GoogleCloudStorageDownloadOperator.
:param bucket: The Google cloud storage bucket where the object is.
:type bucket: string
:param object: The name of the object to check in the Google cloud
storage bucket.
:type object: string
:param google_cloud_storage_conn_id: The connection ID to use when
connecting to Google cloud storage.
:type google_cloud_storage_conn_id: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have domain-wide delegation enabled.
:type delegate_to: string
"""
super(GoogleCloudStorageObjectSensor, self).__init__(*args, **kwargs)
self.bucket = bucket
self.object = object
self.google_cloud_conn_id = google_cloud_conn_id
self.delegate_to = delegate_to

def poke(self, context):
logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
return hook.exists(self.bucket, self.object)

0 comments on commit 61370fb

Please sign in to comment.