Skip to content

Commit

Permalink
add timeout to influx db (apache#40439)
Browse files Browse the repository at this point in the history
  • Loading branch information
romsharon98 authored Jun 27, 2024
1 parent 92cb6aa commit e4c125f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
35 changes: 24 additions & 11 deletions airflow/providers/influxdb/hooks/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from influxdb_client import InfluxDBClient
from influxdb_client.client.write.point import Point
Expand Down Expand Up @@ -60,10 +60,29 @@ def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None:
self.client = None
self.extras: dict = {}
self.uri = None
self.org_name = None

def get_client(self, uri, token, org_name):
return InfluxDBClient(url=uri, token=token, org=org_name)
@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""Return connection widgets to add to connection form."""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField
from wtforms.validators import InputRequired

return {
"token": StringField(
lazy_gettext("Token"), widget=BS3TextFieldWidget(), default="", validators=[InputRequired()]
),
"org": StringField(
lazy_gettext("Organization Name"),
widget=BS3TextFieldWidget(),
default="",
validators=[InputRequired()],
),
}

def get_client(self, uri, kwargs):
return InfluxDBClient(url=uri, **kwargs)

def get_uri(self, conn: Connection):
"""Add additional parameters to the URI based on InfluxDB host requirements."""
Expand All @@ -82,13 +101,7 @@ def get_conn(self) -> InfluxDBClient:
if self.client is not None:
return self.client

token = self.connection.extra_dejson.get("token")
self.org_name = self.connection.extra_dejson.get("org_name")

self.log.info("URI: %s", self.uri)
self.log.info("Organization: %s", self.org_name)

self.client = self.get_client(self.uri, token, self.org_name)
self.client = self.get_client(self.uri, self.extras)

return self.client

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,12 @@ Extra (required)
Specify the extra parameters (as json dictionary) that can be used in InfluxDB
connection.

``token``: (required) `Create token <https://docs.influxdata.com/influxdb/cloud/security/tokens/create-token/>`_
using the influxdb cli or UI

``org_name``: (required) `Create org <https://docs.influxdata.com/influxdb/cloud/reference/cli/influx/org/create/>`_
name using influxdb cli or UI

Example "extras" field:

.. code-block:: JSON
{
"token": "343434343423234234234343434",
"org_name": "Test"
"org_name": "Test",
"timeout": 10000
}
11 changes: 7 additions & 4 deletions tests/providers/influxdb/hooks/test_influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,25 @@ def setup_method(self):
self.influxdb_hook = InfluxDBHook()
extra = {}
extra["token"] = "123456789"
extra["org_name"] = "test"
extra["org"] = "test"
extra["timeout"] = 10000

self.connection = Connection(schema="http", host="localhost", extra=extra)

def test_get_conn(self):
@mock.patch("airflow.providers.influxdb.hooks.influxdb.InfluxDBClient")
def test_get_conn(self, influx_db_client):
self.influxdb_hook.get_connection = mock.Mock()
self.influxdb_hook.get_connection.return_value = self.connection

self.influxdb_hook.get_client = mock.Mock()
self.influxdb_hook.get_conn()

assert self.influxdb_hook.org_name == "test"
assert self.influxdb_hook.uri == "http://localhost:7687"

assert self.influxdb_hook.get_connection.return_value.schema == "http"
assert self.influxdb_hook.get_connection.return_value.host == "localhost"
influx_db_client.assert_called_once_with(
url="http://localhost:7687", token="123456789", org="test", timeout=10000
)

assert self.influxdb_hook.get_client is not None

Expand Down

0 comments on commit e4c125f

Please sign in to comment.