Skip to content

Commit

Permalink
Showing 3 changed files with 42 additions and 4 deletions.
26 changes: 24 additions & 2 deletions pulsar-functions/instance/src/main/python/log.py
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
''' log.py '''
import logging
from logging.handlers import RotatingFileHandler
import pulsar

# Create the logger
# pylint: disable=invalid-name
@@ -34,6 +35,19 @@
# see time formatter documentation for more
date_format = "%Y-%m-%d %H:%M:%S %z"

class LogTopicHandler(logging.Handler):
def __init__(self, topic_name, pulsar_client):
Log.info("Setting up producer for log topic %s" % topic_name)
self.producer = pulsar_client.create_producer(
str(topic_name),
block_if_queue_full=True,
batching_enabled=True,
batching_max_publish_delay_ms=100,
compression_type=pulsar._pulsar.CompressionType.LZ4)

def emit(self, record):
self.producer.send_async(record)

def configure(level=logging.INFO):
""" Configure logger which dumps log on terminal
@@ -50,14 +64,22 @@ def configure(level=logging.INFO):
Log.handlers.remove(handler)

Log.setLevel(level)
stream_handler = logging.StreamHandler()
add_handler(stream_handler)

def remove_all_handlers():
retval = None
for handler in Log.handlers:
Log.handlers.remove(handler)
retval = handler
return retval

def add_handler(stream_handler):
log_format = "[%(asctime)s] [%(levelname)s]: %(message)s"
formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
Log.addHandler(stream_handler)


def init_rotating_logger(level, logfile, max_files, max_bytes):
"""Initializes a rotating logger
16 changes: 15 additions & 1 deletion pulsar-functions/instance/src/main/python/python_instance.py
Original file line number Diff line number Diff line change
@@ -94,10 +94,13 @@ def compute_latency(self):
return self.latency / self.nsuccessfullyprocessed

class PythonInstance(object):
def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, pulsar_client):
def __init__(self, instance_id, function_id, function_version, function_config, max_buffered_tuples, user_code, log_topic, pulsar_client):
self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_config, max_buffered_tuples)
self.user_code = user_code
self.queue = Queue.Queue(max_buffered_tuples)
self.log_topic_handler = None
if log_topic is not None:
self.log_topic_handler = log.LogTopicHandler(str(log_topic), pulsar_client)
self.pulsar_client = pulsar_client
self.input_serdes = {}
self.consumers = {}
@@ -174,7 +177,11 @@ def actual_execution(self):
continue
self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic)
output_object = None
self.saved_log_handler = None
try:
if self.log_topic_handler is not None:
self.saved_log_handler = log.remove_all_handlers()
log.add_handler(self.log_topic_handler)
start_time = time.time()
self.current_stats.increment_processed(int(start_time) * 1000)
self.total_stats.increment_processed(int(start_time) * 1000)
@@ -188,9 +195,16 @@ def actual_execution(self):
self.current_stats.increment_successfully_processed(latency)
self.process_result(output_object, msg)
except Exception as e:
if self.log_topic_handler is not None:
log.remove_all_handlers()
log.add_handler(self.saved_log_handler)
Log.exception("Exception while executing user method")
self.total_stats.record_user_exception(e)
self.current_stats.record_user_exception(e)
finally:
if self.log_topic_handler is not None:
log.remove_all_handlers()
log.add_handler(self.saved_log_handler)

def done_producing(self, consumer, orig_message, result, sent_message):
if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:
Original file line number Diff line number Diff line change
@@ -73,6 +73,7 @@ def main():
parser.add_argument('--logging_directory', required=True, help='Logging Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?')
parser.add_argument('--log_topic', required=False, help='Topic to send Log Messages')

args = parser.parse_args()
log_file = os.path.join(args.logging_directory, args.logging_file + ".log.0")
@@ -117,7 +118,8 @@ def main():
pulsar_client = pulsar.Client(args.pulsar_serviceurl)
pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
str(args.function_version), function_config,
int(args.max_buffered_tuples), str(args.py), pulsar_client)
int(args.max_buffered_tuples), str(args.py),
args.log_topic, pulsar_client)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)

0 comments on commit a8a595d

Please sign in to comment.