Skip to content

Commit

Permalink
SAMZA-2542: Fix integration-tests
Browse files Browse the repository at this point in the history
Issue: ./bin/integration-tests fail
Causes: integration tests have not been updated to comply with config loader.
Changes: Update properties file to include job.config.loader.*, update launch script from run-job.sh to run-app.sh
Tests: ./bin/integration-tests.sh . yarn-integration-tests && ./bin/integration-tests.sh . standalone-integration-tests
API Changes: None
Upgrade Instructions: None
Usage Instructions: None

Author: Ke Wu <[email protected]>

Reviewers: mynameborat <[email protected]>

Closes apache#1377 from kw2542/SAMZA-2542
kw2542 authored and mynameborat committed Jun 5, 2020
1 parent c4f2c5d commit 07917f6
Showing 6 changed files with 16 additions and 8 deletions.
4 changes: 4 additions & 0 deletions samza-test/src/main/config/negate-number.properties
Original file line number Diff line number Diff line change
@@ -22,6 +22,10 @@ job.name=samza-negate-number
# YARN
yarn.container.count=1

# Config Loader
job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory
job.config.loader.properties.path=./__package/config/negate-number.properties

# Task
task.class=org.apache.samza.test.integration.NegateNumberTask
task.inputs=kafka.samza-test-topic
Original file line number Diff line number Diff line change
@@ -23,6 +23,10 @@ job.name=container-performance
yarn.container.count=1
yarn.container.memory.mb=4096

# Config Loader
job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory
job.config.loader.properties.path=./__package/config/perf/container-performance.properties

# Task
task.opts=-Xmx3072m -XX:+UseConcMarkSweepGC
task.class=org.apache.samza.test.performance.TestPerformanceTask
Original file line number Diff line number Diff line change
@@ -22,6 +22,10 @@ job.name=kafka-read-write-performance
# YARN
yarn.container.count=1

# Config Loader
job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory
job.config.loader.properties.path=./__package/config/perf/kafka-read-write-performance.properties

# Task
task.class=org.apache.samza.test.performance.TestPerformanceTask
task.inputs=kafka.kafka-read-write-performance-input
2 changes: 0 additions & 2 deletions samza-test/src/main/python/deployment.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@

import os
import logging
import shutil
import urllib
import zopkio.runtime as runtime
import zopkio.adhoc_deployer as adhoc_deployer
@@ -76,7 +75,6 @@ def setup_suite():

# Setup Samza job deployer.
samza_job_deployer = SamzaJobYarnDeployer({
'config_loader_factory': c('samza_config_loader_factory'),
'yarn_site_template': c('yarn_site_template'),
'yarn_driver_configs': c('yarn_driver_configs'),
'yarn_nm_hosts': c('yarn_nm_hosts').values(),
6 changes: 2 additions & 4 deletions samza-test/src/main/python/samza_job_yarn_deployer.py
Original file line number Diff line number Diff line change
@@ -107,26 +107,24 @@ def start(self, job_id, configs={}):
package_id: The package_id for the package that contains the code for job_id.
Usually, the package_id refers to the .tgz job tarball that contains the
code necessary to run job_id.
config_loader_factory: The config loader factory to use to decode the config_file.
config_file: Path to the config file for the job to be run.
install_path: Path where the package for the job has been installed on remote NMs.
properties: (optional) [(property-name,property-value)] Optional override
properties for the run-job.sh script. These properties override the
config_file's properties.
"""
configs = self._get_merged_configs(configs)
self._validate_configs(configs, ['package_id', 'config_loader_factory', 'config_file', 'install_path'])
self._validate_configs(configs, ['package_id', 'config_file', 'install_path'])

# Get configs.
package_id = configs.get('package_id')
config_loader_factory = configs.get('config_loader_factory')
config_file = configs.get('config_file')
install_path = configs.get('install_path')
properties = configs.get('properties', {})
properties['yarn.package.path'] = 'file:' + os.path.join(install_path, self._get_package_tgz_name(package_id))

# Execute bin/run-job.sh locally from driver machine.
command = "{0} --config job.config.loader.factory={1} --config job.config.loader.properties.path={2}".format(os.path.join(package_id, "bin/run-job.sh"), config_loader_factory, os.path.join(package_id, config_file))
command = "{0} --config-path={1}".format(os.path.join(package_id, "bin/run-app.sh"), os.path.join(package_id, config_file))
env = self._get_env_vars(package_id)
for property_name, property_value in properties.iteritems():
command += " --config {0}={1}".format(property_name, property_value)
4 changes: 2 additions & 2 deletions samza-test/src/main/python/tests/smoke_tests.py
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@

def test_samza_job():
"""
Runs a job that reads converts input strings to integers, negates the
Runs a job that reads converts input strings to integers, negates the
integer, and outputs to a Kafka topic.
"""
_load_data()
@@ -41,7 +41,7 @@ def test_samza_job():

def validate_samza_job():
"""
Validates that negate-number negated all messages, and sent the output to
Validates that negate-number negated all messages, and sent the output to
samza-test-topic-output.
"""
logger.info('Running validate_samza_job')

0 comments on commit 07917f6

Please sign in to comment.