From 07917f659c7a9c9dbb976ef537a311bac2e98d56 Mon Sep 17 00:00:00 2001 From: Ke Wu Date: Thu, 4 Jun 2020 19:30:21 -0700 Subject: [PATCH] SAMZA-2542: Fix integration-tests Issue: ./bin/integration-tests fail Causes: integration tests have not been updated to comply with config loader. Changes: Update properties file to include job.config.loader.*, update launch script from run-job.sh to run-app.sh Tests: ./bin/integration-tests.sh . yarn-integration-tests && ./bin/integration-tests.sh . standalone-integration-tests API Changes: None Upgrade Instructions: None Usage Instructions: None Author: Ke Wu Reviewers: mynameborat Closes #1377 from kw2542/SAMZA-2542 --- samza-test/src/main/config/negate-number.properties | 4 ++++ .../src/main/config/perf/container-performance.properties | 4 ++++ .../config/perf/kafka-read-write-performance.properties | 4 ++++ samza-test/src/main/python/deployment.py | 2 -- samza-test/src/main/python/samza_job_yarn_deployer.py | 6 ++---- samza-test/src/main/python/tests/smoke_tests.py | 4 ++-- 6 files changed, 16 insertions(+), 8 deletions(-) diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties index 01b7f7e463..1829008732 100644 --- a/samza-test/src/main/config/negate-number.properties +++ b/samza-test/src/main/config/negate-number.properties @@ -22,6 +22,10 @@ job.name=samza-negate-number # YARN yarn.container.count=1 +# Config Loader +job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory +job.config.loader.properties.path=./__package/config/negate-number.properties + # Task task.class=org.apache.samza.test.integration.NegateNumberTask task.inputs=kafka.samza-test-topic diff --git a/samza-test/src/main/config/perf/container-performance.properties b/samza-test/src/main/config/perf/container-performance.properties index 7dcab027c5..1f6656b6aa 100644 --- a/samza-test/src/main/config/perf/container-performance.properties +++ b/samza-test/src/main/config/perf/container-performance.properties @@ -23,6 +23,10 @@ job.name=container-performance yarn.container.count=1 yarn.container.memory.mb=4096 +# Config Loader +job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory +job.config.loader.properties.path=./__package/config/perf/container-performance.properties + # Task task.opts=-Xmx3072m -XX:+UseConcMarkSweepGC task.class=org.apache.samza.test.performance.TestPerformanceTask diff --git a/samza-test/src/main/config/perf/kafka-read-write-performance.properties b/samza-test/src/main/config/perf/kafka-read-write-performance.properties index f5ca60193f..9bf7263b6b 100644 --- a/samza-test/src/main/config/perf/kafka-read-write-performance.properties +++ b/samza-test/src/main/config/perf/kafka-read-write-performance.properties @@ -22,6 +22,10 @@ job.name=kafka-read-write-performance # YARN yarn.container.count=1 +# Config Loader +job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory +job.config.loader.properties.path=./__package/config/perf/kafka-read-write-performance.properties + # Task task.class=org.apache.samza.test.performance.TestPerformanceTask task.inputs=kafka.kafka-read-write-performance-input diff --git a/samza-test/src/main/python/deployment.py b/samza-test/src/main/python/deployment.py index 0b72cc6b76..a4a45467f3 100644 --- a/samza-test/src/main/python/deployment.py +++ b/samza-test/src/main/python/deployment.py @@ -17,7 +17,6 @@ import os import logging -import shutil import urllib import zopkio.runtime as runtime import zopkio.adhoc_deployer as adhoc_deployer @@ -76,7 +75,6 @@ def setup_suite(): # Setup Samza job deployer. samza_job_deployer = SamzaJobYarnDeployer({ - 'config_loader_factory': c('samza_config_loader_factory'), 'yarn_site_template': c('yarn_site_template'), 'yarn_driver_configs': c('yarn_driver_configs'), 'yarn_nm_hosts': c('yarn_nm_hosts').values(), diff --git a/samza-test/src/main/python/samza_job_yarn_deployer.py b/samza-test/src/main/python/samza_job_yarn_deployer.py index 8a785ac30c..e1dc92ea6b 100644 --- a/samza-test/src/main/python/samza_job_yarn_deployer.py +++ b/samza-test/src/main/python/samza_job_yarn_deployer.py @@ -107,7 +107,6 @@ def start(self, job_id, configs={}): package_id: The package_id for the package that contains the code for job_id. Usually, the package_id refers to the .tgz job tarball that contains the code necessary to run job_id. - config_loader_factory: The config loader factory to use to decode the config_file. config_file: Path to the config file for the job to be run. install_path: Path where the package for the job has been installed on remote NMs. properties: (optional) [(property-name,property-value)] Optional override @@ -115,18 +114,17 @@ def start(self, job_id, configs={}): config_file's properties. """ configs = self._get_merged_configs(configs) - self._validate_configs(configs, ['package_id', 'config_loader_factory', 'config_file', 'install_path']) + self._validate_configs(configs, ['package_id', 'config_file', 'install_path']) # Get configs. package_id = configs.get('package_id') - config_loader_factory = configs.get('config_loader_factory') config_file = configs.get('config_file') install_path = configs.get('install_path') properties = configs.get('properties', {}) properties['yarn.package.path'] = 'file:' + os.path.join(install_path, self._get_package_tgz_name(package_id)) # Execute bin/run-job.sh locally from driver machine. - command = "{0} --config job.config.loader.factory={1} --config job.config.loader.properties.path={2}".format(os.path.join(package_id, "bin/run-job.sh"), config_loader_factory, os.path.join(package_id, config_file)) + command = "{0} --config-path={1}".format(os.path.join(package_id, "bin/run-app.sh"), os.path.join(package_id, config_file)) env = self._get_env_vars(package_id) for property_name, property_value in properties.iteritems(): command += " --config {0}={1}".format(property_name, property_value) diff --git a/samza-test/src/main/python/tests/smoke_tests.py b/samza-test/src/main/python/tests/smoke_tests.py index 53d5fa9a22..a8fb9b1577 100644 --- a/samza-test/src/main/python/tests/smoke_tests.py +++ b/samza-test/src/main/python/tests/smoke_tests.py @@ -32,7 +32,7 @@ def test_samza_job(): """ - Runs a job that reads converts input strings to integers, negates the + Runs a job that reads converts input strings to integers, negates the integer, and outputs to a Kafka topic. """ _load_data() @@ -41,7 +41,7 @@ def test_samza_job(): def validate_samza_job(): """ - Validates that negate-number negated all messages, and sent the output to + Validates that negate-number negated all messages, and sent the output to samza-test-topic-output. """ logger.info('Running validate_samza_job')