From 8fb71f60c59563717ef7f466e2062e77f1a57715 Mon Sep 17 00:00:00 2001 From: Philip Date: Tue, 27 May 2014 15:38:41 +0100 Subject: [PATCH] Initial logging packets and data structures --- .gitignore | 1 + WatchyServer/StatsAggregator.py | 45 ++++++++----- WatchyServer/StatsServer.py | 19 +++++- WatchyServer/templates/index.html | 62 ++++++++++++++++- WatchyServer/templates/logs.html | 85 ++++++++++++++++++++++++ src/Makefile.am | 5 +- src/libwatchy.c | 107 +++++++++++++++++++++--------- src/log.c | 44 ++++++++++++ src/osdep-linux.c | 22 +----- src/watchy.h | 81 +++++++++++++--------- 10 files changed, 368 insertions(+), 103 deletions(-) create mode 100644 WatchyServer/templates/logs.html create mode 100644 src/log.c diff --git a/.gitignore b/.gitignore index ed44ee5..ef52995 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ build/ bindings/python/pywatchy.c *.log src/test +src/log src/watcher src/config.h.in~ libtool diff --git a/WatchyServer/StatsAggregator.py b/WatchyServer/StatsAggregator.py index f7e3bf6..c49facb 100644 --- a/WatchyServer/StatsAggregator.py +++ b/WatchyServer/StatsAggregator.py @@ -6,14 +6,15 @@ import traceback import ServerUtil +from StatsServer import StatSession_Logs from StatsServer import StatSession_Hosts from StatsServer import StatSession_Metrics -def consume (func): +def consumer (func): def decorated (*args, **kwargs): if hasattr (args [0], 'backend'): - # TODO consume to backend - pass + if args [0].backend is not None: + args [0].backend.consume (*args, **kwargs) return func (*args, **kwargs) return decorated @@ -31,7 +32,7 @@ def __init__ (self, host='localhost', port=8080, climit=20, backend=None): self.serverSocket.setblocking (0) threading.Thread.__init__ (self) - @consume + @consumer def consumeMetric (self, key, data): if key not in StatSession_Metrics: StatSession_Metrics [key] = [] @@ -39,7 +40,7 @@ def consumeMetric (self, key, data): StatSession_Metrics [key] = StatSession_Metrics [key][1:] StatSession_Metrics [key].append (data) - @consume + @consumer def consumeHost (self, key, data): if key not in StatSession_Hosts: StatSession_Hosts [key] = [] @@ -47,18 +48,25 @@ def consumeHost (self, key, data): StatSession_Hosts [key] = StatSession_Hosts [key][1:] StatSession_Hosts [key].append (data) + @consumer + def consumeLog (self, key, data): + if key not in StatSession_Logs: + StatSession_Logs [key] = [] + if len (StatSession_Logs [key]) >= self.climit: + StatSession_Logs [key] = StatSession_Logs [key][1:] + StatSession_Logs [key].append (data) + def consume (self, data): - try: - key = data ['name'] - which = data ['type'] - if which == 'host': - self.consumeHost (key, data) - elif which == 'metric': - self.consumeMetric (key, data) - else: - ServerUtil.warning ('Invalid type [%s]' % which) - except: - pass + key = data ['name'] + which = data ['type'] + if which == 'host': + self.consumeHost (key, data) + elif which == 'metric': + self.consumeMetric (key, data) + elif which == 'log': + self.consumeLog (key, data) + else: + ServerUtil.warning ('Invalid type [%s]' % which) def run (self): ServerUtil.info ("Starting StatsAggregator on %s:%s" % (self.host, self.port)) @@ -78,8 +86,9 @@ def run (self): sobject ['host'] = { 'host': rdata [1][0], 'port': rdata [1][1] } - ServerUtil.info ('Recieved stats from [%s:%i] for [%s]' \ - % (sobject ['host']['host'], + ServerUtil.info ('Recieved [%s] from [%s:%i] for [%s]' \ + % (sobject ['type'], + sobject ['host']['host'], sobject ['host']['port'], sobject ['name'])) self.consume (sobject) diff --git a/WatchyServer/StatsServer.py b/WatchyServer/StatsServer.py index 1bf92c4..effa920 100644 --- a/WatchyServer/StatsServer.py +++ b/WatchyServer/StatsServer.py @@ -14,8 +14,10 @@ from gevent.pywsgi import WSGIServer from geventwebsocket.handler import WebSocketHandler +StatSession_Logs = { } StatSession_Hosts = { } StatSession_Metrics = { } + tfolder = os.path.join (os.path.dirname (os.path.abspath (__file__)), 'templates') sfolder = os.path.join (os.path.dirname (os.path.abspath (__file__)), 'static') app = Flask ('WatchyServer', template_folder=tfolder, static_folder=sfolder) @@ -35,16 +37,25 @@ def metricsGraph (key): def hostsGraph (key): return render_template ('host.html', node=key) +@app.route ("/logs/") +def logsView (key): + return render_template ('logs.html', node=key) + @app.route ("/api/hosts/keys") def getHosts (): nodes = StatSession_Hosts.keys () return jsonify ({'keys':nodes, 'len':len (nodes)}) @app.route ("/api/metrics/keys") -def getKeys (): +def getMetrics (): nodes = StatSession_Metrics.keys () return jsonify ({'keys':nodes, 'len':len (nodes)}) +@app.route ("/api/logs/keys") +def getLogs (): + nodes = StatSession_Logs.keys () + return jsonify ({'keys':nodes, 'len':len (nodes)}) + @app.route ("/api/metrics/graph/") def getMetricGraph (key): if key not in StatSession_Metrics: @@ -81,6 +92,12 @@ def getHostData (key): return jsonify ({'data':None, 'len':0}) return jsonify ({'data':StatSession_Hosts [key], 'len':len (StatSession_Hosts [key])}) +@app.route ("/api/logs/data/") +def getLogData (key): + if key not in StatSession_Logs: + return jsonify ({'data':None, 'len':0}) + return jsonify ({'data':StatSession_Logs [key], 'len':len (StatSession_Logs [key])}) + @app.route ("/deps/") def statics (path): return app.send_static_file (path) diff --git a/WatchyServer/templates/index.html b/WatchyServer/templates/index.html index 50f3db7..00c8a60 100644 --- a/WatchyServer/templates/index.html +++ b/WatchyServer/templates/index.html @@ -11,7 +11,6 @@ - + + + + + + + + + + +
+
+ +
+

Dashboard {{ node }}

+ + DO SOMETHING HERE + +
+
+
+ + + diff --git a/src/Makefile.am b/src/Makefile.am index 84ea2b0..6a916c5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,4 +1,4 @@ -bin_PROGRAMS = test$(EXEEXT) watcher$(EXEEXT) +bin_PROGRAMS = test$(EXEEXT) log$(EXEEXT) watcher$(EXEEXT) lib_LTLIBRARIES = libwatchy.la pkgconfigdir = $(libdir)/pkgconfig @@ -16,3 +16,6 @@ watcher_LDADD = libwatchy.la test_SOURCES = test.c test_LDADD = libwatchy.la + +log_SOURCES = log.c +log_LDADD = libwatchy.la diff --git a/src/libwatchy.c b/src/libwatchy.c index d2db443..7998298 100644 --- a/src/libwatchy.c +++ b/src/libwatchy.c @@ -16,11 +16,6 @@ #include #include -// this is a very basic protocol thsi is the size of every object -// means the server can simly .read (256) and you know you will -// get all the data no need to keep reading etc.. -#define WTCY_PACKET_SIZE 256 - #ifndef nitems # define nitems(_a) (sizeof((_a)) / sizeof((_a)[0])) #endif @@ -32,9 +27,22 @@ static const char * watchy_error_strings [] = { [WTCY_FORK_FAIL] = "fork of runtime has failed", [WTCY_SOCK_FAIL] = "create socket failed see errno", [WTCY_IS_RUNNING] = "watch me is already running", + [WTCY_PACKET_ERR] = "Error converting to json", [WTCY_UNKNOWN] = "unknown error code", }; +inline int +watchy_setTimeStamp (char * const buffer, const size_t len) +{ + time_t ltime = time (NULL); + struct tm *tm; + tm = localtime (<ime); + + return snprintf (buffer, len,"%04d%02d%02d%02d%02d%02d", + tm->tm_year+1900, tm->tm_mon, tm->tm_mday, + tm->tm_hour, tm->tm_min, tm->tm_sec); +} + /* Utility trim function */ char * watchy_trim (const char * buffer, const size_t len) @@ -54,6 +62,16 @@ watchy_trim (const char * buffer, const size_t len) return rbuf; } +inline int +watchy_logPacket (struct watchy_data * const data, const char * message, const char * key) +{ + data->T = LOG; + strncpy (data->key, key, sizeof (data->key)); + watchy_setTimeStamp (data->tsp, sizeof (data->tsp)); + memset (data->value.buffer, 0, sizeof (data->value.buffer)); + return snprintf (data->value.buffer, sizeof (data->value.buffer) - 1, "%s", message); +} + const char * watchy_strerror (const int code) { @@ -82,30 +100,46 @@ watchy_socket (const char * bind, const int port, int * const sockfd, /** * this is a dirty method but its handy for now and we only need a very simply json object **/ -inline int +int watchy_statsToJson (const struct watchy_data * const stats, const size_t blen, char * const buffer) { char type [10]; memset (type, 0, sizeof (type)); - if (stats->T == METRIC) - strcpy (type, "metric"); - else if (stats->T == HOST) - strcpy (type, "host"); - else - strcpy (type, "unknown"); - - return snprintf (buffer, blen, "{ " - "\"type\" : \"%s\", " - "\"name\" : \"%s\", " - "\"timeStamp\" : \"%s\", " - "\"state\" : \"%s\", " - "\"pid\" : %i, " - "\"threads\" : %zi, " - "\"memory\" : %i" - " }", type, - stats->pname, stats->tsp, - stats->status, stats->cpid, - stats->nthreads, stats->memory); + + if (stats->T == METRIC || stats->T == HOST) + { + if (stats->T == METRIC) + strncpy (type, "metric", sizeof (type)); + else + strncpy (type, "host", sizeof (type)); + return snprintf (buffer, blen, "{ " + "\"type\" : \"%s\", " + "\"name\" : \"%s\", " + "\"timeStamp\" : \"%s\", " + "\"state\" : \"%s\", " + "\"pid\" : %i, " + "\"threads\" : %zi, " + "\"memory\" : %i" + " }", type, + stats->key, stats->tsp, + stats->value.metric.status, + stats->value.metric.cpid, + stats->value.metric.nthreads, + stats->value.metric.memory); + } + else if (stats->T == LOG) + { + strncpy (type, "log", sizeof (type)); + return snprintf (buffer, blen, "{ " + "\"type\" : \"%s\", " + "\"name\" : \"%s\", " + "\"timeStamp\" : \"%s\", " + "\"message\" : \"%s\"" + " }", type, + stats->key, stats->tsp, + stats->value.buffer); + } + return WTCY_PACKET_ERR; } int @@ -147,9 +181,12 @@ watchy_watchme (const char * name, const char * bind, const int port) { struct watchy_data stats; memset (&stats, 0, sizeof (stats)); - - watchy_getStats (&stats, cpid); - strncpy (stats.pname, name, sizeof (stats.pname)); + + stats.T = METRIC; + strncpy (stats.key, name, sizeof (stats.key)); + watchy_setTimeStamp (stats.tsp, sizeof (stats.tsp)); + + watchy_getStats (&stats.value.metric, cpid); char buffer [WTCY_PACKET_SIZE]; memset (buffer, 0, sizeof (buffer)); @@ -194,8 +231,11 @@ watchy_watchpid (const char * name, const char * bind, const int port, const pid struct watchy_data stats; memset (&stats, 0, sizeof (stats)); - watchy_getStats (&stats, iproc); - strncpy (stats.pname, name, sizeof (stats.pname)); + stats.T = METRIC; + strncpy (stats.key, name, sizeof (stats.key)); + watchy_setTimeStamp (stats.tsp, sizeof (stats.tsp)); + + watchy_getStats (&stats.value.metric, iproc); char buffer [WTCY_PACKET_SIZE]; memset (buffer, 0, sizeof (buffer)); @@ -227,8 +267,11 @@ watchy_watchHost (const char * name, const char * bind, const int port) struct watchy_data stats; memset (&stats, 0, sizeof (stats)); - watchy_getHostStats (&stats); - strncpy (stats.pname, name, sizeof (stats.pname)); + stats.T = HOST; + strncpy (stats.key, name, sizeof (stats.key)); + watchy_setTimeStamp (stats.tsp, sizeof (stats.tsp)); + + watchy_getHostStats (&stats.value.metric); char buffer [WTCY_PACKET_SIZE]; memset (buffer, 0, sizeof (buffer)); diff --git a/src/log.c b/src/log.c new file mode 100644 index 0000000..370db5d --- /dev/null +++ b/src/log.c @@ -0,0 +1,44 @@ +#include "config.h" + +#include +#include +#include +#include +#include + +#include "watchy.h" + +#include +#include +#include +#include +#include +#include + +int main (int argc, char **argv) +{ + int sockfd; + struct sockaddr_in servaddr; + memset (&servaddr, 0, sizeof (servaddr)); + watchy_socket ("localhost", 7878, &sockfd, &servaddr); + + size_t i; + for (i = 0; i < 5; ++i) + { + struct watchy_data stats; + memset (&stats, 0, sizeof (stats)); + + watchy_logPacket (&stats, "Hello World", "log1"); + + char buffer [WTCY_PACKET_SIZE]; + memset (buffer, 0, sizeof (buffer)); + watchy_statsToJson (&stats, WTCY_PACKET_SIZE, buffer); + sendto (sockfd, buffer, WTCY_PACKET_SIZE, 0, + (const struct sockaddr *) &servaddr, sizeof (servaddr)); + + sleep (1); + } + close (sockfd); + + return 0; +} diff --git a/src/osdep-linux.c b/src/osdep-linux.c index 413b175..2a9396c 100644 --- a/src/osdep-linux.c +++ b/src/osdep-linux.c @@ -11,19 +11,10 @@ #include "watchy.h" -void watchy_getStats (struct watchy_data * const stats, const pid_t ipid) +void watchy_getStats (struct watchy_metric * const stats, const pid_t ipid) { - stats->T = METRIC; stats->cpid = ipid; - time_t ltime = time (NULL); - struct tm *tm; - tm = localtime (<ime); - - snprintf (stats->tsp, sizeof (stats->tsp),"%04d%02d%02d%02d%02d%02d", - tm->tm_year+1900, tm->tm_mon, tm->tm_mday, - tm->tm_hour, tm->tm_min, tm->tm_sec); - // looking up /proc/pid/status for these stats for now. char buf [PATH_MAX]; memset (buf, 0, sizeof (buf)); @@ -75,19 +66,10 @@ void watchy_getStats (struct watchy_data * const stats, const pid_t ipid) fclose (fd); } -void watchy_getHostStats (struct watchy_data * const stats) +void watchy_getHostStats (struct watchy_metric * const stats) { - stats->T = HOST; strncpy (stats->status, "running", sizeof (stats->status)); - time_t ltime = time (NULL); - struct tm *tm; - tm = localtime (<ime); - - snprintf (stats->tsp, sizeof (stats->tsp),"%04d%02d%02d%02d%02d%02d", - tm->tm_year+1900, tm->tm_mon, tm->tm_mday, - tm->tm_hour, tm->tm_min, tm->tm_sec); - FILE * fd = fopen ("/proc/meminfo", "rb"); if (fd == NULL) return; diff --git a/src/watchy.h b/src/watchy.h index 8da1bc6..9f666c2 100644 --- a/src/watchy.h +++ b/src/watchy.h @@ -17,43 +17,64 @@ #define WTCY_FORK_FAIL 2 #define WTCY_SOCK_FAIL 3 #define WTCY_IS_RUNNING 4 -#define WTCY_UNKNOWN 5 +#define WTCY_PACKET_ERR 5 +#define WTCY_UNKNOWN 6 + +// this is a very basic protocol thsi is the size of every object +// means the server can simly .read (256) and you know you will +// get all the data no need to keep reading etc.. +#define WTCY_PACKET_SIZE 256 #ifdef __cplusplus extern "C" { #endif -typedef enum { METRIC, HOST } WATCHY_TYPE; -struct watchy_data { -WATCHY_TYPE T; -char pname [32]; -char tsp [32]; -char status [16]; -pid_t cpid; -size_t nthreads; -unsigned int memory; -}; - -// this return is allocated.. -extern char * watchy_trim (const char *, const size_t); + typedef enum { METRIC, HOST, LOG } WATCHY_TYPE; + + struct watchy_metric { + char status [16]; + pid_t cpid; + size_t nthreads; + unsigned int memory; + }; + + struct watchy_data { + WATCHY_TYPE T; + char key [32]; + char tsp [32]; + union { + struct watchy_metric metric; + char buffer [WTCY_PACKET_SIZE-25]; + } value ; + }; + + // this return is allocated.. + extern char * watchy_trim (const char *, const size_t); -// setup socket [bind, port, fd, addr_Desc] -extern int watchy_socket (const char *, const int, int * const, struct sockaddr_in * const); - -// get the stats this function is osdep-*.c depends at compile time -extern void watchy_getStats (struct watchy_data * const, const pid_t); -extern void watchy_getHostStats (struct watchy_data * const); -extern int watchy_statsToJson (const struct watchy_data * const, const size_t, char * const); - -// return the error string for the error code (string is not allocated) -extern const char * watchy_strerror (const int); -// watch the current process [name, host, port] - non blocking -extern int watchy_watchme (const char *, const char *, const int); -// watch a specified pid [name, host, port, pid] - blocking -extern int watchy_watchpid (const char *, const char *, const int, const pid_t); -// watch the host system [name, host, port] - blocking -extern int watchy_watchHost (const char *, const char *, const int); + // setup socket [bind, port, fd, addr_Desc] + extern int watchy_socket (const char *, const int, int * const, struct sockaddr_in * const); + + // utility functions + extern int watchy_setTimeStamp (char * const, const size_t); + extern int watchy_logPacket (struct watchy_data * const, const char *, const char *); + // get the stats this function is osdep-*.c depends at compile time + extern void watchy_getStats (struct watchy_metric * const, const pid_t); + extern void watchy_getHostStats (struct watchy_metric * const); + + // agnostic of tagged union + extern int watchy_statsToJson (const struct watchy_data * const, const size_t, char * const); + + // return the error string for the error code (string is not allocated) + extern const char * watchy_strerror (const int); + + // FIXME fork/daemon issues + extern int watchy_watchme (const char *, const char *, const int); + // watch a specified pid [name, host, port, pid] - blocking + extern int watchy_watchpid (const char *, const char *, const int, const pid_t); + // watch the host system [name, host, port] - blocking + extern int watchy_watchHost (const char *, const char *, const int); + #ifdef __cplusplus } #endif