Skip to content

Commit

Permalink
Merge pull request apache#12576: [BEAM-10671] Add environment configu…
Browse files Browse the repository at this point in the history
…ration fields as first-class pipeline options
  • Loading branch information
mxm authored Sep 30, 2020
2 parents 36268e1 + c032963 commit d0fbcb2
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def chicagoTaxiJob = { scope ->
def pipelineOptions = [
parallelism : numberOfWorkers,
job_endpoint : 'localhost:8099',
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_python3.7_sdk:latest",
environment_options : "docker_container_image=${DOCKER_CONTAINER_REGISTRY}/beam_python3.7_sdk:latest",
environment_type : 'DOCKER',
execution_mode_for_batch: 'BATCH_FORCED',
]
Expand Down
4 changes: 2 additions & 2 deletions runners/flink/job-server/test_flink_uber_jar.sh
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ if [[ "$SAVE_MAIN_SESSION" -eq 0 ]]; then
--flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
--parallelism 1 \
--environment_type DOCKER \
--environment_config "$PYTHON_CONTAINER_IMAGE" \
--environment_options "docker_container_image=$PYTHON_CONTAINER_IMAGE" \
--flink_master "localhost:$FLINK_PORT" \
--flink_submit_uber_jar \
) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
Expand All @@ -149,7 +149,7 @@ else
--flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
--parallelism 1 \
--environment_type DOCKER \
--environment_config "$PYTHON_CONTAINER_IMAGE" \
--environment_options "docker_container_image=$PYTHON_CONTAINER_IMAGE" \
--flink_master "localhost:$FLINK_PORT" \
--flink_submit_uber_jar \
--save_main_session
Expand Down
2 changes: 1 addition & 1 deletion runners/portability/test_pipeline_jar.sh
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ OUTPUT_JAR=flink-test-$(date +%Y%m%d-%H%M%S).jar
--parallelism 1 \
--sdk_worker_parallelism 1 \
--environment_type DOCKER \
--environment_config=$PYTHON_CONTAINER_IMAGE \
--environment_options "docker_container_image=$PYTHON_CONTAINER_IMAGE" \
) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting

if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then
Expand Down
35 changes: 34 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,20 @@ def _add_argparse_args(cls, parser):
'form {"os": "<OS>", "arch": "<ARCHITECTURE>", "command": '
'"<process to execute>", "env":{"<Environment variables 1>": '
'"<ENV_VAL>"} }. All fields in the json are optional except '
'command.'))
'command.\n\nPrefer using --environment_options instead.'))
parser.add_argument(
'--environment_option',
'--environment_options',
dest='environment_options',
action='append',
default=None,
help=(
'Environment configuration for running the user code. '
'Recognized options depend on --environment_type.\n '
'For DOCKER: docker_container_image (optional)\n '
'For PROCESS: process_command (required), process_variables '
'(optional, comma-separated)\n '
'For EXTERNAL: external_service_address (required)'))
parser.add_argument(
'--sdk_worker_parallelism',
default=1,
Expand All @@ -1135,6 +1148,26 @@ def _add_argparse_args(cls, parser):
'Create an executable jar at this path rather than running '
'the pipeline.'))

def validate(self, validator):
    """Run the environment-option checks for this options view.

    Args:
      validator: object exposing ``validate_environment_options``.

    Returns:
      Whatever ``validator.validate_environment_options(self)`` returns.
    """
    environment_errors = validator.validate_environment_options(self)
    return environment_errors

def add_environment_option(self, option):
    """Record ``option`` in ``environment_options`` unless already present.

    The list is created on first use when ``environment_options`` is None.

    Args:
      option: the environment option entry to add.
    """
    # pylint: disable=access-member-before-definition
    existing = self.environment_options
    if existing is None:
        self.environment_options = [option]
    elif option not in existing:
        existing.append(option)

def lookup_environment_option(self, key, default=None):
    """Look up ``key`` among the configured environment options.

    Args:
      key: option name to search for.
      default: value returned when the option is not configured.

    Returns:
      ``True`` if ``key`` appears as a bare entry, the text following the
      first ``=`` of the first ``key=...`` entry, or ``default`` when nothing
      matches or no options are configured.
    """
    configured = self.environment_options
    if not configured:
        return default
    if key in configured:
        # A bare entry without a value acts as a flag.
        return True
    prefix = key + '='
    values = (
        entry.partition('=')[2]
        for entry in configured
        if entry.startswith(prefix))
    return next(values, default)


class JobServerOptions(PipelineOptions):
"""Options for starting a Beam job server. Roughly corresponds to
Expand Down
68 changes: 66 additions & 2 deletions sdks/python/apache_beam/options/pipeline_options_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TestOptions
Expand All @@ -56,13 +57,28 @@ class PipelineOptionsValidator(object):
OPTIONS = [
DebugOptions,
GoogleCloudOptions,
PortableOptions,
SetupOptions,
StandardOptions,
TestOptions,
TypeOptions,
WorkerOptions,
TestOptions
WorkerOptions
]

# Mutually exclusive options for different types of portable environments.
# Both tables are keyed by the upper-cased environment type; an option listed
# under one type is reported as an error when a different type is selected.
#
# Options that must be supplied for the selected environment type, unless
# environment_config is set instead.
REQUIRED_ENVIRONMENT_OPTIONS = {
'DOCKER': [],
'PROCESS': ['process_command'],
'EXTERNAL': ['external_service_address'],
'LOOPBACK': []
}
# Options that may additionally be supplied for the selected environment type.
OPTIONAL_ENVIRONMENT_OPTIONS = {
'DOCKER': ['docker_container_image'],
'PROCESS': ['process_variables'],
'EXTERNAL': [],
'LOOPBACK': []
}

# Possible validation errors.
ERR_MISSING_OPTION = 'Missing required option: %s.'
ERR_MISSING_GCS_PATH = 'Missing GCS path option: %s.'
Expand Down Expand Up @@ -93,6 +109,12 @@ class PipelineOptionsValidator(object):
ERR_INVALID_TRANSFORM_NAME_MAPPING = (
'Invalid transform name mapping format. Please make sure the mapping is '
'string key-value pairs. Invalid pair: (%s:%s)')
# Used when an option belongs to an environment type other than the selected
# one, and when environment_config is given for LOOPBACK
# (args: option name, selected environment type).
ERR_INVALID_ENVIRONMENT = (
'Option %s is not compatible with environment type %s.')
# Used when environment_config is combined with environment options of the
# selected type (arg: comma-separated option names).
ERR_ENVIRONMENT_CONFIG = (
'Option environment_config is incompatible with option(s) %s.')
# Used when a required option of the selected environment type is absent and
# environment_config is not set (args: option name, environment type).
ERR_MISSING_REQUIRED_ENVIRONMENT_OPTION = (
'Option %s is required for environment type %s.')

# GCS path specific patterns.
GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
Expand Down Expand Up @@ -267,3 +289,45 @@ def validate_test_matcher(self, view, arg_name):
pickled_matcher,
arg_name))
return errors

def validate_environment_options(self, view):
    """Validates portable environment options.

    Every known environment type is checked against the options found on
    ``view``:

    * for the selected type, ``environment_config`` may not be combined with
      that type's environment options; without ``environment_config`` all of
      the type's required options must be present;
    * options belonging to any other type are rejected;
    * ``environment_config`` is rejected for the LOOPBACK type.

    Args:
      view: options view providing ``environment_type``,
        ``environment_config`` and ``lookup_environment_option``.

    Returns:
      A list of validation errors; empty when the options are consistent.
    """
    def is_set(name):
        return view.lookup_environment_option(name) is not None

    # Environment types are matched case-insensitively.
    selected_type = None
    if view.environment_type:
        selected_type = view.environment_type.upper()

    problems = []
    for known_type, required in self.REQUIRED_ENVIRONMENT_OPTIONS.items():
        optional = self.OPTIONAL_ENVIRONMENT_OPTIONS[known_type]
        present_required = [name for name in required if is_set(name)]
        present = present_required + [
            name for name in optional if is_set(name)
        ]

        if known_type != selected_type:
            # Environment options classes are mutually exclusive.
            for name in present:
                problems.extend(
                    self._validate_error(
                        self.ERR_INVALID_ENVIRONMENT, name, selected_type))
            continue

        if not view.environment_config:
            absent = set(required).difference(set(present_required))
            for name in absent:
                problems.extend(
                    self._validate_error(
                        self.ERR_MISSING_REQUIRED_ENVIRONMENT_OPTION,
                        name,
                        known_type))
        elif present:
            # environment_config and environment options cannot be mixed.
            problems.extend(
                self._validate_error(
                    self.ERR_ENVIRONMENT_CONFIG, ', '.join(present)))

    if selected_type == 'LOOPBACK' and view.environment_config:
        problems.extend(
            self._validate_error(
                self.ERR_INVALID_ENVIRONMENT,
                'environment_config',
                'LOOPBACK'))
    return problems
192 changes: 191 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options_validator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def check_errors_for_arguments(self, errors, args):
found = True
break
if not found:
missing.append('Missing error for: ' + arg)
missing.append('Missing error for: %s.' % arg)

# Return missing and remaining (not matched) errors.
return missing + remaining
Expand Down Expand Up @@ -575,6 +575,196 @@ def test_type_check_additional_unrecognized_feature(self):
errors = validator.validate()
self.assertTrue(errors)

def test_environment_options(self):
    """Checks environment option validation for each environment type.

    Every case pairs a list of command-line flags with the arguments for
    which a validation error is expected. Mismatches from all cases are
    collected and reported together by a single final assertion.
    """
    # Environment type flags (mixed case on purpose).
    docker = '--environment_type=dOcKeR'
    process = '--environment_type=pRoCeSs'
    external = '--environment_type=eXtErNaL'
    loopback = '--environment_type=lOoPbACk'
    custom = '--environment_type=beam:env:foo:v1'
    # Environment configuration flags.
    config = '--environment_config=foo'
    image = '--environment_options=docker_container_image=foo'
    command = '--environment_options=process_command=foo'
    variables = '--environment_options=process_variables=foo=bar'
    address = '--environment_options=external_service_address=foo'

    test_cases = [
        # DOCKER
        {'options': [docker], 'errors': []},
        {'options': [docker, image], 'errors': []},
        {'options': [docker, config], 'errors': []},
        {
            'options': [docker, image, config],
            'errors': ['environment_config'],
        },
        {
            'options': [docker, command, variables, address],
            'errors': [
                'process_command',
                'process_variables',
                'external_service_address',
            ],
        },
        # PROCESS
        {'options': [process], 'errors': ['process_command']},
        {'options': [process, command], 'errors': []},
        {'options': [process, config], 'errors': []},
        {
            'options': [process, command, config],
            'errors': ['environment_config'],
        },
        {
            'options': [process, command, variables, image, address],
            'errors': ['docker_container_image', 'external_service_address'],
        },
        # EXTERNAL
        {'options': [external], 'errors': ['external_service_address']},
        {'options': [external, address], 'errors': []},
        {'options': [external, config], 'errors': []},
        {
            'options': [external, address, config],
            'errors': ['environment_config'],
        },
        {
            'options': [external, address, command, variables, image],
            'errors': [
                'process_command',
                'process_variables',
                'docker_container_image',
            ],
        },
        # LOOPBACK
        {'options': [loopback], 'errors': []},
        {'options': [loopback, config], 'errors': ['environment_config']},
        {
            'options': [loopback, image, command, variables, address],
            'errors': [
                'docker_container_image',
                'process_command',
                'process_variables',
                'external_service_address',
            ],
        },
        # Custom environment URN
        {'options': [custom], 'errors': []},
        {'options': [custom, config], 'errors': []},
        {
            'options': [custom, image, command, variables, address],
            'errors': [
                'docker_container_image',
                'process_command',
                'process_variables',
                'external_service_address',
            ],
        },
        # No environment type at all
        {
            'options': [image, command, variables, address],
            'errors': [
                'docker_container_image',
                'process_command',
                'process_variables',
                'external_service_address',
            ],
        },
    ]

    failures = []
    for case in test_cases:
        flags = case['options']
        validator = PipelineOptionsValidator(
            PipelineOptions(flags), MockRunners.OtherRunner())
        mismatches = self.check_errors_for_arguments(
            validator.validate(), case['errors'])
        if not mismatches:
            continue
        failures.append(
            'Options "%s" had unexpected validation results: "%s"' %
            (' '.join(flags), ' '.join(mismatches)))
    self.assertEqual(failures, [])


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
Loading

0 comments on commit d0fbcb2

Please sign in to comment.