# test_QueryApiDataFrame.py
import random
import httpretty
import pytest
import reactivex as rx
import pandas
import warnings
from pandas import DataFrame
from pandas._libs.tslibs.timestamps import Timestamp
from reactivex import operators as ops
from influxdb_client import InfluxDBClient, Point, WritePrecision, WriteOptions
from influxdb_client.client.warnings import MissingPivotFunction
from influxdb_client.rest import ApiException
from tests.base_test import BaseTest, current_milli_time
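

# Tests for QueryApi.query_data_frame(). The first class mocks the
# /api/v2/query endpoint with httpretty, so no running InfluxDB is needed;
# the integration class at the end of the module talks to a real server.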
class QueryDataFrameApi(BaseTest):

    def setUp(self) -> None:
        super(QueryDataFrameApi, self).setUp()
        httpretty.enable()
        httpretty.reset()

    def tearDown(self) -> None:
        self.client.close()
        httpretty.disable()
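
    # A single Flux table deserializes into one pandas DataFrame with the
    # default RangeIndex and one column per annotated-CSV column.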
    def test_one_table(self):
        query_response = \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n' \
            '\n\n'

        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)

        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

        _dataFrame = self.client.query_api().query_data_frame(
            'from(bucket: "my-bucket") '
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> filter(fn: (r) => r._field == "used")',
            "my-org")

        self.assertEqual(DataFrame, type(_dataFrame))
        self.assertListEqual(
            ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"],
            list(_dataFrame.columns))
        self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrame.index))
        self.assertEqual(5, len(_dataFrame))
        self.assertEqual("_result", _dataFrame['result'][0])
        self.assertEqual("_result", _dataFrame['result'][1])
        self.assertEqual("_result", _dataFrame['result'][2])
        self.assertEqual("_result", _dataFrame['result'][3])
        self.assertEqual("_result", _dataFrame['result'][4])
        self.assertEqual(0, _dataFrame['table'][0], None)
        self.assertEqual(0, _dataFrame['table'][1], None)
        self.assertEqual(0, _dataFrame['table'][2], None)
        self.assertEqual(0, _dataFrame['table'][3], None)
        self.assertEqual(0, _dataFrame['table'][4], None)
        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][0])
        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][1])
        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][2])
        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][3])
        self.assertEqual(Timestamp('2019-11-12 08:09:04.795385+0000'), _dataFrame['_start'][4])
        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][0])
        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][1])
        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][2])
        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][3])
        self.assertEqual(Timestamp('2019-11-12 08:09:09.795385+0000'), _dataFrame['_stop'][4])
        self.assertEqual(Timestamp('2019-11-12 08:09:05+0000'), _dataFrame['_time'][0])
        self.assertEqual(Timestamp('2019-11-12 08:09:06+0000'), _dataFrame['_time'][1])
        self.assertEqual(Timestamp('2019-11-12 08:09:07+0000'), _dataFrame['_time'][2])
        self.assertEqual(Timestamp('2019-11-12 08:09:08+0000'), _dataFrame['_time'][3])
        self.assertEqual(Timestamp('2019-11-12 08:09:09+0000'), _dataFrame['_time'][4])
        self.assertEqual(11125907456, _dataFrame['_value'][0])
        self.assertEqual(11127103488, _dataFrame['_value'][1])
        self.assertEqual(11127291904, _dataFrame['_value'][2])
        self.assertEqual(11126190080, _dataFrame['_value'][3])
        self.assertEqual(11127832576, _dataFrame['_value'][4])
        self.assertEqual('used', _dataFrame['_field'][0])
        self.assertEqual('used', _dataFrame['_field'][1])
        self.assertEqual('used', _dataFrame['_field'][2])
        self.assertEqual('used', _dataFrame['_field'][3])
        self.assertEqual('used', _dataFrame['_field'][4])
        self.assertEqual('mem', _dataFrame['_measurement'][0])
        self.assertEqual('mem', _dataFrame['_measurement'][1])
        self.assertEqual('mem', _dataFrame['_measurement'][2])
        self.assertEqual('mem', _dataFrame['_measurement'][3])
        self.assertEqual('mem', _dataFrame['_measurement'][4])
        self.assertEqual('mac.local', _dataFrame['host'][0])
        self.assertEqual('mac.local', _dataFrame['host'][1])
        self.assertEqual('mac.local', _dataFrame['host'][2])
        self.assertEqual('mac.local', _dataFrame['host'][3])
        self.assertEqual('mac.local', _dataFrame['host'][4])
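
    # A response containing several Flux tables comes back as a list with one
    # DataFrame per table.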
    def test_more_table(self):
        query_response = \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n' \
            '\n\n' \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,6053961728,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,6052765696,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,6052577280,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,6053679104,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,6052036608,available,mem,mac.local\n' \
            '\n\n' \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,18632704,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,17420288,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,17256448,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,18362368,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,16723968,free,mem,mac.local\n\n'

        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)

        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

        _dataFrames = self.client.query_api().query_data_frame(
            'from(bucket: "my-bucket") '
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")',
            "my-org")

        self.assertEqual(list, type(_dataFrames))
        self.assertEqual(len(_dataFrames), 3)

        self.assertListEqual(
            ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"],
            list(_dataFrames[0].columns))
        self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrames[0].index))
        self.assertEqual(5, len(_dataFrames[0]))

        self.assertListEqual(
            ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"],
            list(_dataFrames[1].columns))
        self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrames[1].index))
        self.assertEqual(5, len(_dataFrames[1]))

        self.assertListEqual(
            ["result", "table", "_start", "_stop", "_time", "_value", "_field", "_measurement", "host"],
            list(_dataFrames[2].columns))
        self.assertListEqual([0, 1, 2, 3, 4], list(_dataFrames[2].index))
        self.assertEqual(5, len(_dataFrames[2]))
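
    # An empty response body yields an empty DataFrame rather than raising.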
    def test_empty_data_set(self):
        query_response = '\n'

        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)

        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

        _dataFrame = self.client.query_api().query_data_frame(
            'from(bucket: "my-bucket") '
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> filter(fn: (r) => r._field == "not_exit")',
            "my-org")

        self.assertEqual(DataFrame, type(_dataFrame))
        self.assertListEqual([], list(_dataFrame.columns))
        self.assertListEqual([], list(_dataFrame.index))
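
    # data_frame_index=["_time"] moves the _time column into the DataFrame
    # index, so it no longer appears among the columns.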
    def test_more_table_custom_index(self):
        query_response = \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,11125907456,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,11127103488,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,11127291904,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,11126190080,used,mem,mac.local\n' \
            ',,0,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,11127832576,used,mem,mac.local\n' \
            '\n\n' \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,6053961728,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,6052765696,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,6052577280,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,6053679104,available,mem,mac.local\n' \
            ',,1,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,6052036608,available,mem,mac.local\n' \
            '\n\n' \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string\n' \
            '#group,false,false,true,true,false,false,true,true,true\n' \
            '#default,_result,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_value,_field,_measurement,host\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:05Z,18632704,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:06Z,17420288,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:07Z,17256448,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:08Z,18362368,free,mem,mac.local\n' \
            ',,2,2019-11-12T08:09:04.795385031Z,2019-11-12T08:09:09.795385031Z,2019-11-12T08:09:09Z,16723968,free,mem,mac.local\n\n'

        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)

        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

        _dataFrames = self.client.query_api().query_data_frame(
            'from(bucket: "my-bucket") '
            '|> range(start: -5s, stop: now()) '
            '|> filter(fn: (r) => r._measurement == "mem") '
            '|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")',
            "my-org", data_frame_index=["_time"])

        self.assertEqual(list, type(_dataFrames))
        self.assertEqual(len(_dataFrames), 3)

        print(_dataFrames[0].to_string())

        self.assertListEqual(
            ["result", "table", "_start", "_stop", "_value", "_field", "_measurement", "host"],
            list(_dataFrames[0].columns))
        self.assertListEqual([Timestamp('2019-11-12 08:09:05+0000'), Timestamp('2019-11-12 08:09:06+0000'),
                              Timestamp('2019-11-12 08:09:07+0000'), Timestamp('2019-11-12 08:09:08+0000'),
                              Timestamp('2019-11-12 08:09:09+0000')], list(_dataFrames[0].index))
        self.assertEqual(5, len(_dataFrames[0]))

        self.assertListEqual(
            ["result", "table", "_start", "_stop", "_value", "_field", "_measurement", "host"],
            list(_dataFrames[1].columns))
        self.assertListEqual([Timestamp('2019-11-12 08:09:05+0000'), Timestamp('2019-11-12 08:09:06+0000'),
                              Timestamp('2019-11-12 08:09:07+0000'), Timestamp('2019-11-12 08:09:08+0000'),
                              Timestamp('2019-11-12 08:09:09+0000')], list(_dataFrames[1].index))
        self.assertEqual(5, len(_dataFrames[1]))

        self.assertListEqual(
            ["result", "table", "_start", "_stop", "_value", "_field", "_measurement", "host"],
            list(_dataFrames[2].columns))
        self.assertListEqual([Timestamp('2019-11-12 08:09:05+0000'), Timestamp('2019-11-12 08:09:06+0000'),
                              Timestamp('2019-11-12 08:09:07+0000'), Timestamp('2019-11-12 08:09:08+0000'),
                              Timestamp('2019-11-12 08:09:09+0000')], list(_dataFrames[2].index))
        self.assertEqual(5, len(_dataFrames[2]))
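
    # query_data_frame() warns with MissingPivotFunction when the Flux query
    # contains no pivot()/schema.fieldsAsCols() call, since un-pivoted results
    # map poorly onto a DataFrame.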
    def test_query_with_warning(self):
        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body='\n')

        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

        with pytest.warns(MissingPivotFunction) as warnings:
            self.client.query_api().query_data_frame(
                'from(bucket: "my-bucket")'
                '|> range(start: -5s, stop: now()) '
                '|> filter(fn: (r) => r._measurement == "mem") ',
                "my-org")

        self.assertEqual(1, len([w for w in warnings if w.category == MissingPivotFunction]))
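
    # Queries that already shape their data (schema.fieldsAsCols() or an
    # explicit pivot()) must not trigger the MissingPivotFunction warning.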
    def test_query_without_warning(self):
        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body='\n')

        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

        with warnings.catch_warnings(record=True) as warns:
            self.client.query_api().query_data_frame(
                'import "influxdata/influxdb/schema"'
                '\n'
                'from(bucket: "my-bucket")'
                '|> range(start: -5s, stop: now()) '
                '|> filter(fn: (r) => r._measurement == "mem") '
                '|> schema.fieldsAsCols() ',
                "my-org")
        self.assertEqual(0, len([w for w in warns if w.category == MissingPivotFunction]))

        with warnings.catch_warnings(record=True) as warns:
            self.client.query_api().query_data_frame(
                'from(bucket: "my-bucket")'
                '|> range(start: -5s, stop: now()) '
                '|> filter(fn: (r) => r._measurement == "mem") '
                '|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")',
                "my-org")
        self.assertEqual(0, len([w for w in warns if w.category == MissingPivotFunction]))
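
    # With use_extension_dtypes=True, sparse pivoted columns keep pandas'
    # nullable extension dtypes (the 'string' and 'boolean' dtypes asserted
    # below) and missing cells stay NA instead of being coerced.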
    def test_pivoted_data(self):
        query_response = \
            '#group,false,false,true,true,false,true,false,false,false,false\n' \
            '#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,double,long,string,boolean\n' \
            '#default,_result,,,,,,,,,\n' \
            ',result,table,_start,_stop,_time,_measurement,test_double,test_long,test_string,test_boolean\n' \
            ',,0,2023-12-15T13:19:45Z,2023-12-15T13:20:00Z,2023-12-15T13:19:55Z,test,4,,,\n' \
            ',,0,2023-12-15T13:19:45Z,2023-12-15T13:20:00Z,2023-12-15T13:19:56Z,test,,1,,\n' \
            ',,0,2023-12-15T13:19:45Z,2023-12-15T13:20:00Z,2023-12-15T13:19:57Z,test,,,hi,\n' \
            ',,0,2023-12-15T13:19:45Z,2023-12-15T13:20:00Z,2023-12-15T13:19:58Z,test,,,,true\n' \
            '\n\n'

        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)

        self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)

        _dataFrame = self.client.query_api().query_data_frame(
            'from(bucket: "my-bucket") '
            '|> range(start: 2023-12-15T13:19:45Z, stop: 2023-12-15T13:20:00Z)'
            '|> filter(fn: (r) => r["_measurement"] == "test")'
            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")',
            "my-org", use_extension_dtypes=True)

        self.assertEqual(DataFrame, type(_dataFrame))
        self.assertListEqual(
            ["result", "table", "_start", "_stop", "_time", "_measurement",
             "test_double", "test_long", "test_string", "test_boolean"],
            list(_dataFrame.columns))
        self.assertListEqual([0, 1, 2, 3], list(_dataFrame.index))
        # self.assertEqual('Int64', _dataFrame.dtypes['test_long'].name)
        # self.assertEqual('Float64', _dataFrame.dtypes['test_double'].name)
        self.assertEqual('string', _dataFrame.dtypes['test_string'].name)
        self.assertEqual('boolean', _dataFrame.dtypes['test_boolean'].name)
        self.assertEqual(4, len(_dataFrame))
        self.assertEqual("_result", _dataFrame['result'][0])
        self.assertEqual("_result", _dataFrame['result'][1])
        self.assertEqual("_result", _dataFrame['result'][2])
        self.assertEqual("_result", _dataFrame['result'][3])
        self.assertEqual(0, _dataFrame['table'][0], None)
        self.assertEqual(0, _dataFrame['table'][1], None)
        self.assertEqual(0, _dataFrame['table'][2], None)
        self.assertEqual(0, _dataFrame['table'][3], None)
        self.assertEqual(Timestamp('2023-12-15 13:19:45+0000'), _dataFrame['_start'][0])
        self.assertEqual(Timestamp('2023-12-15 13:19:45+0000'), _dataFrame['_start'][1])
        self.assertEqual(Timestamp('2023-12-15 13:19:45+0000'), _dataFrame['_start'][2])
        self.assertEqual(Timestamp('2023-12-15 13:19:45+0000'), _dataFrame['_start'][3])
        self.assertEqual(Timestamp('2023-12-15 13:20:00+0000'), _dataFrame['_stop'][0])
        self.assertEqual(Timestamp('2023-12-15 13:20:00+0000'), _dataFrame['_stop'][1])
        self.assertEqual(Timestamp('2023-12-15 13:20:00+0000'), _dataFrame['_stop'][2])
        self.assertEqual(Timestamp('2023-12-15 13:20:00+0000'), _dataFrame['_stop'][3])
        self.assertEqual(Timestamp('2023-12-15 13:19:55+0000'), _dataFrame['_time'][0])
        self.assertEqual(Timestamp('2023-12-15 13:19:56+0000'), _dataFrame['_time'][1])
        self.assertEqual(Timestamp('2023-12-15 13:19:57+0000'), _dataFrame['_time'][2])
        self.assertEqual(Timestamp('2023-12-15 13:19:58+0000'), _dataFrame['_time'][3])
        self.assertEqual(4, _dataFrame['test_double'][0])
        self.assertTrue(pandas.isna(_dataFrame['test_double'][1]))
        self.assertTrue(pandas.isna(_dataFrame['test_double'][2]))
        self.assertTrue(pandas.isna(_dataFrame['test_double'][3]))
        self.assertTrue(pandas.isna(_dataFrame['test_long'][0]))
        self.assertEqual(1, _dataFrame['test_long'][1])
        self.assertTrue(pandas.isna(_dataFrame['test_long'][2]))
        self.assertTrue(pandas.isna(_dataFrame['test_long'][3]))
        self.assertTrue(pandas.isna(_dataFrame['test_string'][0]))
        self.assertTrue(pandas.isna(_dataFrame['test_string'][1]))
        self.assertEqual('hi', _dataFrame['test_string'][2])
        self.assertTrue(pandas.isna(_dataFrame['test_string'][3]))
        self.assertTrue(pandas.isna(_dataFrame['test_boolean'][0]))
        self.assertTrue(pandas.isna(_dataFrame['test_boolean'][1]))
        self.assertTrue(pandas.isna(_dataFrame['test_boolean'][2]))
        self.assertEqual(True, _dataFrame['test_boolean'][3])
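

# Integration tests: these exercise self.client provided by BaseTest against a
# real InfluxDB instance rather than the httpretty mock used above.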
class QueryDataFrameIntegrationApi(BaseTest):

    def test_large_amount_of_data(self):
        _measurement_name = "data_frame_" + str(current_milli_time())

        def _create_point(index) -> Point:
            return Point(_measurement_name) \
                .tag("deviceType", str(random.choice(['A', 'B']))) \
                .tag("name", random.choice(['A', 'B'])) \
                .field("uuid", random.randint(0, 10_000)) \
                .field("co2", random.randint(0, 10_000)) \
                .field("humid", random.randint(0, 10_000)) \
                .field("lux", random.randint(0, 10_000)) \
                .field("water", random.randint(0, 10_000)) \
                .field("shine", random.randint(0, 10_000)) \
                .field("temp", random.randint(0, 10_000)) \
                .field("voc", random.randint(0, 10_000)) \
                .time(time=(1583828781 + index), write_precision=WritePrecision.S)

        data = rx.range(0, 2_000).pipe(ops.map(lambda index: _create_point(index)))

        write_api = self.client.write_api(write_options=WriteOptions(batch_size=500))
        write_api.write(org="my-org", bucket="my-bucket", record=data, write_precision=WritePrecision.S)
        write_api.close()

        query = 'from(bucket: "my-bucket")' \
                '|> range(start: 2020-02-19T23:30:00Z, stop: now())' \
                f'|> filter(fn: (r) => r._measurement == "{_measurement_name}")'

        result = self.client.query_api().query_data_frame(org="my-org", query=query)
        self.assertGreater(len(result), 1)
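
    # A query made with a wrong token is expected to fail with HTTP 401
    # Unauthorized, surfaced as an ApiException.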
    def test_query_without_credentials(self):
        _client = InfluxDBClient(url="http://localhost:8086", token="my-token-wrong-credentials", org="my-org")

        with self.assertRaises(ApiException) as ae:
            query = 'from(bucket: "my-bucket")' \
                    '|> range(start: 2020-02-19T23:30:00Z, stop: now())' \
                    f'|> filter(fn: (r) => r._measurement == "my-measurement")'
            _client.query_api().query_data_frame(query=query)

        exception = ae.exception
        self.assertEqual(401, exception.status)
        self.assertEqual("Unauthorized", exception.reason)

        _client.close()