[AIRFLOW-XXX] Fix Minor issues with Azure Cosmos Operator (apache#4289)
- Fixed Documentation in integration.rst
- Fixed Incorrect type in docstring of `AzureCosmosInsertDocumentOperator`
- Added the Hook, Sensor and Operator in code.rst
- Updated the name of example DAG and its filename to follow the convention
kaxil authored and Tao Feng committed Dec 8, 2018
1 parent 005e582 commit 4371969
Showing 5 changed files with 228 additions and 224 deletions.
@@ -1,64 +1,64 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This is only an example DAG to highlight usage of AzureCosmosDocumentSensor to detect
if a document now exists.
You can trigger this manually with `airflow trigger_dag example_cosmosdb_sensor`.
*Note: Make sure that connection `azure_cosmos_default` is properly set before running
this example.*
"""

from airflow import DAG
from airflow.contrib.sensors.azure_cosmos_sensor import AzureCosmosDocumentSensor
from airflow.contrib.operators.azure_cosmos_insertdocument_operator import AzureCosmosInsertDocumentOperator
from airflow.utils import dates

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dates.days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False
}

dag = DAG('example_cosmosdb_sensor', default_args=default_args)

dag.doc_md = __doc__

t1 = AzureCosmosDocumentSensor(
    task_id='check_cosmos_file',
    database_name='airflow_example_db',
    collection_name='airflow_example_coll',
    document_id='airflow_checkid',
    azure_cosmos_conn_id='azure_cosmos_default',
    dag=dag)

t2 = AzureCosmosInsertDocumentOperator(
    task_id='insert_cosmos_file',
    dag=dag,
    database_name='airflow_example_db',
    collection_name='new-collection',
    document={"id": "someuniqueid", "param1": "value1", "param2": "value2"},
    azure_cosmos_conn_id='azure_cosmos_default')

t2.set_upstream(t1)
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This is only an example DAG to highlight usage of AzureCosmosDocumentSensor to detect
if a document now exists.
You can trigger this manually with `airflow trigger_dag example_cosmosdb_sensor`.
*Note: Make sure that connection `azure_cosmos_default` is properly set before running
this example.*
"""

from airflow import DAG
from airflow.contrib.sensors.azure_cosmos_sensor import AzureCosmosDocumentSensor
from airflow.contrib.operators.azure_cosmos_operator import AzureCosmosInsertDocumentOperator
from airflow.utils import dates

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dates.days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False
}

dag = DAG('example_azure_cosmosdb_sensor', default_args=default_args)

dag.doc_md = __doc__

t1 = AzureCosmosDocumentSensor(
    task_id='check_cosmos_file',
    database_name='airflow_example_db',
    collection_name='airflow_example_coll',
    document_id='airflow_checkid',
    azure_cosmos_conn_id='azure_cosmos_default',
    dag=dag)

t2 = AzureCosmosInsertDocumentOperator(
    task_id='insert_cosmos_file',
    dag=dag,
    database_name='airflow_example_db',
    collection_name='new-collection',
    document={"id": "someuniqueid", "param1": "value1", "param2": "value2"},
    azure_cosmos_conn_id='azure_cosmos_default')

t1 >> t2
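
The last line of the example DAG changes from `t2.set_upstream(t1)` to `t1 >> t2`. Both forms declare the same dependency: the sensor runs first, then the insert. The sketch below illustrates that equivalence with a small stand-in `Task` class. It is a hypothetical model written for this note, not Airflow's `BaseOperator`, and it tracks only task ids.

```python
class Task:
    """Hypothetical stand-in for an Airflow task; records dependency ids only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_upstream(self, other):
        # `other` must finish before `self` starts
        self.upstream.add(other.task_id)
        other.downstream.add(self.task_id)

    def set_downstream(self, other):
        # `self` must finish before `other` starts
        other.set_upstream(self)

    def __rshift__(self, other):
        # `a >> b` reads as "a, then b"; returning `other` allows chaining
        self.set_downstream(other)
        return other


# Old style, as in the removed line
a1, a2 = Task('check_cosmos_file'), Task('insert_cosmos_file')
a2.set_upstream(a1)

# New style, as in the added line
b1, b2 = Task('check_cosmos_file'), Task('insert_cosmos_file')
b1 >> b2
```

In this model both pairs end up with identical upstream and downstream sets, so the change is one of readability rather than behavior.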
@@ -1,69 +1,69 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class AzureCosmosInsertDocumentOperator(BaseOperator):
    """
    Inserts a new document into the specified Cosmos database and collection
    It will create both the database and collection if they do not already exist
    :param database_name: The name of the database. (templated)
    :type database_name: str
    :param collection_name: The name of the collection. (templated)
    :type collection_name: str
    :param document: The document to insert
    :type document: json
    :param azure_cosmos_conn_id: reference to a CosmosDB connection.
    :type azure_cosmos_conn_id: str
    """
    template_fields = ('database_name', 'collection_name')
    ui_color = '#e4f0e8'

    @apply_defaults
    def __init__(self,
                 database_name,
                 collection_name,
                 document,
                 azure_cosmos_conn_id='azure_cosmos_default',
                 *args,
                 **kwargs):
        super(AzureCosmosInsertDocumentOperator, self).__init__(*args, **kwargs)
        self.database_name = database_name
        self.collection_name = collection_name
        self.document = document
        self.azure_cosmos_conn_id = azure_cosmos_conn_id

    def execute(self, context):
        # Create the hook
        hook = AzureCosmosDBHook(azure_cosmos_conn_id=self.azure_cosmos_conn_id)

        # Create the DB if it doesn't already exist
        if (not hook.does_database_exist(self.database_name)):
            hook.create_database(self.database_name)

        # Create the collection as well
        if (not hook.does_collection_exist(self.collection_name, self.database_name)):
            hook.create_collection(self.collection_name, self.database_name)

        # finally insert the document
        hook.upsert_document(self.document, self.database_name, self.collection_name)
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class AzureCosmosInsertDocumentOperator(BaseOperator):
    """
    Inserts a new document into the specified Cosmos database and collection
    It will create both the database and collection if they do not already exist
    :param database_name: The name of the database. (templated)
    :type database_name: str
    :param collection_name: The name of the collection. (templated)
    :type collection_name: str
    :param document: The document to insert
    :type document: dict
    :param azure_cosmos_conn_id: reference to a CosmosDB connection.
    :type azure_cosmos_conn_id: str
    """
    template_fields = ('database_name', 'collection_name')
    ui_color = '#e4f0e8'

    @apply_defaults
    def __init__(self,
                 database_name,
                 collection_name,
                 document,
                 azure_cosmos_conn_id='azure_cosmos_default',
                 *args,
                 **kwargs):
        super(AzureCosmosInsertDocumentOperator, self).__init__(*args, **kwargs)
        self.database_name = database_name
        self.collection_name = collection_name
        self.document = document
        self.azure_cosmos_conn_id = azure_cosmos_conn_id

    def execute(self, context):
        # Create the hook
        hook = AzureCosmosDBHook(azure_cosmos_conn_id=self.azure_cosmos_conn_id)

        # Create the DB if it doesn't already exist
        if not hook.does_database_exist(self.database_name):
            hook.create_database(self.database_name)

        # Create the collection as well
        if not hook.does_collection_exist(self.collection_name, self.database_name):
            hook.create_collection(self.collection_name, self.database_name)

        # finally insert the document
        hook.upsert_document(self.document, self.database_name, self.collection_name)
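
The `execute` method above follows a create-if-missing pattern: ensure the database exists, ensure the collection exists, then upsert the document. The sketch below replays that call order against an in-memory fake so it can run without Airflow or an Azure account. `FakeCosmosHook` and `insert_document` are hypothetical names invented for this illustration. Only the method names and argument orders are taken from the calls visible in the diff, and the fake's storage layout is an assumption, not how `AzureCosmosDBHook` works internally.

```python
class FakeCosmosHook:
    """Hypothetical in-memory stand-in for AzureCosmosDBHook (illustration only)."""

    def __init__(self):
        # Maps database name -> collection name -> {document id: document}
        self.databases = {}

    def does_database_exist(self, database_name):
        return database_name in self.databases

    def create_database(self, database_name):
        self.databases[database_name] = {}

    def does_collection_exist(self, collection_name, database_name):
        return collection_name in self.databases.get(database_name, {})

    def create_collection(self, collection_name, database_name):
        self.databases[database_name][collection_name] = {}

    def upsert_document(self, document, database_name, collection_name):
        # Upsert: insert the document, or overwrite one with the same id
        self.databases[database_name][collection_name][document["id"]] = document


def insert_document(hook, database_name, collection_name, document):
    """Mirror the order of calls in execute(): database, collection, document."""
    if not hook.does_database_exist(database_name):
        hook.create_database(database_name)
    if not hook.does_collection_exist(collection_name, database_name):
        hook.create_collection(collection_name, database_name)
    hook.upsert_document(document, database_name, collection_name)


hook = FakeCosmosHook()
doc = {"id": "someuniqueid", "param1": "value1", "param2": "value2"}
insert_document(hook, "airflow_example_db", "new-collection", doc)
# A second run reuses the database and collection and overwrites the document
insert_document(hook, "airflow_example_db", "new-collection", doc)
```

Note the argument order in the calls shown in the diff: the collection helpers take `(collection_name, database_name)`, while `upsert_document` takes `(document, database_name, collection_name)`.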
4 changes: 4 additions & 0 deletions docs/code.rst
@@ -133,6 +133,7 @@ Operators
.. autoclass:: airflow.contrib.operators.adls_to_gcs.AdlsToGoogleCloudStorageOperator
.. autoclass:: airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator
.. autoclass:: airflow.contrib.operators.awsbatch_operator.AWSBatchOperator
.. autoclass:: airflow.contrib.operators.azure_cosmos_operator.AzureCosmosInsertDocumentOperator
.. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator
.. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator
.. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator
@@ -235,6 +236,7 @@ Sensors

.. autoclass:: airflow.contrib.sensors.aws_athena_sensor.AthenaSensor
.. autoclass:: airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor
.. autoclass:: airflow.contrib.sensors.azure_cosmos_sensor.AzureCosmosDocumentSensor
.. autoclass:: airflow.contrib.sensors.bash_sensor.BashSensor
.. autoclass:: airflow.contrib.sensors.bigquery_sensor.BigQueryTableSensor
.. autoclass:: airflow.contrib.sensors.cassandra_record_sensor.CassandraRecordSensor
@@ -417,6 +419,7 @@ Community contributed hooks
.. autoclass:: airflow.contrib.hooks.aws_hook.AwsHook
.. autoclass:: airflow.contrib.hooks.aws_lambda_hook.AwsLambdaHook
.. autoclass:: airflow.contrib.hooks.aws_sns_hook.AwsSnsHook
.. autoclass:: airflow.contrib.hooks.azure_cosmos_hook.AzureCosmosDBHook
.. autoclass:: airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook
.. autoclass:: airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook
.. autoclass:: airflow.contrib.hooks.bigquery_hook.BigQueryHook
@@ -473,3 +476,4 @@ Community-contributed executors
'''''''''''''''''''''''''''''''

.. autoclass:: airflow.contrib.executors.mesos_executor.MesosExecutor
.. autoclass:: airflow.contrib.executors.kubernetes_executor.KubernetesExecutor
14 changes: 7 additions & 7 deletions docs/integration.rst
@@ -162,7 +162,7 @@ Airflow can be configured to read and write task logs in Azure Blob Storage.
See :ref:`write-logs-azure`.

Azure CosmosDB
''''''''''''''''''
''''''''''''''

AzureCosmosDBHook communicates via the Azure Cosmos library. Make sure that a
Airflow connection of type `azure_cosmos` exists. Authorization can be done by supplying a
@@ -173,20 +173,20 @@ default database and collection to use (see connection `azure_cosmos_default` fo
- :ref:`AzureCosmosInsertDocumentOperator`: Simple operator to insert document into CosmosDB.
- :ref:`AzureCosmosDocumentSensor`: Simple sensor to detect document existence in CosmosDB.

.. AzureCosmosDBHook:
.. _AzureCosmosDBHook:

AzureCosmosDBHook
"""""""""
"""""""""""""""""

.. autoclass:: airflow.contrib.hooks.azure_cosmos_hook.AzureCosmosDBHook

AzureCosmosInsertDocumentOperator
"""""""""
"""""""""""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.azure_cosmos_insertdocument_operator.AzureCosmosInsertDocumentOperator
.. autoclass:: airflow.contrib.operators.azure_cosmos_operator.AzureCosmosInsertDocumentOperator

AzureCosmosDocumentSensor
"""""""""
"""""""""""""""""""""""""

.. autoclass:: airflow.contrib.sensors.azure_cosmos_sensor.AzureCosmosDocumentSensor
