Skip to content

Commit

Permalink
Influxdb Hook (apache#17068)
Browse files Browse the repository at this point in the history
* Added transfer operator for bigquery to mssql and tests

* Update airflow/providers/google/cloud/transfers/bigquery_to_mssql.py

Co-authored-by: Tomek Urbaszek <[email protected]>

* Update airflow/providers/google/cloud/transfers/bigquery_to_mssql.py

Code change based on review comments.

Co-authored-by: Tomek Urbaszek <[email protected]>

* Added comment in bigquery to mssql transfer

* Added influx operator files

* Delete bigquery_to_mssql.py

* Update test_bigquery_to_bigquery.py

* Delete test_bigquery_to_mssql.py

* [14168] Added functions to create organization, bucket and run query to influxdb.

* [14168] Added file that was accidentally deleted.

* [14168] Added support to write Point.

* [14168] Added Influx provider support

* [14168] Added Influx provider support

* [14168] Added test case for InfluxDBHook

* [14168] Added test case for InfluxDBHook

* [14168] Fixed test case for InfluxDBHook

* [14168] Fixed test case for InfluxDBHook

* [14168] Added example DAG for influxDBHook

* [14168] Changed org_id to org_name for clarity.

* [14168] Removed README, fixed influxDBHook class name in provider.yml

* [14168] Static code check fixes.

* [14168] fixed license header influxdb.rst for connections

* [14168] fixed flak tests and order of requirements - influxdb

* [14168] Renamed provider.yaml from provider.yml

* [14168] fixed imports in influxdb.py

* [14168] Renamed function for passing flak8 tests.

* [14168] Fixed influxDB connection rst

* [14168] Fixed order in CONTRIBUTING.rst and spelling_wordlist.txt

* [14168] ADded operators/influxdb.rst to satisfy pre commit check

* [14168] fixed docs error influxdb

* [14168] fixed example DAG path for influxDB

* Update airflow/providers/influxdb/hooks/influxdb.py

Co-authored-by: Tzu-ping Chung <[email protected]>

* Rebase from master

* Fixed PR review comments

* Fixed influxdb documentation

* Fixed documentation

* Added empty doc for InfluxDB operator to workaround build-docs error

* Fixed table for influxdb dependency

* Fixed connection-type in influxdb provider

* Added init to example_dags

* Added influxdb to airflow_providers_bug_report

* Added missing init file in influxdb hook

* Added missing init in tests/influxdb

* Removed link in toc tree for influxdb

* Moved influxdb tests from operators to hooks, added tests

* Added more tests for influxdb hook

* Added commits.rst and installing-providers-from-sources.rst for influxdb

* Added commits to toctree of index

* Added pandas dependency to influxdb

Co-authored-by: Kanthi <[email protected]>
Co-authored-by: Tomek Urbaszek <[email protected]>
Co-authored-by: Kanthi Subramanian <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
5 people authored Sep 28, 2021
1 parent 24e57d6 commit c747bce
Show file tree
Hide file tree
Showing 21 changed files with 635 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ body:
- hashicorp
- http
- imap
- influxdb
- jdbc
- jenkins
- jira
Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cas
cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel,
devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol,
facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive,
http, imap, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
http, imap, influxdb, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
opsgenie, oracle, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, qds,
qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
Expand Down
2 changes: 1 addition & 1 deletion INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cas
cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel,
devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol,
facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive,
http, imap, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
http, imap, influxdb, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
opsgenie, oracle, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, qds,
qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
Expand Down
26 changes: 26 additions & 0 deletions airflow/providers/influxdb/CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@


.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Changelog
---------

1.0.0
.....

Initial version of the provider.
16 changes: 16 additions & 0 deletions airflow/providers/influxdb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/providers/influxdb/example_dags/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
57 changes: 57 additions & 0 deletions airflow/providers/influxdb/example_dags/example_influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook
from airflow.utils.dates import days_ago


def test_influxdb_hook():
bucket_name = 'test-influx'
influxdb_hook = InfluxDBHook("influxdb_default")
client = influxdb_hook.get_conn()
print(client)
print(f"Organization name {influxdb_hook.org_name}")

# Make sure enough permissions to create bucket.
influxdb_hook.create_bucket(bucket_name, "Bucket to test influxdb connection", influxdb_hook.org_name)
influxdb_hook.write(bucket_name, "test_point", "location", "Prague", "temperature", 25.3, True)

tables = influxdb_hook.query('from(bucket:"test-influx") |> range(start: -10m)')

for table in tables:
print(table)
for record in table.records:
print(record.values)

bucket_id = influxdb_hook.find_bucket_id_by_name(bucket_name)
print(bucket_id)
# Delete bucket takes bucket id.
influxdb_hook.delete_bucket(bucket_name)


with DAG(
dag_id='influxdb_example_dag',
schedule_interval=None,
start_date=days_ago(2),
max_active_runs=1,
tags=['example'],
) as dag:
influxdb_task = PythonOperator(task_id="influxdb_task", python_callable=test_influxdb_hook)

influxdb_task
16 changes: 16 additions & 0 deletions airflow/providers/influxdb/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
163 changes: 163 additions & 0 deletions airflow/providers/influxdb/hooks/influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""This module allows to connect to a InfluxDB database."""

from typing import List

import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.flux_table import FluxTable
from influxdb_client.client.write.point import Point
from influxdb_client.client.write_api import SYNCHRONOUS

from airflow.hooks.base import BaseHook
from airflow.models import Connection


class InfluxDBHook(BaseHook):
"""
Interact with InfluxDB.
Performs a connection to InfluxDB and retrieves client.
:param influxdb_conn_id: Reference to :ref:`Influxdb connection id <howto/connection:influxdb>`.
:type influxdb_conn_id: str
"""

conn_name_attr = 'influxdb_conn_id'
default_conn_name = 'influxdb_default'
conn_type = 'influxdb'
hook_name = 'Influxdb'

def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.influxdb_conn_id = conn_id
self.connection = kwargs.pop("connection", None)
self.client = None
self.extras = None
self.uri = None
self.org_name = None

def get_client(self, uri, token, org_name):
return InfluxDBClient(url=uri, token=token, org=org_name)

def get_uri(self, conn: Connection):
"""
Function to add additional parameters to the URI
based on SSL or other InfluxDB host requirements
"""
return '{scheme}://{host}:{port}'.format(
scheme='https' if conn.schema is None else f'{conn.schema}',
host=conn.host,
port='7687' if conn.port is None else f'{conn.port}',
)

def get_conn(self) -> InfluxDBClient:
"""
Function that initiates a new InfluxDB connection
with token and organization name
"""
self.connection = self.get_connection(self.influxdb_conn_id)
self.extras = self.connection.extra_dejson.copy()

self.uri = self.get_uri(self.connection)
self.log.info('URI: %s', self.uri)

if self.client is not None:
return self.client

token = self.connection.extra_dejson.get('token')
self.org_name = self.connection.extra_dejson.get('org_name')

self.log.info('URI: %s', self.uri)
self.log.info('Organization: %s', self.org_name)

self.client = self.get_client(self.uri, token, self.org_name)

return self.client

def query(self, query) -> List[FluxTable]:
"""
Function to to run the query.
Note: The bucket name
should be included in the query
:param query: InfluxDB query
:return: List
"""
client = self.get_conn()

query_api = client.query_api()
return query_api.query(query)

def query_to_df(self, query) -> pd.DataFrame:
"""
Function to run the query and
return a pandas dataframe
Note: The bucket name
should be included in the query
:param query: InfluxDB query
:return: pd.DataFrame
"""
client = self.get_conn()

query_api = client.query_api()
return query_api.query_data_frame(query)

def write(self, bucket_name, point_name, tag_name, tag_value, field_name, field_value, synchronous=False):
"""
Writes a Point to the bucket specified.
Example: Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
"""
# By defaults its Batching
if synchronous:
write_api = self.client.write_api(write_options=SYNCHRONOUS)
else:
write_api = self.client.write_api()

p = Point(point_name).tag(tag_name, tag_value).field(field_name, field_value)

write_api.write(bucket=bucket_name, record=p)

def create_organization(self, name):
"""Function to create a new organization"""
return self.client.organizations_api().create_organization(name=name)

def delete_organization(self, org_id):
"""Function to delete organization by organization id"""
return self.client.organizations_api().delete_organization(org_id=org_id)

def create_bucket(self, bucket_name, description, org_id, retention_rules=None):
"""Function to create a bucket for an organization"""
return self.client.buckets_api().create_bucket(
bucket_name=bucket_name, description=description, org_id=org_id, retention_rules=None
)

def find_bucket_id_by_name(self, bucket_name):
"""Function to get bucket id by name."""
bucket = self.client.buckets_api().find_bucket_by_name(bucket_name)

return "" if bucket is None else bucket.id

def delete_bucket(self, bucket_name):
"""Function to delete bucket by bucket name."""
bucket = self.find_bucket_id_by_name(bucket_name)
return self.client.buckets_api().delete_bucket(bucket)
39 changes: 39 additions & 0 deletions airflow/providers/influxdb/provider.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
package-name: apache-airflow-providers-influxdb
name: Influxdb
description: |
`InfluxDB <https://www.influxdata.com/>`__
versions:
- 1.0.0
integrations:
- integration-name: Influxdb
external-doc-url: https://www.influxdata.com/
how-to-guide:
- /docs/apache-airflow-providers-influxdb/operators/influxdb.rst
tags: [software]

hooks:
- integration-name: Influxdb
python-modules:
- airflow.providers.influxdb.hooks.influxdb

connection-types:
- hook-class-name: airflow.providers.influxdb.hooks.influxdb.InfluxDBHook
connection-type: influxdb
21 changes: 21 additions & 0 deletions docs/apache-airflow-providers-influxdb/commits.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Package apache-airflow-providers-influxdb
------------------------------------------------------
Loading

0 comments on commit c747bce

Please sign in to comment.