Skip to content

Commit

Permalink
[improve][fn] support reading config options from file in Function Py…
Browse files Browse the repository at this point in the history
…thon Runner (apache#18951)

Signed-off-by: laminar <[email protected]>
  • Loading branch information
tpiperatgod authored Feb 8, 2023
1 parent 524288c commit 6506f9b
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 21 deletions.
4 changes: 3 additions & 1 deletion pulsar-functions/instance/src/main/python/python_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def __init__(self,
pulsar_client,
secrets_provider,
cluster_name,
state_storage_serviceurl):
state_storage_serviceurl,
config_file):
self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
self.user_code = user_code
# set queue size to one since consumers already have internal queues. Just use queue to communicate message from
Expand Down Expand Up @@ -114,6 +115,7 @@ def __init__(self,
instance_id, cluster_name,
"%s/%s/%s" % (function_details.tenant, function_details.namespace, function_details.name)]
self.stats = Stats(self.metrics_labels)
self.config_file = config_file

def health_check(self):
self.last_health_check_ts = time.time()
Expand Down
107 changes: 87 additions & 20 deletions pulsar-functions/instance/src/main/python/python_instance_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,47 +49,113 @@
to_run = True
Log = log.Log


def atexit_function(signo, _frame):
global to_run
Log.info("Interrupted by %d, shutting down" % signo)
to_run = False

def main():
# Setup signal handlers
signal.signal(signal.SIGTERM, atexit_function)
signal.signal(signal.SIGHUP, atexit_function)
signal.signal(signal.SIGINT, atexit_function)

def generate_arguments_parser():
parser = argparse.ArgumentParser(description='Pulsar Functions Python Instance')
parser.add_argument('--function_details', required=True, help='Function Details Json String')
parser.add_argument('--py', required=True, help='Full Path of Function Code File')
parser.add_argument('--instance_id', required=True, help='Instance Id')
parser.add_argument('--function_id', required=True, help='Function Id')
parser.add_argument('--function_version', required=True, help='Function Version')
parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar Service Url')
parser.add_argument('--function_details', required=False, help='Function Details Json String')
parser.add_argument('--py', required=False, help='Full Path of Function Code File')
parser.add_argument('--instance_id', required=False, help='Instance Id')
parser.add_argument('--function_id', required=False, help='Function Id')
parser.add_argument('--function_version', required=False, help='Function Version')
parser.add_argument('--pulsar_serviceurl', required=False, help='Pulsar Service Url')
parser.add_argument('--client_auth_plugin', required=False, help='Client authentication plugin')
parser.add_argument('--client_auth_params', required=False, help='Client authentication params')
parser.add_argument('--use_tls', required=False, help='Use tls')
parser.add_argument('--tls_allow_insecure_connection', required=False, help='Tls allow insecure connection')
parser.add_argument('--hostname_verification_enabled', required=False, help='Enable hostname verification')
parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust cert file path')
parser.add_argument('--port', required=True, help='Instance Port', type=int)
parser.add_argument('--metrics_port', required=True, help="Port metrics will be exposed on", type=int)
parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples')
parser.add_argument('--logging_directory', required=True, help='Logging Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
parser.add_argument('--port', required=False, help='Instance Port', type=int)
parser.add_argument('--metrics_port', required=False, help="Port metrics will be exposed on", type=int)
parser.add_argument('--max_buffered_tuples', required=False, help='Maximum number of Buffered tuples')
parser.add_argument('--logging_directory', required=False, help='Logging Directory')
parser.add_argument('--logging_file', required=False, help='Log file name')
parser.add_argument('--logging_level', required=False, help='Logging level')
parser.add_argument('--logging_config_file', required=True, help='Config file for logging')
parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
parser.add_argument('--logging_config_file', required=False, help='Config file for logging')
parser.add_argument('--expected_healthcheck_interval', required=False, help='Expected time in seconds between health checks', type=int)
parser.add_argument('--secrets_provider', required=False, help='The classname of the secrets provider')
parser.add_argument('--secrets_provider_config', required=False, help='The config that needs to be passed to secrets provider')
parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from')
parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from')
parser.add_argument('--state_storage_serviceurl', required=False, help='Managed State Storage Service Url')
parser.add_argument('--cluster_name', required=True, help='The name of the cluster this instance is running on')
parser.add_argument('--cluster_name', required=False, help='The name of the cluster this instance is running on')
parser.add_argument('--config_file', required=False, default="", help='Configuration file name', type=str)
return parser

def merge_arguments(args, config_file):
"""
This function is used to merge arguments passed in via the command line
and those passed in via the configuration file during initialization.
:param args: arguments passed in via the command line
:param config_file: configuration file name (path)
During the merge process, the arguments passed in via the command line have higher priority,
so only optional arguments need to be merged.
"""
if config_file is None:
return
config = util.read_config(config_file)
if not config:
return
default_config = config["DEFAULT"]
if not default_config:
return
for k, v in vars(args).items():
if k == "config_file":
continue
if not v and default_config.get(k, None):
vars(args)[k] = default_config.get(k)


def validate_arguments(args):
"""
This function is used to verify the merged arguments,
mainly to check whether the mandatory arguments are assigned properly.
:param args: arguments after merging
"""
mandatory_args_map = {
"function_details": args.function_details,
"py": args.py,
"instance_id": args.instance_id,
"function_id": args.function_id,
"function_version": args.function_version,
"pulsar_serviceurl": args.pulsar_serviceurl,
"port": args.port,
"metrics_port": args.metrics_port,
"max_buffered_tuples": args.max_buffered_tuples,
"logging_directory": args.logging_directory,
"logging_file": args.logging_file,
"logging_config_file": args.logging_config_file,
"expected_healthcheck_interval": args.expected_healthcheck_interval,
"cluster_name": args.cluster_name
}
missing_args = []
for k, v in mandatory_args_map.items():
if v is None:
missing_args.append(k)
if missing_args:
print("The following arguments are required:", missing_args)
sys.exit(1)


def main():
# Setup signal handlers
signal.signal(signal.SIGTERM, atexit_function)
signal.signal(signal.SIGHUP, atexit_function)
signal.signal(signal.SIGINT, atexit_function)

parser = generate_arguments_parser()
args = parser.parse_args()
merge_arguments(args, args.config_file)
validate_arguments(args)
function_details = Function_pb2.FunctionDetails()
args.function_details = str(args.function_details)
if args.function_details[0] == '\'':
Expand Down Expand Up @@ -216,7 +282,8 @@ def main():
pulsar_client,
secrets_provider,
args.cluster_name,
state_storage_serviceurl)
state_storage_serviceurl,
args.config_file)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)

Expand Down
19 changes: 19 additions & 0 deletions pulsar-functions/instance/src/main/python/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import inspect
import sys
import importlib
import configparser

from threading import Timer
from pulsar.functions import serde

Expand Down Expand Up @@ -80,6 +82,23 @@ def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id):
def get_properties(fullyQualifiedName, instanceId):
return {"application": "pulsar-function", "id": str(fullyQualifiedName), "instance_id": str(instanceId)}

def read_config(config_file):
"""
The content of the configuration file is styled as follows:
[DEFAULT]
parameter1 = value1
parameter2 = value2
parameter3 = value3
...
"""
if config_file == "":
return None

cfg = configparser.ConfigParser()
cfg.read(config_file)
return cfg

class FixedTimer():

def __init__(self, t, hFunction, name="timer-thread"):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#


# DEPENDENCIES: unittest2
import python_instance_main

import os
import log
import unittest

class TestContextImpl(unittest.TestCase):

def Any(cls):
class Any(cls):
def __eq__(self, other):
return True
return Any()

def setUp(self):
log.init_logger("INFO", "foo", os.environ.get("PULSAR_HOME") + "/conf/functions-logging/console_logging_config.ini")

def test_arguments(self):
parser = python_instance_main.generate_arguments_parser()
argv = [
"--function_details", "test_function_details",
"--py", "test_py",
"--instance_id", "test_instance_id",
"--function_id", "test_function_id",
"--function_version", "test_function_version",
"--pulsar_serviceurl", "test_pulsar_serviceurl",
"--client_auth_plugin", "test_client_auth_plugin",
"--client_auth_params", "test_client_auth_params",
"--tls_allow_insecure_connection", "true",
"--hostname_verification_enabled", "true",
"--tls_trust_cert_path", "test_tls_trust_cert_path",
"--port", "1000",
"--metrics_port", "1001",
"--max_buffered_tuples", "100",
"--config_file", "test_python_runtime_config.ini"
]
args = parser.parse_args(argv)
python_instance_main.merge_arguments(args, args.config_file)
# argument from command line test
self.assertEqual(args.function_details, "test_function_details")
# argument from config file test
self.assertEqual(args.use_tls, "true")
# argument read priority test
self.assertEqual(args.port, 1000)
# mandatory argument test
self.assertEqual(args.expected_healthcheck_interval, "50")
# optional argument test
self.assertEqual(args.secrets_provider, None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

[DEFAULT]
port=5000
metrics_port=5001
use_tls=true
logging_directory=test_logging_directory
logging_file=test_logging_file
logging_config_file=test_logging_config_file
expected_healthcheck_interval=50

0 comments on commit 6506f9b

Please sign in to comment.