Skip to content

Commit

Permalink
[apache#24515] Ensure that Go pipelines on Dataflow are not allowed t…
Browse files Browse the repository at this point in the history
…o opt out of runner v2. (apache#24767)

* [apache#24515] Ensure that Go pipelines on Dataflow are not allowed to opt out of runner v2.

This is towards apache#24515

* Drop the unused worker jar and update breaking changes.
  • Loading branch information
lukecwik authored Dec 27, 2022
1 parent 96f4391 commit bcfdb7b
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 84 deletions.
6 changes: 3 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@

## Breaking Changes

* Python streaming pipelines and portable Python batch pipelines on Dataflow are required to
* Go pipelines, Python streaming pipelines, and portable Python batch pipelines on Dataflow are required to
use Runner V2. The `disable_runner_v2`, `disable_runner_v2_until_2023`, `disable_prime_runner_v2`
experiments will raise an error during pipeline construction. Note that non-portable Python
batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515))
experiments will raise an error during pipeline construction. You can no longer specify the Dataflow worker
jar override. Note that non-portable Python batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515)).

## Deprecations

Expand Down
1 change: 0 additions & 1 deletion runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ createCrossLanguageValidatesRunnerTask(
"--region ${dataflowRegion}",
"--tests \"./test/integration/xlang ./test/integration/io/xlang/...\"",
"--sdk_overrides \".*java.*,${dockerJavaImageContainer}:${dockerTag}\"",
"--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}",
],
)

Expand Down
65 changes: 48 additions & 17 deletions sdks/go/pkg/beam/runners/dataflow/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"cloud.google.com/go/storage"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
Expand Down Expand Up @@ -71,7 +72,6 @@ var (
tempLocation = flag.String("temp_location", "", "Temp location (optional)")
machineType = flag.String("worker_machine_type", "", "GCE machine type (optional)")
minCPUPlatform = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
workerJar = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
workerRegion = flag.String("worker_region", "", "Dataflow worker region (optional)")
workerZone = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
dataflowServiceOptions = flag.String("dataflow_service_options", "", "Comma separated list of additional job modes and configurations (optional)")
Expand Down Expand Up @@ -177,25 +177,26 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
panic("Beam has not been initialized. Call beam.Init() before pipeline construction.")
}

edges, nodes, err := p.Build()
if err != nil {
return nil, err
}
streaming := !graph.Bounded(nodes)

beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
opts, err := getJobOptions(ctx)
opts, err := getJobOptions(ctx, streaming)
if err != nil {
return nil, err
}

// (1) Build and submit
// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker".
id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())

modelURL := gcsx.Join(*stagingLocation, id, "model")
workerURL := gcsx.Join(*stagingLocation, id, "worker")
jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
xlangURL := gcsx.Join(*stagingLocation, id, "xlang")

edges, _, err := p.Build()
if err != nil {
return nil, err
}
artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, opts.Project, xlangURL)
if err != nil {
return nil, errors.WithContext(err, "resolving cross-language artifacts")
Expand All @@ -221,25 +222,25 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
log.Info(ctx, "Dry-run: not submitting job!")

log.Info(ctx, proto.MarshalTextString(model))
job, err := dataflowlib.Translate(ctx, model, opts, workerURL, jarURL, modelURL)
job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL)
if err != nil {
return nil, err
}
dataflowlib.PrintJob(ctx, job)
return nil, nil
}

return dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, modelURL, *endpoint, *jobopts.Async)
return dataflowlib.Execute(ctx, model, opts, workerURL, modelURL, *endpoint, *jobopts.Async)
}

func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions, error) {
project := gcpopts.GetProjectFromFlagOrEnvironment(ctx)
if project == "" {
return nil, errors.New("no Google Cloud project specified. Use --project=<project>")
}
region := gcpopts.GetRegion(ctx)
if region == "" {
return nil, errors.New("No Google Cloud region specified. Use --region=<region>. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
return nil, errors.New("no Google Cloud region specified. Use --region=<region>. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
}
if *stagingLocation == "" {
return nil, errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
Expand Down Expand Up @@ -269,6 +270,9 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
}
}
if !streaming && *transformMapping != "" {
return nil, errors.New("provided transform_name_mapping for a batch pipeline, did you mean to construct a streaming pipeline?")
}
if !*update && *transformMapping != "" {
return nil, errors.New("provided transform_name_mapping without setting the --update flag, so the pipeline would not be updated")
}
Expand All @@ -282,24 +286,51 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
hooks.SerializeHooksToOptions()

experiments := jobopts.GetExperiments()
// Always use runner v2, unless set already.
var v2set, portaSubmission bool
// Ensure that we enable the same set of experiments across all SDKs
// for runner v2.
var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
for _, e := range experiments {
if strings.Contains(e, "use_runner_v2") || strings.Contains(e, "use_unified_worker") {
if strings.Contains(e, "beam_fn_api") {
fnApiSet = true
}
if strings.Contains(e, "use_runner_v2") {
v2set = true
}
if strings.Contains(e, "use_unified_worker") {
uwSet = true
}
if strings.Contains(e, "use_portable_job_submission") {
portaSubmission = true
}
if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") {
return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+")
}
}
// Enable default experiments.
if !fnApiSet {
experiments = append(experiments, "beam_fn_api")
}
// Enable by default unified worker, and portable job submission.
if !v2set {
experiments = append(experiments, "use_runner_v2")
}
if !uwSet {
experiments = append(experiments, "use_unified_worker")
}
if !portaSubmission {
experiments = append(experiments, "use_portable_job_submission")
}

// Ensure that streaming specific experiments are set for streaming pipelines
// since runner v2 only supports using streaming engine.
if streaming {
if !seSet {
experiments = append(experiments, "enable_streaming_engine")
}
if !wsSet {
experiments = append(experiments, "enable_windmill_service")
}
}

if *minCPUPlatform != "" {
experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform))
}
Expand All @@ -312,6 +343,7 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
opts := &dataflowlib.JobOptions{
Name: jobopts.GetJobName(),
Streaming: streaming,
Experiments: experiments,
DataflowServiceOptions: dfServiceOptions,
Options: beam.PipelineOptions.Export(),
Expand All @@ -335,7 +367,6 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
TempLocation: *tempLocation,
TemplateLocation: *templateLocation,
Worker: *jobopts.WorkerBinary,
WorkerJar: *workerJar,
WorkerRegion: *workerRegion,
WorkerZone: *workerZone,
TeardownPolicy: *teardownPolicy,
Expand Down
Loading

0 comments on commit bcfdb7b

Please sign in to comment.