# speedtest2influx.py
# Periodically runs the Speedtest CLI and writes the results to InfluxDB.
import time
import json
import subprocess
import os
from influxdb import InfluxDBClient
from datetime import datetime
# InfluxDB Settings
# Every setting can be overridden through an environment variable of the same name.
DB_ADDRESS = os.environ.get('DB_ADDRESS', 'db_hostname.network')
# Environment values are always strings; cast so DB_PORT is an int whether it
# comes from the environment or from the default.
DB_PORT = int(os.environ.get('DB_PORT', 8086))
DB_USER = os.environ.get('DB_USER', 'db_username')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'db_password')
DB_DATABASE = os.environ.get('DB_DATABASE', 'speedtest_db')
# NOTE: the misspelling "INVERVAL" is part of the environment variable name that
# deployments may already set, so it is kept as-is.
DB_RETRY_INVERVAL = int(os.environ.get('DB_RETRY_INVERVAL', 60))  # Time before retrying a failed data upload.

# Speedtest Settings
TEST_INTERVAL = int(os.environ.get('TEST_INTERVAL', 1800))  # Time between tests (in seconds).
TEST_FAIL_INTERVAL = int(os.environ.get('TEST_FAIL_INTERVAL', 60))  # Time before retrying a failed Speedtest (in seconds).
PRINT_DATA = os.environ.get('PRINT_DATA', "False")  # Do you want to see the results in your logs? Type must be str. Will be converted to bool.

# Module-level client shared by the functions below. The last argument (the
# database) is None, so no database is selected at construction time.
influxdb_client = InfluxDBClient(
    DB_ADDRESS, DB_PORT, DB_USER, DB_PASSWORD, None)
def str2bool(v):
    """Return True when the string *v* spells a truthy value.

    The comparison ignores letter case. Only "yes", "true", "t" and "1" are
    treated as truthy; every other string yields False.
    """
    truthy_words = ("yes", "true", "t", "1")
    normalized = v.lower()
    return normalized in truthy_words
def logger(level, message):
    """Print *message* to stdout, prefixed with *level* and the current local time.

    Output has the form ``<level> : <dd/mm/YYYY HH:MM:SS> : <message>``.
    """
    stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    print(level, stamp, message, sep=" : ")
def init_db():
    """Make sure DB_DATABASE exists on the server and select it on the client.

    Fetches the list of databases from InfluxDB, creates DB_DATABASE when it
    is not listed, and then switches the shared client to it.

    Raises:
        RuntimeError: if the list of databases cannot be retrieved. The
            underlying exception is attached as the cause.
    """
    try:
        databases = influxdb_client.get_list_database()
    except Exception as error:
        # Bind the exception so it can be chained; the previous bare
        # ``except:`` left ``error`` undefined and raised NameError instead.
        logger("Error", "Unable to get list of databases")
        raise RuntimeError("No DB connection") from error

    if not any(db['name'] == DB_DATABASE for db in databases):
        influxdb_client.create_database(DB_DATABASE)  # Create if does not exist.

    # Select the database in both cases: the client is constructed without a
    # database, so a freshly created one must be switched to as well or later
    # writes have no target.
    influxdb_client.switch_database(DB_DATABASE)
def format_for_influx(cliout):
    """Convert the JSON output of the Speedtest CLI into InfluxDB points.

    Args:
        cliout: JSON document (str or bytes) produced by ``speedtest -f json``.

    Returns:
        A list of four point dicts, in order: ``ping``, ``download``,
        ``upload`` and ``packetLoss``. Each carries the report's
        ``timestamp`` as its ``time``.

    The CLI output contains more data than this; only the values below are
    kept because the rest is likely not necessary to store.
    """
    report = json.loads(cliout)
    stamp = report['timestamp']

    def point(measurement, fields):
        # Every point shares the same envelope; only name and fields differ.
        return {'measurement': measurement, 'time': stamp, 'fields': fields}

    latency_stats = report['ping']
    points = [
        point('ping', {
            'jitter': float(latency_stats['jitter']),
            'latency': float(latency_stats['latency']),
        })
    ]

    for direction in ('download', 'upload'):
        transfer = report[direction]
        points.append(point(direction, {
            # Byte to Megabit (125000 bytes per megabit).
            'bandwidth': transfer['bandwidth'] / 125000,
            'bytes': transfer['bytes'],
            'elapsed': transfer['elapsed'],
        }))

    # packetLoss may be absent from the report; fall back to 0.0.
    loss = float(report.get('packetLoss', 0.0))
    points.append(point('packetLoss', {'packetLoss': loss}))
    return points
def main():
    """Initialise the database, then run Speedtests and store results forever.

    Phase 1 retries init_db() every DB_RETRY_INVERVAL seconds until it
    succeeds. Phase 2 loops indefinitely: run the Speedtest CLI, convert its
    JSON output, write the points to InfluxDB, then sleep TEST_INTERVAL on
    success or TEST_FAIL_INTERVAL after any failure.

    Only ``Exception`` subclasses are handled, so KeyboardInterrupt and
    SystemExit stop the program instead of being logged as failures.
    """
    while True:
        try:
            init_db()  # Setup the database if it does not already exist.
        except Exception:
            logger("Error", "DB initialization error")
            time.sleep(DB_RETRY_INVERVAL)
        else:
            logger("Info", "DB initialization complete")
            break

    while True:  # Run a Speedtest and send the results to influxDB indefinitely.
        speedtest = subprocess.run(
            ["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"],
            capture_output=True)

        if speedtest.returncode != 0:  # Speedtest failed.
            logger("Error", "Speedtest failed")
            logger("Error", speedtest.stderr)
            logger("Info", speedtest.stdout)
            time.sleep(TEST_FAIL_INTERVAL)
            continue

        try:
            data = format_for_influx(speedtest.stdout)
        except (ValueError, KeyError, TypeError):
            # Malformed or incomplete JSON from the CLI must not end the loop.
            logger("Error", "Speedtest output could not be parsed")
            logger("Info", speedtest.stdout)
            time.sleep(TEST_FAIL_INTERVAL)
            continue
        logger("Info", "Speedtest successful")

        # Keep the try body down to the write itself so the sleeps below are
        # never covered by the handler.
        try:
            written = influxdb_client.write_points(data)
        except Exception:
            written = False

        if written:
            logger("Info", "Data written to DB successfully")
            if str2bool(PRINT_DATA):
                logger("Info", data)
            time.sleep(TEST_INTERVAL)
        else:
            # Covers both a raised error and a falsy return value, so a
            # failed write always waits before the next attempt.
            logger("Error", "Data write to DB failed")
            time.sleep(TEST_FAIL_INTERVAL)
# Script entry point: announce startup, then hand control to the logging loop.
if __name__ == "__main__":
    logger("Info", "Speedtest CLI Data Logger to InfluxDB started")
    main()