Skip to content

Commit

Permalink
Publish Python nexmark metrics to influxdb
Browse files Browse the repository at this point in the history
  • Loading branch information
apilloud committed Oct 14, 2022
1 parent 3ffdf8d commit 3af6864
Showing 1 changed file with 84 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,15 @@
# pytype: skip-file

import argparse
import json
import logging
import os
import time
import uuid

import requests
from requests.auth import HTTPBasicAuth

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
Expand Down Expand Up @@ -123,6 +128,13 @@ def __init__(self):
logging.info('creating sub %s', self.topic_name)
sub.create()

self.export_influxdb = self.args.export_summary_to_influxdb
if self.export_influxdb:
self.influx_database = self.args.influx_database
self.influx_host = self.args.influx_host
self.influx_base = self.args.base_influx_measurement
self.influx_retention = self.args.influx_retention_policy

def parse_args(self):
parser = argparse.ArgumentParser()

Expand Down Expand Up @@ -170,6 +182,32 @@ def parse_args(self):
choices=['PUBLISH_ONLY', 'SUBSCRIBE_ONLY', 'COMBINED'],
help='Pubsub mode used in the pipeline.')

parser.add_argument(
'--export_summary_to_influxdb',
default=False,
action='store_true',
help='If set store results in influxdb')
parser.add_argument(
'--influx_database',
type=str,
default='beam_test_metrics',
help='Influx database name')
parser.add_argument(
'--influx_host',
type=str,
default='http://localhost:8086',
help='Influx database url')
parser.add_argument(
'--base_influx_measurement',
type=str,
default='nexmark',
help='Prefix to influx measurement')
parser.add_argument(
'--influx_retention_policy',
type=str,
default='forever',
help='Retention policy for stored results')

self.args, self.pipeline_args = parser.parse_known_args()
logging.basicConfig(
level=getattr(logging, self.args.loglevel, None),
Expand Down Expand Up @@ -243,7 +281,8 @@ def read_from_pubsub(self):
| 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
return events

def run_query(self, query, query_args, pipeline_options, query_errors):
def run_query(
self, query_num, query, query_args, pipeline_options, query_errors):
try:
self.pipeline = beam.Pipeline(options=self.pipeline_options)
nexmark_util.setup_coder()
Expand All @@ -269,6 +308,8 @@ def run_query(self, query, query_args, pipeline_options, query_errors):
result.wait_until_finish()
perf = self.monitor(result, event_monitor, result_monitor)
self.log_performance(perf)
if self.export_influxdb:
self.publish_performance_influxdb(query_num, perf)

except Exception as exc:
query_errors.append(str(exc))
Expand Down Expand Up @@ -349,6 +390,47 @@ def log_performance(perf):
'query run took %.1f seconds and processed %.1f events per second' %
(perf.runtime_sec, perf.event_per_sec))

def publish_performance_influxdb(self, query_num, perf):
    """Publish the result count and runtime of one query run to InfluxDB.

    Builds one line-protocol record per field (``numResults`` and
    ``runtimeMs``) under the measurement
    ``<influx_base>_<query_num>_python_<streaming|batch>`` and POSTs them to
    ``<influx_host>/write``. Credentials are read from the ``INFLUXDB_USER``
    and ``INFLUXDB_USER_PASSWORD`` environment variables.

    Publishing is best effort: every failure is logged as a warning and never
    raised, so a metrics outage is not reported as a query failure.

    Args:
      query_num: integer query number, used in the measurement name.
      perf: object exposing ``result_count`` and ``runtime_sec``.
    """
    # Line protocol treats commas, spaces and (for tags) equals signs as
    # separators, so they must be backslash-escaped inside names and values.
    tag_escapes = str.maketrans({',': '\\,', '=': '\\=', ' ': '\\ '})
    measurement_escapes = str.maketrans({',': '\\,', ' ': '\\ '})

    processing_mode = "streaming" if self.streaming else "batch"
    measurement = "%s_%d_python_%s" % (
        self.influx_base, query_num, processing_mode)

    tags = {'runner': self.pipeline_options.view_as(StandardOptions).runner}

    # Skip unset tags: concatenating None raised TypeError, and line protocol
    # has no representation for an empty tag value.
    tag_parts = [
        "%s=%s" % (
            str(key).translate(tag_escapes),
            str(value).translate(tag_escapes))
        for key, value in tags.items()
        if value is not None and str(value) != ''
    ]
    series = ','.join([measurement.translate(measurement_escapes)] + tag_parts)

    # The trailing "i" marks the value as an integer in line protocol.
    fields = {
        'numResults': "%di" % (perf.result_count),
        'runtimeMs': "%di" % (perf.runtime_sec * 1000),
    }

    # Second resolution, matching the 'precision' query parameter below.
    timestamp = int(time.time())
    payload = '\n'.join(
        "%s %s=%s %d" % (series, name, value, timestamp)
        for name, value in fields.items())

    url = '%s/write' % (self.influx_host)
    query_str = {
        'db': self.influx_database,
        'rp': self.influx_retention,
        'precision': 's',
    }

    # Only send credentials when a user is configured; otherwise the literal
    # string "None" would be sent as both user name and password.
    user = os.getenv('INFLUXDB_USER')
    password = os.getenv('INFLUXDB_USER_PASSWORD')
    auth = HTTPBasicAuth(user, password or '') if user else None

    # Bound the request so an unresponsive server cannot hang the benchmark.
    timeout_sec = 60

    try:
        response = requests.post(
            url,
            params=query_str,
            data=payload,
            auth=auth,
            timeout=timeout_sec)
    except requests.exceptions.RequestException as e:
        logging.warning('Failed to publish metrics to InfluxDB: %s', e)
        return

    if response.status_code == 204:
        return

    # The error body is not guaranteed to be JSON with an 'error' key (for
    # example when a proxy answers), so fall back to the raw response text.
    try:
        error_message = json.loads(response.content)['error']
    except (ValueError, KeyError, TypeError):
        error_message = response.text
    logging.warning(
        'Failed to publish metrics to InfluxDB. Received status code %s '
        'with an error message: %s',
        response.status_code,
        error_message)

@staticmethod
def get_performance(result, event_monitor, result_monitor):
event_count = nexmark_util.get_counter_metric(
Expand Down Expand Up @@ -429,6 +511,7 @@ def run(self):
for i in self.args.query:
logging.info('Running query %d', i)
self.run_query(
i,
queries[i],
query_args,
self.pipeline_options,
Expand Down

0 comments on commit 3af6864

Please sign in to comment.