Skip to content

Commit

Permalink
Add Telegram hook and operator (apache#11850)
Browse files Browse the repository at this point in the history
closes: apache#11845

Adds:

Telegram Hook
Telegram Operator
  • Loading branch information
rootcss authored Dec 5, 2020
1 parent 252b047 commit cd66450
Show file tree
Hide file tree
Showing 22 changed files with 1,013 additions and 59 deletions.
4 changes: 2 additions & 2 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hiv
jenkins, jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo,
mssql, mysql, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
postgres, presto, qds, qubole, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, sftp,
singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, vertica, virtualenv, webhdfs,
winrm, yandex, yandexcloud, zendesk, all, devel, devel_hadoop, doc, devel_all, devel_ci
singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv,
webhdfs, winrm, yandex, yandexcloud, zendesk, all, devel, devel_hadoop, doc, devel_all, devel_ci

.. END EXTRAS HERE
Expand Down
4 changes: 2 additions & 2 deletions INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hiv
jenkins, jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo,
mssql, mysql, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
postgres, presto, qds, qubole, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, sftp,
singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, vertica, virtualenv, webhdfs,
winrm, yandex, yandexcloud, zendesk, all, devel, devel_hadoop, doc, devel_all, devel_ci
singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, vertica, virtualenv,
webhdfs, winrm, yandex, yandexcloud, zendesk, all, devel, devel_hadoop, doc, devel_all, devel_ci

# END EXTRAS HERE

Expand Down
59 changes: 59 additions & 0 deletions airflow/providers/telegram/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->


# Package apache-airflow-providers-telegram

**Table of contents**

- [Provider package](#provider-package)
- [Installation](#installation)
- [PIP requirements](#pip-requirements)
- [Cross provider package dependencies](#cross-provider-package-dependencies)

## Provider package

This is a provider package for `telegram` provider. All classes for this provider package
are in `airflow.providers.telegram` python package.

## Installation

You can install this package on top of an existing airflow 2.* installation via
`pip install apache-airflow-providers-telegram`

## PIP requirements

| PIP package | Version required |
|:----------------------|:-------------------|
| python-telegram-bot | 13.0 |

## Cross provider package dependencies

Those are dependencies that might be needed in order to use all the features of the package.
You need to install the specified backport providers package in order to use them.

You can install such cross-provider dependencies when installing from PyPI. For example:

```bash
pip install apache-airflow-providers-telegram[http]
```

| Dependent package | Extra |
|:----------------------------------------------------------------------------------------|:--------|
| [apache-airflow-providers-http](https://pypi.org/project/apache-airflow-providers-http) | http |
17 changes: 17 additions & 0 deletions airflow/providers/telegram/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
17 changes: 17 additions & 0 deletions airflow/providers/telegram/example_dags/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
47 changes: 47 additions & 0 deletions airflow/providers/telegram/example_dags/example_telegram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example use of Telegram operator.
"""

from airflow import DAG
from airflow.providers.telegram.operators.telegram import TelegramOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow',
}

dag = DAG(
'example_telegram',
default_args=default_args,
start_date=days_ago(2),
tags=['example'],
)

# [START howto_operator_telegram]

send_message_telegram_task = TelegramOperator(
task_id='send_message_telegram',
telegram_conn_id='telegram_conn_id',
chat_id='-3222103937',
text='Hello from Airflow!',
dag=dag,
)

# [END howto_operator_telegram]
17 changes: 17 additions & 0 deletions airflow/providers/telegram/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
152 changes: 152 additions & 0 deletions airflow/providers/telegram/hooks/telegram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Telegram"""
from typing import Optional

import telegram
import tenacity

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class TelegramHook(BaseHook):
"""
This hook allows you to post messages to Telegram using the telegram python-telegram-bot library.
The library can be found here: https://github.com/python-telegram-bot/python-telegram-bot
It accepts both telegram bot API token directly or connection that has telegram bot API token.
If both supplied, token parameter will be given precedence, otherwise 'password' field in the connection
from telegram_conn_id will be used.
chat_id can also be provided in the connection using 'host' field in connection.
Following is the details of a telegram_connection:
name: 'telegram-connection-name'
conn_type: 'http'
password: 'TELEGRAM_TOKEN'
host: 'chat_id' (optional)
Examples:
.. code-block:: python
# Create hook
telegram_hook = TelegramHook(telegram_conn_id='telegram_default')
# or telegram_hook = TelegramHook(telegram_conn_id='telegram_default', chat_id='-1xxx')
# or telegram_hook = TelegramHook(token='xxx:xxx', chat_id='-1xxx')
# Call method from telegram bot client
telegram_hook.send_message(None', {"text": "message", "chat_id": "-1xxx"})
# or telegram_hook.send_message(None', {"text": "message"})
:param telegram_conn_id: connection that optionally has Telegram API token in the password field
:type telegram_conn_id: str
:param token: optional telegram API token
:type token: str
:param chat_id: optional chat_id of the telegram chat/channel/group
:type chat_id: str
"""

def __init__(
self,
telegram_conn_id: Optional[str] = None,
token: Optional[str] = None,
chat_id: Optional[str] = None,
) -> None:
super().__init__()
self.token = self.__get_token(token, telegram_conn_id)
self.chat_id = self.__get_chat_id(chat_id, telegram_conn_id)
self.connection = self.get_conn()

def get_conn(self) -> telegram.bot.Bot:
"""
Returns the telegram bot client
:return: telegram bot client
:rtype: telegram.bot.Bot
"""
return telegram.bot.Bot(token=self.token)

def __get_token(self, token: Optional[str], telegram_conn_id: str) -> str:
"""
Returns the telegram API token
:param token: telegram API token
:type token: str
:param telegram_conn_id: telegram connection name
:type telegram_conn_id: str
:return: telegram API token
:rtype: str
"""
if token is not None:
return token

if telegram_conn_id is not None:
conn = self.get_connection(telegram_conn_id)

if not conn.password:
raise AirflowException("Missing token(password) in Telegram connection")

return conn.password

raise AirflowException("Cannot get token: No valid Telegram connection supplied.")

def __get_chat_id(self, chat_id: Optional[str], telegram_conn_id: str) -> Optional[str]:
"""
Returns the telegram chat ID for a chat/channel/group
:param chat_id: optional chat ID
:type chat_id: str
:param telegram_conn_id: telegram connection name
:type telegram_conn_id: str
:return: telegram chat ID
:rtype: str
"""
if chat_id is not None:
return chat_id

if telegram_conn_id is not None:
conn = self.get_connection(telegram_conn_id)
return conn.host

return None

@tenacity.retry(
retry=tenacity.retry_if_exception_type(telegram.error.TelegramError),
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_fixed(1),
)
def send_message(self, api_params: dict) -> None:
"""
Sends the message to a telegram channel or chat.
:param api_params: params for telegram_instance.send_message. It can also be used to override chat_id
:type api_params: dict
"""
kwargs = {
"chat_id": self.chat_id,
"parse_mode": telegram.parsemode.ParseMode.HTML,
"disable_web_page_preview": True,
}
kwargs.update(api_params)

if 'text' not in kwargs or kwargs['text'] is None:
raise AirflowException("'text' must be provided for telegram message")

if kwargs['chat_id'] is None:
raise AirflowException("'chat_id' must be provided for telegram message")

response = self.connection.send_message(**kwargs)
self.log.debug(response)
17 changes: 17 additions & 0 deletions airflow/providers/telegram/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading

0 comments on commit cd66450

Please sign in to comment.