forked from influxdata/influxdb-client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_health.py
79 lines (58 loc) · 3.07 KB
/
test_health.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import datetime
import unittest
import httpretty
from urllib3 import Retry
from influxdb_client import InfluxDBClient
from tests.base_test import BaseTest
class TestHealth(BaseTest):
def setUp(self) -> None:
super(TestHealth, self).setUp()
self.client.api_client.configuration.debug = True
def test_health(self):
health = self.client.health()
self.assertEqual(health.message, 'ready for queries and writes')
self.assertEqual(health.status, "pass")
self.assertEqual(health.name, "influxdb")
def test_health_not_running_instance(self):
client_not_running = InfluxDBClient("http://localhost:8099", token="my-token", debug=True)
check = client_not_running.health()
self.assertTrue("Connection refused" in check.message)
self.assertEqual(check.status, "fail")
self.assertEqual(check.name, "influxdb")
def test_ready(self):
ready = self.client.ready()
self.assertEqual(ready.status, "ready")
self.assertIsNotNone(ready.started)
self.assertTrue(datetime.datetime.now(tz=ready.started.tzinfo) > ready.started)
self.assertIsNotNone(ready.up)
class TestHealthMock(unittest.TestCase):
def setUp(self) -> None:
httpretty.enable()
httpretty.reset()
self.influxdb_client = InfluxDBClient(url="http://localhost", token="my-token")
def tearDown(self) -> None:
self.influxdb_client.close()
httpretty.disable()
def test_without_retry(self):
httpretty.register_uri(httpretty.GET, uri="http://localhost/health", status=429,
adding_headers={'Retry-After': '5', 'Content-Type': 'application/json'},
body="{\"message\":\"Health is not working\"}")
check = self.influxdb_client.health()
self.assertTrue("Health is not working" in check.message, msg=check.message)
self.assertEqual(check.status, "fail")
self.assertEqual(check.name, "influxdb")
self.assertEqual(1, len(httpretty.httpretty.latest_requests))
def test_with_retry(self):
self.influxdb_client.close()
self.influxdb_client = InfluxDBClient(url="http://localhost", token="my-token", retries=Retry())
httpretty.register_uri(httpretty.GET, uri="http://localhost/health", status=200,
adding_headers={'Content-Type': 'application/json'},
body="{\"message\":\"ready for queries and writes\", \"name\":\"influxdb\", \"status\":\"pass\"}")
httpretty.register_uri(httpretty.GET, uri="http://localhost/health", status=429,
adding_headers={'Retry-After': '1', 'Content-Type': 'application/json'},
body="{\"message\":\"Health is not working\"}")
health = self.influxdb_client.health()
self.assertEqual(health.message, 'ready for queries and writes')
self.assertEqual(health.status, "pass")
self.assertEqual(health.name, "influxdb")
self.assertEqual(2, len(httpretty.httpretty.latest_requests))