forked from influxdata/influxdb-client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection_check.py
61 lines (51 loc) · 1.92 KB
/
connection_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""
How to check that the connection credentials are suitable for querying from and writing into the specified bucket.
"""
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException
# Connection settings shared by the checks in this script.
# Replace the placeholder values with the ones for your InfluxDB instance.
url = "http://localhost:8086"  # passed to InfluxDBClient as the server URL
token = "my-token"  # passed to InfluxDBClient as the authentication token
org = "my-org"  # organization used for the client, the query and the write
bucket = "my-bucket"  # bucket whose read/write access is being verified
def check_connection():
    """Verify that the InfluxDB server responds on its ``/ping`` endpoint.

    Prints a progress label, sends a ``GET /ping`` request through the
    module-level ``client`` and prints ``ok`` once the call has returned.
    Any exception raised by the request is propagated unchanged.
    """
    print("> Checking connection ...", end=" ")
    # Use the low-level API client directly to hit the ping endpoint.
    low_level_api = client.api_client
    low_level_api.call_api('/ping', 'GET')
    print("ok")
def check_query():
    """Verify that the token is allowed to query the configured bucket.

    Runs a minimal Flux query (last minute, at most one row) against the
    module-level ``bucket`` using the module-level ``client`` and ``org``,
    and prints ``ok`` when the query succeeds.

    Raises:
        Exception: when the server answers with HTTP 404; the original
            ``ApiException`` is attached as the cause.
        ApiException: re-raised unchanged for any other HTTP status.
    """
    print("> Checking credentials for query ...", end=" ")
    try:
        reader = client.query_api()
        flux = f'from(bucket:"{bucket}") |> range(start: -1m) |> limit(n:1)'
        reader.query(flux, org)
    except ApiException as err:
        # Anything other than 404 is not interpreted here.
        if err.status != 404:
            raise
        # 404 is reported with an explanatory message (missing credentials
        # or missing bucket).
        message = (
            f"The specified token doesn't have sufficient credentials to read from '{bucket}' "
            "or specified bucket doesn't exists."
        )
        raise Exception(message) from err
    print("ok")
def check_write():
    """Verify that the token is allowed to write into the configured bucket.

    Sends an empty payload with a synchronous write API obtained from the
    module-level ``client``. An HTTP 400 response is treated as success,
    because the request got far enough for the server to reject the empty
    line protocol. Prints ``ok`` when the check passes.

    Raises:
        Exception: on HTTP 404 (bucket does not exist) or HTTP 403
            (insufficient permissions); the original ``ApiException`` is
            attached as the cause.
        ApiException: re-raised unchanged for any status other than
            400, 403 and 404.
    """
    print("> Checking credentials for write ...", end=" ")
    try:
        writer = client.write_api(write_options=SYNCHRONOUS)
        writer.write(bucket, org, b"")
    except ApiException as err:
        # Statuses that are translated into an explanatory error.
        explanations = {
            # bucket does not exist
            404: "The specified bucket does not exist.",
            # insufficient permissions
            403: f"The specified token does not have sufficient credentials to write to '{bucket}'.",
        }
        if err.status in explanations:
            raise Exception(explanations[err.status]) from err
        # 400 (BadRequest) is caused by the empty LineProtocol and is the
        # expected outcome here; every other status is passed on untouched.
        if err.status != 400:
            raise
    print("ok")
# Run the checks in order: connectivity first, then read access, then write
# access. The client is bound to the module-level name ``client`` that the
# check functions use, and the ``with`` block handles its cleanup on exit.
with InfluxDBClient(
    url=url,
    token=token,
    org=org,
) as client:
    check_connection()
    check_query()
    check_write()