Skip to content

Commit

Permalink
Add provider for Apache Kafka (apache#30175)
Browse files Browse the repository at this point in the history
* Add provider for Apache Kafka

Pulls in a series of integrations to Kafka from airflow-provider-kafka (https://pypi.org/project/airflow-provider-kafka/) to core airflow.

---------

Co-authored-by: Tamara Janina Fingerlin <[email protected]>
Co-authored-by: Josh Fell <[email protected]>
  • Loading branch information
3 people authored Apr 21, 2023
1 parent 7e01c09 commit 522661b
Show file tree
Hide file tree
Showing 80 changed files with 3,621 additions and 307 deletions.
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ body:
- apache-hdfs
- apache-hive
- apache-impala
- apache-kafka
- apache-kylin
- apache-livy
- apache-pig
Expand Down Expand Up @@ -66,6 +67,7 @@ body:
- influxdb
- jdbc
- jenkins
- apache-kafka
- microsoft-azure
- microsoft-mssql
- microsoft-psrp
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,11 @@ jobs:
breeze testing integration-tests --integration trino --integration kerberos
breeze stop
if: needs.build-info.outputs.runs-on != 'self-hosted'
- name: "Integration Tests Postgres: Kafka"
run: |
breeze testing integration-tests --integration kafka
breeze stop
if: needs.build-info.outputs.runs-on != 'self-hosted'
- name: "Integration Tests Postgres: all-testable"
run: breeze testing integration-tests --integration all-testable
if: needs.build-info.outputs.runs-on == 'self-hosted'
Expand Down
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ repos:
^docs/README.rst$|
^docs/apache-airflow-providers-amazon/secrets-backends/aws-ssm-parameter-store.rst$|
^docs/apache-airflow-providers-apache-hdfs/connections.rst$|
^docs/apache-airflow-providers-apache-kafka/connections/kafka.rst$|
^docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst$|
^docs/apache-airflow-providers-microsoft-azure/connections/azure_cosmos.rst$|
^docs/conf.py$|
Expand Down
24 changes: 12 additions & 12 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -610,18 +610,18 @@ This is the full list of those extras:

.. START EXTRAS HERE
aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra,
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kylin,
apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana,
async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel, devel_all,
devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch, exasol,
facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc, hashicorp, hdfs,
hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus,
postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram,
trino, vertica, virtualenv, webhdfs, winrm, zendesk
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant,
cncf.kubernetes, common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel,
devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch,
exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc,
hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb,
microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc,
openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot,
plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid,
sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular,
telegram, trino, vertica, virtualenv, webhdfs, winrm, zendesk
.. END EXTRAS HERE
Provider packages
Expand Down
24 changes: 12 additions & 12 deletions INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,18 @@ The list of available extras:

# START EXTRAS HERE
aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra,
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kylin,
apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana,
async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel, devel_all,
devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch, exasol,
facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc, hashicorp, hdfs,
hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus,
postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram,
trino, vertica, virtualenv, webhdfs, winrm, zendesk
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, cloudant,
cncf.kubernetes, common.sql, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api, devel,
devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, druid, elasticsearch,
exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc,
hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, ldap, leveldb,
microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc,
openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill, password, pinot,
plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid,
sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau, tabular,
telegram, trino, vertica, virtualenv, webhdfs, winrm, zendesk
# END EXTRAS HERE

# For installing Airflow in development environments - see CONTRIBUTING.rst
Expand Down
1 change: 1 addition & 0 deletions airflow/provider.yaml.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"gcp",
"gmp",
"google",
"kafka",
"protocol",
"service",
"software",
Expand Down
25 changes: 25 additions & 0 deletions airflow/providers/apache/kafka/CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
 .. Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

 ..   http://www.apache.org/licenses/LICENSE-2.0

 .. Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.

Changelog
---------

1.0.0
.....

Initial version of the provider.
17 changes: 17 additions & 0 deletions airflow/providers/apache/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/providers/apache/kafka/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
79 changes: 79 additions & 0 deletions airflow/providers/apache/kafka/hooks/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Any

from confluent_kafka.admin import AdminClient

from airflow.compat.functools import cached_property
from airflow.hooks.base import BaseHook


class KafkaBaseHook(BaseHook):
    """
    A base hook for interacting with Apache Kafka.

    Subclasses provide the concrete client by overriding ``_get_client``; this
    class reads the connection's ``extra`` JSON and hands it to that method as
    the client configuration.

    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
    """

    conn_name_attr = "kafka_config_id"
    default_conn_name = "kafka_default"
    conn_type = "kafka"
    hook_name = "Apache Kafka"

    def __init__(self, kafka_config_id=default_conn_name, *args, **kwargs):
        """
        Store the connection id and build the client.

        Extra positional and keyword arguments are accepted but not forwarded
        to ``BaseHook.__init__``.
        """
        super().__init__()
        self.kafka_config_id = kafka_config_id
        # Touching the cached property builds the client right away, so a
        # missing "bootstrap.servers" is reported when the hook is constructed.
        self.get_conn

    @staticmethod
    def get_ui_field_behaviour() -> dict[str, Any]:
        """Return the custom field behaviour for the connection form."""
        return {
            "hidden_fields": ["schema", "login", "password", "port", "host"],
            "relabeling": {"extra": "Config Dict"},
            "placeholders": {
                "extra": '{"bootstrap.servers": "localhost:9092"}',
            },
        }

    def _get_client(self, config):
        """
        Build the Kafka client for ``config``; must be overridden by subclasses.

        :param config: the configuration dict taken from the connection's extra field
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    @cached_property
    def get_conn(self) -> Any:
        """
        Return the client built by ``_get_client`` from the connection's extra config.

        This is a cached property: it is accessed as an attribute (``hook.get_conn``)
        and the client is created once per hook instance.

        :raises ValueError: if the config has no (or an empty) "bootstrap.servers" entry
        """
        config = self.get_connection(self.kafka_config_id).extra_dejson

        if not config.get("bootstrap.servers"):
            raise ValueError("config['bootstrap.servers'] must be provided.")

        return self._get_client(config)

    def test_connection(self) -> tuple[bool, str]:
        """
        Test connectivity from the UI by listing topics with an ``AdminClient``.

        :return: ``(True, message)`` when topic metadata is returned, otherwise
            ``(False, message)`` where the message is the error text if an
            exception was raised.
        """
        try:
            config = self.get_connection(self.kafka_config_id).extra_dejson
            # confluent-kafka documents ``timeout`` as an argument of
            # list_topics(), not of the AdminClient constructor.
            topics = AdminClient(config).list_topics(timeout=10)
            if topics:
                return True, "Connection successful."
        except Exception as e:
            # Report the actual failure reason instead of dropping it.
            return False, str(e)

        return False, "Failed to establish connection."
63 changes: 63 additions & 0 deletions airflow/providers/apache/kafka/hooks/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Any, Sequence

from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, NewTopic

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook


class KafkaAdminClientHook(KafkaBaseHook):
    """
    A hook for interacting with the Kafka Cluster.

    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
    """

    def __init__(self, kafka_config_id=KafkaBaseHook.default_conn_name) -> None:
        super().__init__(kafka_config_id=kafka_config_id)

    def _get_client(self, config) -> AdminClient:
        """Build an ``AdminClient`` from the connection's config dict."""
        return AdminClient(config)

    def create_topic(
        self,
        topics: Sequence[Sequence[Any]],
    ) -> None:
        """
        Create the given topics, tolerating topics that already exist.

        :param topics: a list of topics to create including the number of partitions for the topic
            and the replication factor. Format: [ ("topic_name", number of partitions, replication factor)]
        :raises KafkaException: if creating a topic fails for any reason other than
            the topic already existing
        """
        admin_client = self.get_conn

        new_topics = [
            NewTopic(name, num_partitions=partitions, replication_factor=replication)
            for name, partitions, replication in (t[:3] for t in topics)
        ]

        futures = admin_client.create_topics(new_topics)

        for topic, future in futures.items():
            try:
                future.result()
                self.log.info("The topic %s has been created.", topic)
            except KafkaException as e:
                error = e.args[0]
                # KafkaError.name is documented as a method; comparing the bound
                # method itself to a string is always False, so call it. A plain
                # attribute is still accepted for error objects that expose one.
                error_name = error.name() if callable(error.name) else error.name
                if error_name == "TOPIC_ALREADY_EXISTS":
                    self.log.warning("The topic %s already exists.", topic)
                else:
                    raise
47 changes: 47 additions & 0 deletions airflow/providers/apache/kafka/hooks/consume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Sequence

from confluent_kafka import Consumer

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook


class KafkaConsumerHook(KafkaBaseHook):
    """
    Hook that produces a Kafka ``Consumer`` subscribed to a fixed set of topics.

    :param kafka_config_id: The connection object to use, defaults to "kafka_default"
    :param topics: A list of topics to subscribe to.
    """

    def __init__(
        self,
        topics: Sequence[str],
        kafka_config_id=KafkaBaseHook.default_conn_name,
    ) -> None:
        super().__init__(kafka_config_id=kafka_config_id)
        # Remembered for the subscribe() call made in get_consumer().
        self.topics = topics

    def _get_client(self, config) -> Consumer:
        """Create the ``Consumer`` from the connection's config dict."""
        return Consumer(config)

    def get_consumer(self) -> Consumer:
        """Subscribe the hook's consumer to ``self.topics`` and return it."""
        client = self.get_conn
        client.subscribe(self.topics)
        return client
Loading

0 comments on commit 522661b

Please sign in to comment.