forked from influxdata/influxdb-client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwrite_api_callbacks.py
49 lines (39 loc) · 1.51 KB
/
write_api_callbacks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
"""
How to use WriteApi's callbacks to get notified about the state of background batches.
"""
from typing import Tuple

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError
# Configuration: connection settings for an InfluxDB instance on localhost.
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
bucket = "my-bucket"
# Data: two sample "my-temperature" points, each tagged with its location.
points = [
    Point("my-temperature").tag("location", "Prague").field("temperature", 25.3),
    Point("my-temperature").tag("location", "New York").field("temperature", 18.4),
]
class BatchingCallback:
    """Callbacks that print the outcome of each written batch.

    The bound methods are handed to ``client.write_api`` as its
    ``success_callback``, ``error_callback`` and ``retry_callback``.

    Every callback receives:

    * ``conf`` -- a 3-tuple of strings identifying the batch
      (NOTE(review): presumably ``(bucket, org, precision)`` -- confirm
      against the client documentation).
    * ``data`` -- the batch payload as a string.

    ``error`` and ``retry`` additionally receive the exception that was raised.
    """

    def success(self, conf: Tuple[str, str, str], data: str) -> None:
        """Report a successfully written batch."""
        print(f"Written batch: {conf}, data: {data}")

    # ``InfluxDBError`` is quoted (a forward reference) so the annotation is
    # not evaluated when the class is defined.
    def error(self, conf: Tuple[str, str, str], data: str, exception: "InfluxDBError") -> None:
        """Report a batch that could not be written."""
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: Tuple[str, str, str], data: str, exception: "InfluxDBError") -> None:
        """Report a retryable error that occurred for a batch."""
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
# One callback object supplies all three notification hooks.
callback = BatchingCallback()

with InfluxDBClient(url=url, token=token, org=org) as client:
    # Use the batching API; the callbacks report the state of background batches.
    with client.write_api(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry) as write_api:
        write_api.write(bucket=bucket, record=points)

        # Printed while the write API context is still open.
        # NOTE(review): presumably pending batches are flushed when this
        # context exits -- confirm against the client documentation.
        print()
        print("Wait to finishing ingesting...")
        print()