Skip to content

Commit

Permalink
Refactor Slack API Hook and add Connection (apache#25852)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis authored Aug 31, 2022
1 parent 57a6843 commit 214873c
Show file tree
Hide file tree
Showing 11 changed files with 801 additions and 112 deletions.
228 changes: 210 additions & 18 deletions airflow/providers/slack/hooks/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,48 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Slack"""
from typing import Any, Optional

import json
import warnings
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from slack_sdk import WebClient
from slack_sdk.web.slack_response import SlackResponse
from slack_sdk.errors import SlackApiError

from airflow.exceptions import AirflowException
from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.providers.slack.utils import ConnectionExtraConfig, prefixed_extra_field
from airflow.utils.log.secrets_masker import mask_secret

if TYPE_CHECKING:
from slack_sdk.http_retry import RetryHandler
from slack_sdk.web.slack_response import SlackResponse


class SlackHook(BaseHook):
"""
Creates a Slack connection to be used for calls.
Creates a Slack API Connection to be used for calls.
This class provide a thin wrapper around the ``slack_sdk.WebClient``.
.. seealso::
- :ref:`Slack API connection <howto/connection:slack>`
- https://api.slack.com/messaging
- https://slack.dev/python-slack-sdk/web/index.html
.. warning::
This hook intend to use `Slack API` connection
and might not work correctly with `Slack Webhook` and `HTTP` connections.
Takes both Slack API token directly and connection that has Slack API token. If both are
supplied, Slack API token will be used. Also exposes the rest of slack.WebClient args.
Examples:
.. code-block:: python
.. code-block:: python
# Create hook
slack_hook = SlackHook(token="xxx") # or slack_hook = SlackHook(slack_conn_id="slack")
slack_hook = SlackHook(slack_conn_id="slack_api_default")
# Call generic API with parameters (errors are handled by hook)
# For more details check https://api.slack.com/methods/chat.postMessage
Expand All @@ -45,28 +66,124 @@ class SlackHook(BaseHook):
# For more details check https://slack.dev/python-slack-sdk/web/index.html#messaging
slack_hook.client.chat_postMessage(channel="#random", text="Hello world!")
:param token: Slack API token
:param slack_conn_id: :ref:`Slack connection id <howto/connection:slack>`
that has Slack API token in the password field.
:param use_session: A boolean specifying if the client should take advantage of
connection pooling. Default is True.
:param base_url: A string representing the Slack API base URL. Default is
``https://www.slack.com/api/``
:param timeout: The maximum number of seconds the client will wait
to connect and receive a response from Slack. Default is 30 seconds.
:param timeout: The maximum number of seconds the client will wait to connect
and receive a response from Slack. If not set than default WebClient value will use.
:param base_url: A string representing the Slack API base URL.
If not set than default WebClient BASE_URL will use (``https://www.slack.com/api/``).
:param proxy: Proxy to make the Slack Incoming Webhook call.
:param retry_handlers: List of handlers to customize retry logic in WebClient.
:param token: (deprecated) Slack API Token.
"""

conn_name_attr = 'slack_conn_id'
default_conn_name = 'slack_api_default'
conn_type = 'slack'
hook_name = 'Slack API'

def __init__(
self,
token: Optional[str] = None,
slack_conn_id: Optional[str] = None,
**client_args: Any,
base_url: Optional[str] = None,
timeout: Optional[int] = None,
proxy: Optional[str] = None,
retry_handlers: Optional[List["RetryHandler"]] = None,
**extra_client_args: Any,
) -> None:
if not token and not slack_conn_id:
raise AirflowException("Either `slack_conn_id` or `token` should be provided.")
if token:
mask_secret(token)
warnings.warn(
"Provide token as hook argument deprecated by security reason and will be removed "
"in a future releases. Please specify token in `Slack API` connection.",
DeprecationWarning,
stacklevel=2,
)
if not slack_conn_id:
warnings.warn(
"You have not set parameter `slack_conn_id`. Currently `Slack API` connection id optional "
"but in a future release it will mandatory.",
FutureWarning,
stacklevel=2,
)

super().__init__()
self.token = self.__get_token(token, slack_conn_id)
self.client = WebClient(self.token, **client_args)
self._token = token
self.slack_conn_id = slack_conn_id
self.base_url = base_url
self.timeout = timeout
self.proxy = proxy
self.retry_handlers = retry_handlers
self.extra_client_args = extra_client_args
if self.extra_client_args.pop("use_session", None) is not None:
warnings.warn("`use_session` has no affect in slack_sdk.WebClient.", UserWarning, stacklevel=2)

@cached_property
def client(self) -> WebClient:
"""Get the underlying slack_sdk.WebClient (cached)."""
return WebClient(**self._get_conn_params())

def get_conn(self) -> WebClient:
"""Get the underlying slack_sdk.WebClient (cached)."""
return self.client

def _get_conn_params(self) -> Dict[str, Any]:
"""Fetch connection params as a dict and merge it with hook parameters."""
conn = self.get_connection(self.slack_conn_id) if self.slack_conn_id else None
conn_params: Dict[str, Any] = {}

if self._token:
conn_params["token"] = self._token
elif conn:
if not conn.password:
raise AirflowNotFoundException(
f"Connection ID {self.slack_conn_id!r} does not contain password (Slack API Token)."
)
conn_params["token"] = conn.password

extra_config = ConnectionExtraConfig(
conn_type=self.conn_type,
conn_id=conn.conn_id if conn else None,
extra=conn.extra_dejson if conn else {},
)

# Merge Hook parameters with Connection config
conn_params.update(
{
"timeout": self.timeout or extra_config.getint("timeout", default=None),
"base_url": self.base_url or extra_config.get("base_url", default=None),
"proxy": self.proxy or extra_config.get("proxy", default=None),
"retry_handlers": (
self.retry_handlers or extra_config.getimports("retry_handlers", default=None)
),
}
)

# Add additional client args
conn_params.update(self.extra_client_args)
if "logger" not in conn_params:
conn_params["logger"] = self.log

return {k: v for k, v in conn_params.items() if v is not None}

@cached_property
def token(self) -> str:
warnings.warn(
"`SlackHook.token` property deprecated and will be removed in a future releases.",
DeprecationWarning,
stacklevel=2,
)
return self._get_conn_params()["token"]

def __get_token(self, token: Any, slack_conn_id: Any) -> str:
warnings.warn(
"`SlackHook.__get_token` method deprecated and will be removed in a future releases.",
DeprecationWarning,
stacklevel=2,
)
if token is not None:
return token

Expand All @@ -79,7 +196,7 @@ def __get_token(self, token: Any, slack_conn_id: Any) -> str:

raise AirflowException('Cannot get token: No valid Slack token nor slack_conn_id supplied.')

def call(self, api_method: str, **kwargs) -> SlackResponse:
def call(self, api_method: str, **kwargs) -> "SlackResponse":
"""
Calls Slack WebClient `WebClient.api_call` with given arguments.
Expand All @@ -95,3 +212,78 @@ def call(self, api_method: str, **kwargs) -> SlackResponse:
iterated on to execute subsequent requests.
"""
return self.client.api_call(api_method, **kwargs)

def test_connection(self):
"""Tests the Slack API connection.
.. seealso::
https://api.slack.com/methods/auth.test
"""
try:
response = self.call("auth.test")
response.validate()
except SlackApiError as e:
return False, str(e)
except Exception as e:
return False, f"Unknown error occurred while testing connection: {e}"

if isinstance(response.data, bytes):
# If response data binary then return simple message
return True, f"Connection successfully tested (url: {response.api_url})."

try:
return True, json.dumps(response.data)
except TypeError:
return True, str(response)

@classmethod
def get_connection_form_widgets(cls) -> Dict[str, Any]:
"""Returns dictionary of widgets to be added for the hook to handle extra values."""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import IntegerField, StringField

return {
prefixed_extra_field("timeout", cls.conn_type): IntegerField(
lazy_gettext("Timeout"),
widget=BS3TextFieldWidget(),
description="Optional. The maximum number of seconds the client will wait to connect "
"and receive a response from Slack API.",
),
prefixed_extra_field("base_url", cls.conn_type): StringField(
lazy_gettext('Base URL'),
widget=BS3TextFieldWidget(),
description="Optional. A string representing the Slack API base URL.",
),
prefixed_extra_field("proxy", cls.conn_type): StringField(
lazy_gettext('Proxy'),
widget=BS3TextFieldWidget(),
description="Optional. Proxy to make the Slack API call.",
),
prefixed_extra_field("retry_handlers", cls.conn_type): StringField(
lazy_gettext('Retry Handlers'),
widget=BS3TextFieldWidget(),
description="Optional. Comma separated list of import paths to zero-argument callable "
"which returns retry handler for Slack WebClient.",
),
}

@classmethod
def get_ui_field_behaviour(cls) -> Dict[str, Any]:
"""Returns custom field behaviour."""
return {
"hidden_fields": ["login", "port", "host", "schema", "extra"],
"relabeling": {
"password": "Slack API Token",
},
"placeholders": {
"password": "xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx",
prefixed_extra_field("timeout", cls.conn_type): "30",
prefixed_extra_field("base_url", cls.conn_type): "https://www.slack.com/api/",
prefixed_extra_field("proxy", cls.conn_type): "http://localhost:9000",
prefixed_extra_field("retry_handlers", cls.conn_type): (
"slack_sdk.http_retry.builtin_handlers.ConnectionErrorRetryHandler,"
"slack_sdk.http_retry.builtin_handlers.RateLimitErrorRetryHandler"
),
},
}
4 changes: 4 additions & 0 deletions airflow/providers/slack/hooks/slack_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class SlackWebhookHook(HttpHook):
If both supplied, http_conn_id will be used as base_url,
and webhook_token will be taken as endpoint, the relative path of the url.
.. warning::
This hook intend to use `Slack Webhook` connection
and might not work correctly with `Slack API` connection.
Each Slack webhook token can be pre-configured to use a specific channel, username and
icon. You can override these defaults in this hook.
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/slack/operators/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SlackAPIOperator(BaseOperator):
In the future additional Slack API Operators will be derived from this class as well.
Only one of `slack_conn_id` and `token` is required.
:param slack_conn_id: :ref:`Slack connection id <howto/connection:slack>`
:param slack_conn_id: :ref:`Slack API Connection <howto/connection:slack>`
which its password is Slack API token. Optional
:param token: Slack API token (https://api.slack.com/web). Optional
:param method: The Slack API Method to Call (https://api.slack.com/methods). Optional
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/slack/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,7 @@ transfers:
how-to-guide: /docs/apache-airflow-providers-slack/operators/sql_to_slack.rst

connection-types:
- hook-class-name: airflow.providers.slack.hooks.slack.SlackHook
connection-type: slack
- hook-class-name: airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook
connection-type: slackwebhook
Loading

0 comments on commit 214873c

Please sign in to comment.