-
Notifications
You must be signed in to change notification settings - Fork 4
/
broker_to_influxdb.py
98 lines (62 loc) · 3.27 KB
/
broker_to_influxdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import threading, logging, time
import multiprocessing
import msgpack
from kafka import TopicPartition
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import requests, json
import time
import os
import sys
import subprocess
#import urllib, urllib2
from time import localtime, strftime
cmd ="curl -XPOST 'http://localhost:8086/query' --data-urlencode 'q=CREATE DATABASE 'Labs''"
subprocess.call([cmd], shell=True)
timeout = 100
actual_data=[]
consumer = KafkaConsumer('resource',bootstrap_servers=['<NUC_IP>:9091'])
partitions = consumer.poll(timeout)
while partitions == None or len(partitions) == 0:
consumer = KafkaConsumer('resource', bootstrap_servers=['<NUC_IP>:9091'])
message = next(consumer)
print(message.value.decode('utf-8'))
str1 = message.value.decode('utf-8')
str2 = str1.split(',')
str3 = str2[0].split(':')[1] #memory
str4 = str2[1].split(':')[1] #tx
str5 = str2[2].split(':')[1] #rx
str6 = str2[4].split(':')[1] #cpu_usage
str7 = str2[5].split(':')[1] #tx_dropped
str8 = str2[8].split(':')[1] #rxError
str9 = str2[9].split(':')[1] #disk
str10 = str2[10].split(':')[1] #rx_dropped
str11 = str2[11].split(':')[1] #tx_dropped
str12 = str2[12].split(':')[1] #time_stamp
variables = "labs"
cmd = "curl -i -XPOST 'http://localhost:8086/write?db=Labs' --data-binary '%s,host=Labs,region=GIST memory=%s'" % (variables, str3)
subprocess.call([cmd], shell=True)
variables = "labs"
cmd = "curl -i -XPOST 'http://localhost:8086/write?db=Labs' --data-binary '%s,host=Labs,region=GIST tx=%s'" % (variables, str4)
subprocess.call([cmd], shell=True)
variables = "labs"
cmd = "curl -i -XPOST 'http://localhost:8086/write?db=Labs' --data-binary '%s,host=Labs,region=GIST rx=%s'" % (variables, str5)
subprocess.call([cmd], shell=True)
variables = "labs"
cmd = "curl -i -XPOST 'http://localhost:8086/write?db=Labs' --data-binary '%s,host=Labs,region=GIST CPU_Usage=%s'" % (variables, str6)
subprocess.call([cmd], shell=True)
variables = "str7"
cmd = "curl -i -XPOST 'http://localhost:8086/write?db=Labs' --data-binary '%s,host=Labs,region=GIST tx_dropped=%s'" % (variables, str7)
subprocess.call([cmd], shell=True)
variables = "str8"
cmd = "curl -i -XPOST 'http://localhost:8086/write?db=Labs' --data-binary '%s,host=Labs,region=GIST rxError=%s'" % (variables, str8)
subprocess.call([cmd], shell=True)
variables = "str8"
cmd = "curl -i -XPOST 'http://localhost:8086/write?db=Labs' --data-binary '%s,host=Labs,region=GIST disk=%s'" % (variables, str9)
subprocess.call([cmd], shell=True)
variables = "str8"
cmd = "curl -i -XPOST 'http://localhost:8086/write?db=Labs' --data-binary '%s,host=Labs,region=GIST rx_dropped=%s'" % (variables, str10)
subprocess.call([cmd], shell=True)
variables = "str8"
cmd = "curl -i -XPOST 'http://localhost:8086/write?db=Labs' --data-binary '%s,host=Labs,region=GIST txError=%s'" % (variables, str11)
subprocess.call([cmd], shell=True)