Skip to content

Commit

Permalink
Merge pull request apache#8681 from robinyqiu/resilience
Browse files Browse the repository at this point in the history
[BEAM-6777] Add enable_health_checker flag for Dataflow FnAPI worker (Python)
  • Loading branch information
angoenka authored Jun 1, 2019
2 parents 0b58c88 + 265fb6b commit 088c011
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 8 deletions.
17 changes: 12 additions & 5 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,27 @@ def __init__(self, packages, options, environment_version, pipeline_url):
key='major', value=to_json_value(environment_version))])
# TODO: Use enumerated type instead of strings for job types.
if job_type.startswith('FNAPI_'):
self.debug_options.experiments = self.debug_options.experiments or []
debug_options_experiments = self.debug_options.experiments
runner_harness_override = (
get_runner_harness_container_image())
self.debug_options.experiments = self.debug_options.experiments or []
if runner_harness_override:
self.debug_options.experiments.append(
debug_options_experiments.append(
'runner_harness_container_image=' + runner_harness_override)
# Add use_multiple_sdk_containers flag if its not already present. Do not
# Add use_multiple_sdk_containers flag if it's not already present. Do not
# add the flag if 'no_use_multiple_sdk_containers' is present.
# TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
# till version 2.4.
debug_options_experiments = self.debug_options.experiments
if ('use_multiple_sdk_containers' not in debug_options_experiments and
'no_use_multiple_sdk_containers' not in debug_options_experiments):
self.debug_options.experiments.append('use_multiple_sdk_containers')
debug_options_experiments.append('use_multiple_sdk_containers')
# Add enable_health_checker flag if it's not already present. Do not
# add the flag if 'disable_health_checker' is present.
# TODO[BEAM-7466]: Cleanup enable_health_checker once Python SDK 2.13
# becomes unsupported.
if ('enable_health_checker' not in debug_options_experiments and
'disable_health_checker' not in debug_options_experiments):
debug_options_experiments.append('enable_health_checker')
# FlexRS
if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
self.proto.flexResourceSchedulingGoal = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ def test_experiment_use_multiple_sdk_containers(self):
'--experiments', 'beam_fn_api'])
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
self.assertIn("use_multiple_sdk_containers", environment.proto.experiments)
self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)

pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
Expand All @@ -488,7 +488,7 @@ def test_experiment_use_multiple_sdk_containers(self):
'--experiments', 'use_multiple_sdk_containers'])
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
self.assertIn("use_multiple_sdk_containers", environment.proto.experiments)
self.assertIn('use_multiple_sdk_containers', environment.proto.experiments)

pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
Expand All @@ -498,7 +498,35 @@ def test_experiment_use_multiple_sdk_containers(self):
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
self.assertNotIn(
"use_multiple_sdk_containers", environment.proto.experiments)
'use_multiple_sdk_containers', environment.proto.experiments)

def test_experiment_enable_health_checker(self):
pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
'--temp_location', 'gs://test-location/temp',
'--experiments', 'beam_fn_api'])
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
self.assertIn('enable_health_checker', environment.proto.experiments)

pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
'--temp_location', 'gs://test-location/temp',
'--experiments', 'beam_fn_api',
'--experiments', 'enable_health_checker'])
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
self.assertIn('enable_health_checker', environment.proto.experiments)

pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
'--temp_location', 'gs://test-location/temp',
'--experiments', 'beam_fn_api',
'--experiments', 'disable_health_checker'])
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
self.assertNotIn('enable_health_checker', environment.proto.experiments)
self.assertIn('disable_health_checker', environment.proto.experiments)

@mock.patch(
'apache_beam.runners.dataflow.internal.apiclient.sys.version_info',
Expand Down

0 comments on commit 088c011

Please sign in to comment.