diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index 86e2c8919da9..ec7314cfbd82 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -14,12 +14,12 @@ See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on
Post-Commit Tests Status (on master branch)
------------------------------------------------------------------------------------------------
-Lang | SDK | Apex | Dataflow | Flink | Samza | Spark
+Lang | SDK | Dataflow | Flink | Samza | Spark
---- | --- | --- | --- | --- | --- | ---
+--- | --- | --- | --- | --- | ---
-Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
-Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
-Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
-XLang | --- | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
+Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
+Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
+Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
+XLang | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
Pre-Commit Tests Status (on master branch)
------------------------------------------------------------------------------------------------
diff --git a/.github/autolabeler.yml b/.github/autolabeler.yml
index 933f4f6d0212..20db446faa90 100644
--- a/.github/autolabeler.yml
+++ b/.github/autolabeler.yml
@@ -74,7 +74,6 @@ io: ["sdks/go/pkg/beam/io/*", "sdks/java/io/*", "sdks/python/apache_beam/io/*"]
# Runners
"runners": ["runners/*", "sdks/go/pkg/beam/runners/*", "sdks/python/runners/*"]
-"apex": ["runners/apex/*"]
"core": ["runners/core-construction-java/*", "runners/core-java/*"]
"dataflow": ["runners/google-cloud-dataflow-java/*", "sdks/go/pkg/beam/runners/dataflow/*", "sdks/python/runners/dataflow/*"]
"direct": ["runners/direct-java/*", "sdks/go/pkg/beam/runners/direct/*", "sdks/python/runners/direct/*"]
diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index 08876a5d5f78..2f758384a1f2 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -66,7 +66,6 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/)
| beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11 | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11_PR/) | `Run Dataflow ValidatesRunner Java 11` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11) |
| beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_Java11 | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_Java11/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_Java11_PR/) | `Run Dataflow PortabilityApi ValidatesRunner Java 11` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_Java11/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_Java11) |
| beam_PostCommit_Java_ValidatesRunner_Direct_Java11 | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Direct_Java11), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Direct_Java11_PR) | `Run Direct ValidatesRunner Java 11` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Direct_Java11/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Direct_Java11) |
-| beam_PostCommit_Java_ValidatesRunner_Apex | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_PR/) | `Run Apex ValidatesRunner` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex) |
| beam_PostCommit_Java_ValidatesRunner_Dataflow | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/) | `Run Dataflow ValidatesRunner` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow) |
| beam_PostCommit_Java_ValidatesRunner_Flink | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_PR/) | `Run Flink ValidatesRunner` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink) |
| beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/) | `Run Dataflow PortabilityApi ValidatesRunner` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow) |
diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Apex.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Apex.groovy
deleted file mode 100644
index 3cbf3966afec..000000000000
--- a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Apex.groovy
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import CommonJobProperties as commonJobProperties
-import PostcommitJobBuilder
-
-// This job runs the suite of ValidatesRunner tests against the Apex runner.
-PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_Apex',
- 'Run Apex ValidatesRunner', 'Apache Apex Runner ValidatesRunner Tests', this) {
- description('Runs the ValidatesRunner suite on the Apex runner.')
-
- // Set common parameters.
- commonJobProperties.setTopLevelMainJobProperties(delegate)
- previousNames(/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/)
-
- // Publish all test results to Jenkins
- publishers {
- archiveJunit('**/build/test-results/**/*.xml')
- }
-
- // Gradle goals for this job.
- steps {
- gradle {
- rootBuildScriptDir(commonJobProperties.checkoutDir)
- tasks(':runners:apex:validatesRunner')
- commonJobProperties.setGradleSwitches(delegate)
- }
- }
-}
diff --git a/README.md b/README.md
index 24ac4901133d..6c4e1e23e161 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@
# Apache Beam
-[Apache Beam](http://beam.apache.org/) is a unified model for defining both batch and streaming data-parallel processing pipelines, as well as a set of language-specific SDKs for constructing pipelines and Runners for executing them on distributed processing backends, including [Apache Apex](http://apex.apache.org/), [Apache Flink](http://flink.apache.org/), [Apache Spark](http://spark.apache.org/), [Google Cloud Dataflow](http://cloud.google.com/dataflow/) and [Hazelcast Jet](https://jet.hazelcast.org/).
+[Apache Beam](http://beam.apache.org/) is a unified model for defining both batch and streaming data-parallel processing pipelines, as well as a set of language-specific SDKs for constructing pipelines and Runners for executing them on distributed processing backends, including [Apache Flink](http://flink.apache.org/), [Apache Spark](http://spark.apache.org/), [Google Cloud Dataflow](http://cloud.google.com/dataflow/) and [Hazelcast Jet](https://jet.hazelcast.org/).
## Status
@@ -32,12 +32,12 @@
### Post-commit tests status (on master branch)
-Lang | SDK | Apex | Dataflow | Flink | Samza | Spark
---- | --- | --- | --- | --- | --- | ---
-Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
-Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)
-Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
-XLang | --- | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
+Lang | SDK | Dataflow | Flink | Samza | Spark
+--- | --- | --- | --- | --- | ---
+Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
+Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)
+Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/) [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
+XLang | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
## Overview
@@ -73,7 +73,6 @@ Have ideas for new SDKs or DSLs? See the [JIRA](https://issues.apache.org/jira/i
Beam supports executing programs on multiple distributed processing backends through PipelineRunners. Currently, the following PipelineRunners are available:
- The `DirectRunner` runs the pipeline on your local machine.
-- The `ApexRunner` runs the pipeline on an Apache Hadoop YARN cluster (or in embedded mode).
- The `DataflowRunner` submits the pipeline to the [Google Cloud Dataflow](http://cloud.google.com/dataflow/).
- The `FlinkRunner` runs the pipeline on an Apache Flink cluster. The code has been donated from [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow) and is now part of Beam.
- The `SparkRunner` runs the pipeline on an Apache Spark cluster. The code has been donated from [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is now part of Beam.
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 0bbd8325d30a..114c72afa8f0 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -178,7 +178,7 @@ class BeamModulePlugin implements Plugin {
// and also for the script name, ${type}-java-${runner}.toLowerCase().
String type
- // runner [Direct, Dataflow, Spark, Flink, FlinkLocal, Apex]
+ // runner [Direct, Dataflow, Spark, Flink, FlinkLocal]
String runner
// gcpProject sets the gcpProject argument when executing examples.
@@ -378,8 +378,6 @@ class BeamModulePlugin implements Plugin {
// These versions are defined here because they represent
// a dependency version which should match across multiple
// Maven artifacts.
- def apex_core_version = "3.7.0"
- def apex_malhar_version = "3.4.0"
def aws_java_sdk_version = "1.11.718"
def aws_java_sdk2_version = "2.10.61"
def cassandra_driver_version = "3.8.0"
@@ -426,8 +424,6 @@ class BeamModulePlugin implements Plugin {
activemq_mqtt : "org.apache.activemq:activemq-mqtt:5.13.1",
antlr : "org.antlr:antlr4:4.7",
antlr_runtime : "org.antlr:antlr4-runtime:4.7",
- apex_common : "org.apache.apex:apex-common:$apex_core_version",
- apex_engine : "org.apache.apex:apex-engine:$apex_core_version",
args4j : "args4j:args4j:2.33",
avro : "org.apache.avro:avro:1.8.2",
avro_tests : "org.apache.avro:avro:1.8.2:tests",
@@ -528,7 +524,6 @@ class BeamModulePlugin implements Plugin {
junit : "junit:junit:4.13-beta-3",
kafka : "org.apache.kafka:kafka_2.11:$kafka_version",
kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version",
- malhar_library : "org.apache.apex:malhar-library:$apex_malhar_version",
mockito_core : "org.mockito:mockito-core:3.0.0",
nemo_compiler_frontend_beam : "org.apache.nemo:nemo-compiler-frontend-beam:$nemo_version",
netty_handler : "io.netty:netty-handler:$netty_version",
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 5b08348aa5c8..222a06b39ce7 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -32,7 +32,6 @@ artifact includes all Apache Beam Java SDK examples."""
* Some runners are run from separate projects, see the preCommit task below
* for details.
*/
-// TODO: Add apexRunner - https://issues.apache.org/jira/browse/BEAM-3583
def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"]
for (String runner : preCommitRunners) {
configurations.create(runner + "PreCommit")
@@ -75,8 +74,6 @@ dependencies {
delegate.add(runner + "PreCommit", project(":examples:java"))
delegate.add(runner + "PreCommit", project(path: ":examples:java", configuration: "testRuntime"))
}
- // https://issues.apache.org/jira/browse/BEAM-3583
- // apexRunnerPreCommit project(":runners:apex")
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
flinkRunnerPreCommit project(":runners:flink:1.10")
// TODO: Make the netty version used configurable, we add netty-all 4.1.17.Final so it appears on the classpath
@@ -93,7 +90,6 @@ dependencies {
* of integration tests for WordCount and WindowedWordCount.
*/
def preCommitRunnerClass = [
- apexRunner: "org.apache.beam.runners.apex.TestApexRunner",
directRunner: "org.apache.beam.runners.direct.DirectRunner",
flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner",
sparkRunner: "org.apache.beam.runners.spark.TestSparkRunner",
diff --git a/examples/kotlin/build.gradle b/examples/kotlin/build.gradle
index 6d38c64dc480..847cd808c346 100644
--- a/examples/kotlin/build.gradle
+++ b/examples/kotlin/build.gradle
@@ -35,7 +35,6 @@ artifact includes all Apache Beam Kotlin SDK examples."""
* Some runners are run from separate projects, see the preCommit task below
* for details.
*/
-// TODO: Add apexRunner - https://issues.apache.org/jira/browse/BEAM-3583
def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"]
for (String runner : preCommitRunners) {
configurations.create(runner + "PreCommit")
@@ -78,8 +77,6 @@ dependencies {
delegate.add(runner + "PreCommit", project(":examples:kotlin"))
delegate.add(runner + "PreCommit", project(path: ":examples:kotlin", configuration: "testRuntime"))
}
- // https://issues.apache.org/jira/browse/BEAM-3583
- // apexRunnerPreCommit project(":runners:apex")
directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow")
flinkRunnerPreCommit project(":runners:flink:1.10")
// TODO: Make the netty version used configurable, we add netty-all 4.1.17.Final so it appears on the classpath
@@ -97,7 +94,6 @@ dependencies {
* of integration tests for WordCount and WindowedWordCount.
*/
def preCommitRunnerClass = [
- apexRunner: "org.apache.beam.runners.apex.TestApexRunner",
directRunner: "org.apache.beam.runners.direct.DirectRunner",
flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner",
sparkRunner: "org.apache.beam.runners.spark.TestSparkRunner",
diff --git a/learning/katas/go/Introduction/Hello Beam/Hello Beam/task.md b/learning/katas/go/Introduction/Hello Beam/Hello Beam/task.md
index beaa497e5797..c7cbc5345546 100644
--- a/learning/katas/go/Introduction/Hello Beam/Hello Beam/task.md
+++ b/learning/katas/go/Introduction/Hello Beam/Hello Beam/task.md
@@ -22,7 +22,7 @@
Apache Beam is an open source, unified model for defining both batch and streaming data-parallel
processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the
pipeline. The pipeline is then executed by one of Beam’s supported distributed processing
-back-ends, which include Apache Apex, Apache Flink, Apache Spark, and Google Cloud Dataflow.
+back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.
Beam is particularly useful for Embarrassingly Parallel data processing tasks, in which the
problem can be decomposed into many smaller bundles of data that can be processed independently
diff --git a/learning/katas/kotlin/Introduction/Hello Beam/Hello Beam/task.md b/learning/katas/kotlin/Introduction/Hello Beam/Hello Beam/task.md
index c1ef872d6d7a..7ef5bd391c55 100644
--- a/learning/katas/kotlin/Introduction/Hello Beam/Hello Beam/task.md
+++ b/learning/katas/kotlin/Introduction/Hello Beam/Hello Beam/task.md
@@ -22,7 +22,7 @@ Welcome To Apache Beam
Apache Beam is an open source, unified model for defining both batch and streaming data-parallel
processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the
pipeline. The pipeline is then executed by one of Beam’s supported distributed processing back-ends,
-which include Apache Apex, Apache Flink, Apache Spark, and Google Cloud Dataflow.
+which include Apache Flink, Apache Spark, and Google Cloud Dataflow.
Beam is particularly useful for Embarrassingly Parallel data processing tasks, in which the problem
can be decomposed into many smaller bundles of data that can be processed independently and in
diff --git a/learning/katas/python/Introduction/Hello Beam/Hello Beam/task.md b/learning/katas/python/Introduction/Hello Beam/Hello Beam/task.md
index b6df12c28d86..9f0c1b75bf65 100644
--- a/learning/katas/python/Introduction/Hello Beam/Hello Beam/task.md
+++ b/learning/katas/python/Introduction/Hello Beam/Hello Beam/task.md
@@ -22,7 +22,7 @@ Welcome To Apache Beam
Apache Beam is an open source, unified model for defining both batch and streaming data-parallel
processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the
pipeline. The pipeline is then executed by one of Beam’s supported distributed processing back-ends,
-which include Apache Apex, Apache Flink, Apache Spark, and Google Cloud Dataflow.
+which include Apache Flink, Apache Spark, and Google Cloud Dataflow.
Beam is particularly useful for Embarrassingly Parallel data processing tasks, in which the problem
can be decomposed into many smaller bundles of data that can be processed independently and in
diff --git a/ownership/JAVA_DEPENDENCY_OWNERS.yaml b/ownership/JAVA_DEPENDENCY_OWNERS.yaml
index 73d146c71ee8..904f4404660e 100644
--- a/ownership/JAVA_DEPENDENCY_OWNERS.yaml
+++ b/ownership/JAVA_DEPENDENCY_OWNERS.yaml
@@ -599,21 +599,6 @@ deps:
artifact: activemq-junit
owners:
- org.apache.apex:apex-common:
- group: org.apache.apex
- artifact: apex-common
- owners:
-
- org.apache.apex:apex-engine:
- group: org.apache.apex
- artifact: apex-engine
- owners:
-
- org.apache.apex:malhar-library:
- group: org.apache.apex
- artifact: malhar-library
- owners:
-
org.apache.avro:avro:
group: org.apache.avro
artifact: avro
diff --git a/release/build.gradle b/release/build.gradle
index 21ab9d882d61..f4d9cfe35b85 100644
--- a/release/build.gradle
+++ b/release/build.gradle
@@ -32,7 +32,6 @@ task runJavaExamplesValidationTask {
description = "Run the Beam quickstart across all Java runners"
dependsOn ":runners:direct-java:runQuickstartJavaDirect"
dependsOn ":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow"
- dependsOn ":runners:apex:runQuickstartJavaApex"
dependsOn ":runners:spark:runQuickstartJavaSpark"
dependsOn ":runners:flink:1.10:runQuickstartJavaFlinkLocal"
dependsOn ":runners:direct-java:runMobileGamingJavaDirect"
diff --git a/release/src/main/groovy/MobileGamingCommands.groovy b/release/src/main/groovy/MobileGamingCommands.groovy
index 1042062e4cb5..cceca98bc42b 100644
--- a/release/src/main/groovy/MobileGamingCommands.groovy
+++ b/release/src/main/groovy/MobileGamingCommands.groovy
@@ -27,7 +27,6 @@ class MobileGamingCommands {
public static final RUNNERS = [DirectRunner: "direct-runner",
DataflowRunner: "dataflow-runner",
SparkRunner: "spark-runner",
- ApexRunner: "apex-runner",
FlinkRunner: "flink-runner"]
public static final EXECUTION_TIMEOUT_IN_MINUTES = 20
diff --git a/release/src/main/groovy/quickstart-java-apex.groovy b/release/src/main/groovy/quickstart-java-apex.groovy
deleted file mode 100644
index 3e8e00521a0e..000000000000
--- a/release/src/main/groovy/quickstart-java-apex.groovy
+++ /dev/null
@@ -1,45 +0,0 @@
-#!groovy
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-t = new TestScripts(args)
-
-/*
- * Run the Apex quickstart from https://beam.apache.org/get-started/quickstart-java/
- */
-
-t.describe 'Run Apache Beam Java SDK Quickstart - Apex'
-
- t.intent 'Gets the WordCount Example Code'
- QuickstartArchetype.generate(t)
-
- t.intent 'Runs the WordCount Code with Apex runner'
- // Run the wordcount example with the apex runner
- t.run """mvn compile exec:java -q \
- -Dexec.mainClass=org.apache.beam.examples.WordCount \
- -Dexec.args="--inputFile=pom.xml \
- --output=counts \
- --runner=ApexRunner" \
- -Papex-runner"""
-
- // Verify text from the pom.xml input file
- String result = t.run "grep Foundation counts*"
- t.see "Foundation: 1", result
-
- // Clean up
- t.done()
diff --git a/release/src/main/scripts/mass_comment.py b/release/src/main/scripts/mass_comment.py
index 4de984c9da89..e4dff8484664 100644
--- a/release/src/main/scripts/mass_comment.py
+++ b/release/src/main/scripts/mass_comment.py
@@ -33,7 +33,6 @@
"Run Java PostCommit",
"Run Java Flink PortableValidatesRunner Batch",
"Run Java Flink PortableValidatesRunner Streaming",
- "Run Apex ValidatesRunner",
"Run Dataflow ValidatesRunner",
"Run Flink ValidatesRunner",
"Run Samza ValidatesRunner",
diff --git a/release/src/main/scripts/run_rc_validation.sh b/release/src/main/scripts/run_rc_validation.sh
index 54c76aef1282..a9a21796b5b5 100755
--- a/release/src/main/scripts/run_rc_validation.sh
+++ b/release/src/main/scripts/run_rc_validation.sh
@@ -193,18 +193,6 @@ else
echo "* Skip Java quickstart with direct runner"
fi
-echo "[Current task] Java quickstart with Apex local runner"
-if [[ "$java_quickstart_apex_local" = true ]]; then
- echo "*************************************************************"
- echo "* Running Java Quickstart with Apex local runner"
- echo "*************************************************************"
- ./gradlew :runners:apex:runQuickstartJavaApex \
- -Prepourl=${REPO_URL} \
- -Pver=${RELEASE_VER}
-else
- echo "* Skip Java quickstart with Apex local runner"
-fi
-
echo "[Current task] Java quickstart with Flink local runner"
if [[ "$java_quickstart_flink_local" = true ]]; then
echo "*************************************************************"
diff --git a/runners/apex/build.gradle b/runners/apex/build.gradle
deleted file mode 100644
index 31749294adbb..000000000000
--- a/runners/apex/build.gradle
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import groovy.json.JsonOutput
-
-plugins { id 'org.apache.beam.module' }
-applyJavaNature(automaticModuleName: 'org.apache.beam.runners.apex')
-
-description = "Apache Beam :: Runners :: Apex"
-
-/*
- * We need to rely on manually specifying these evaluationDependsOn to ensure that
- * the following projects are evaluated before we evaluate this project. This is because
- * we are attempting to reference the "sourceSets.test.output" directly.
- */
-evaluationDependsOn(":sdks:java:core")
-
-configurations {
- validatesRunner
-}
-
-dependencies {
- compile project(path: ":model:pipeline", configuration: "shadow")
- compile project(path: ":sdks:java:core", configuration: "shadow")
- compile project(":runners:core-construction-java")
- compile project(":runners:core-java")
- compile library.java.apex_common
- compile library.java.malhar_library
- compile library.java.apex_engine
- compile library.java.apex_engine
- testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
- // ApexStateInternalsTest extends abstract StateInternalsTest
- testCompile project(path: ":runners:core-java", configuration: "testRuntime")
- testCompile library.java.hamcrest_core
- testCompile library.java.junit
- testCompile library.java.mockito_core
- testCompile library.java.jackson_dataformat_yaml
- validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
- validatesRunner project(path: ":runners:core-java", configuration: "testRuntime")
- validatesRunner project(project.path)
-}
-
-// TODO: Update this so that the generated file is added to the explicitly added instead of
-// just outputting the file in the correct path.
-task buildDependencyTree(type: DependencyReportTask) {
- configurations = [project.configurations.testRuntimeClasspath]
- outputFile = new File(buildDir, "classes/java/main/org/apache/beam/runners/apex/dependency-tree")
- // TODO: Migrate ApexYarnLauncher to use the Gradles dependency tree output instead of Mavens
- // so we don't have to try to replace the format of the file on the fly
- doLast {
- // Filter out lines which don't have any dependencies by looking for lines with "--- "
- ant.replaceregexp(file: outputFile, match: "^((?!--- ).)*\$", replace: "", byline: true)
- // Remove empty lines
- ant.replaceregexp(file: outputFile, match: "\\n\\n", replace: "", flags: "gm")
- // Replace strings with ":a.b.c -> x.y.z" to just be ":x.y.z" getting the used version of the dependency.
- ant.replaceregexp(file: outputFile, match: ":([^:]*) -> (.*)", replace: ":\\2", byline: true)
- // Remove a trailing " (*)" off the end to so there is nothing after the version identifier.
- ant.replaceregexp(file: outputFile, match: " \\(\\*\\)", replace: "", byline: true)
- // Add ":jar" to the maven dependency string assuming that all resource types are jars.
- ant.replaceregexp(file: outputFile, match: "[^:]*:[^:]*", replace: "\\0:jar", byline: true)
- }
-}
-compileJava.dependsOn buildDependencyTree
-
-task validatesRunnerBatch(type: Test) {
- group = "Verification"
- systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
- "--runner=TestApexRunner",
- ])
-
- classpath = configurations.validatesRunner
- testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
- useJUnit {
- includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
- excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
- excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
- excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
- excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
- excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
- excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
- excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
- excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
- excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
- excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
- excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
- excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
- // TODO[BEAM-8304]: Support multiple side inputs with different coders.
- excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputsWithDifferentCoders'
- excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
- }
-
- // apex runner is run in embedded mode. Increase default HeapSize
- maxHeapSize = '4g'
-}
-
-task validatesRunner {
- group = "Verification"
- description "Validates Apex runner"
- dependsOn validatesRunnerBatch
-}
-
-// Generates :runners:apex:runQuickstartJavaApex
-createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner:'Apex')
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
deleted file mode 100644
index ba3bf53e8db6..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/** Options that configure the Apex pipeline. */
-public interface ApexPipelineOptions extends PipelineOptions {
-
- @Description("set unique application name for Apex runner")
- void setApplicationName(String name);
-
- String getApplicationName();
-
- @Description("execute the pipeline with embedded cluster")
- void setEmbeddedExecution(boolean embedded);
-
- @Default.Boolean(true)
- boolean isEmbeddedExecution();
-
- @Description("configure embedded execution with debug friendly options")
- void setEmbeddedExecutionDebugMode(boolean embeddedDebug);
-
- @Default.Boolean(true)
- boolean isEmbeddedExecutionDebugMode();
-
- @Description("output data received and emitted on ports (for debugging)")
- void setTupleTracingEnabled(boolean enabled);
-
- @Default.Boolean(false)
- boolean isTupleTracingEnabled();
-
- @Description("how long the client should wait for the pipeline to run")
- void setRunMillis(long runMillis);
-
- @Default.Long(0)
- long getRunMillis();
-
- @Description("configuration properties file for the Apex engine")
- void setConfigFile(String name);
-
- @Default.String("classpath:/beam-runners-apex.properties")
- String getConfigFile();
-
- @Description("configure whether to perform ParDo fusion")
- void setParDoFusionEnabled(boolean enabled);
-
- @Default.Boolean(true)
- boolean isParDoFusionEnabled();
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
deleted file mode 100644
index e728460a5ff8..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context.DAGContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.apex.api.EmbeddedAppLauncher;
-import org.apache.apex.api.Launcher;
-import org.apache.apex.api.Launcher.AppHandle;
-import org.apache.apex.api.Launcher.LaunchMode;
-import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.PrimitiveCreate;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
-import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A {@link PipelineRunner} that translates the pipeline to an Apex DAG and executes it on an Apex
- * cluster.
- */
-public class ApexRunner extends PipelineRunner {
-
- private final ApexPipelineOptions options;
- public static final String CLASSPATH_SCHEME = "classpath";
- protected boolean translateOnly = false;
-
- /**
- * TODO: this isn't thread safe and may cause issues when tests run in parallel Holds any most
- * resent assertion error that was raised while processing elements. Used in the unit test driver
- * in embedded mode to propagate the exception.
- */
- public static final AtomicReference ASSERTION_ERROR = new AtomicReference<>();
-
- public ApexRunner(ApexPipelineOptions options) {
- this.options = options;
- }
-
- public static ApexRunner fromOptions(PipelineOptions options) {
- ApexPipelineOptions apexPipelineOptions =
- PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
- return new ApexRunner(apexPipelineOptions);
- }
-
- @SuppressWarnings({"rawtypes"})
- protected List getOverrides() {
- return ImmutableList.builder()
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(Create.Values.class),
- new PrimitiveCreate.Factory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
- new StreamingViewAsIterable.Factory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class),
- new StreamingViewAsIterable.Factory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),
- new StreamingViewAsIterable.Factory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class),
- new StreamingViewAsIterable.Factory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
- new StreamingWrapSingletonInList.Factory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.splittableParDoMulti(), new SplittableParDo.OverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.splittableProcessKeyedBounded(),
- new SplittableParDoNaiveBounded.OverrideFactory<>()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.splittableProcessKeyedUnbounded(),
- new SplittableParDoViaKeyedWorkItems.OverrideFactory<>()))
- // TODO: [BEAM-5360] Support @RequiresStableInput on Apex runner
- .add(
- PTransformOverride.of(
- PTransformMatchers.requiresStableInputParDoMulti(),
- UnsupportedOverrideFactory.withMessage(
- "Apex runner currently doesn't support @RequiresStableInput annotation.")))
- .build();
- }
-
- @Override
- public ApexRunnerResult run(final Pipeline pipeline) {
- pipeline.replaceAll(getOverrides());
-
- final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
- final AtomicReference apexDAG = new AtomicReference<>();
- final AtomicReference tempDir = new AtomicReference<>();
-
- StreamingApplication apexApp =
- (dag, conf) -> {
- apexDAG.set(dag);
- dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
- if (options.isEmbeddedExecution()) {
- // set unique path for application state to allow for parallel execution of unit tests
- // (the embedded cluster would set it to a fixed location under ./target)
- tempDir.set(Files.createTempDir());
- dag.setAttribute(DAGContext.APPLICATION_PATH, tempDir.get().toURI().toString());
- }
- translator.translate(pipeline, dag);
- };
-
- Properties configProperties = new Properties();
- try {
- if (options.getConfigFile() != null) {
- URI configURL = new URI(options.getConfigFile());
- if (CLASSPATH_SCHEME.equals(configURL.getScheme())) {
- InputStream is = this.getClass().getResourceAsStream(configURL.getPath());
- if (is != null) {
- configProperties.load(is);
- is.close();
- }
- } else {
- if (!configURL.isAbsolute()) {
- // resolve as local file name
- File f = new File(options.getConfigFile());
- configURL = f.toURI();
- }
- try (InputStream is = configURL.toURL().openStream()) {
- configProperties.load(is);
- }
- }
- }
- } catch (IOException | URISyntaxException ex) {
- throw new RuntimeException("Error loading properties", ex);
- }
-
- if (options.isEmbeddedExecution()) {
- EmbeddedAppLauncher> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
- Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
- launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);
- if (options.isEmbeddedExecutionDebugMode()) {
- // turns off timeout checking for operator progress
- launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false);
- }
- Configuration conf = new Configuration(false);
- ApexYarnLauncher.addProperties(conf, configProperties);
- try {
- if (translateOnly) {
- launcher.prepareDAG(apexApp, conf);
- return new ApexRunnerResult(launcher.getDAG(), null);
- }
- ApexRunner.ASSERTION_ERROR.set(null);
- AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes);
- return new ApexRunnerResult(apexDAG.get(), apexAppResult) {
- @Override
- protected void cleanupOnCancelOrFinish() {
- if (tempDir.get() != null) {
- FileUtils.deleteQuietly(tempDir.get());
- }
- }
- };
- } catch (Exception e) {
- Throwables.throwIfUnchecked(e);
- throw new RuntimeException(e);
- }
- } else {
- try {
- ApexYarnLauncher yarnLauncher = new ApexYarnLauncher();
- AppHandle apexAppResult = yarnLauncher.launchApp(apexApp, configProperties);
- return new ApexRunnerResult(apexDAG.get(), apexAppResult);
- } catch (IOException e) {
- throw new RuntimeException("Failed to launch the application on YARN.", e);
- }
- }
- }
-
- ////////////////////////////////////////////
- // Adapted from FlinkRunner for View support
-
- /**
- * Creates a primitive {@link PCollectionView}.
- *
- *
For internal use only by runner implementors.
- *
- * @param The type of the elements of the input PCollection
- * @param The type associated with the {@link PCollectionView} used as a side input
- */
- public static class CreateApexPCollectionView
- extends PTransform, PCollection> {
- private static final long serialVersionUID = 1L;
- private PCollectionView view;
-
- private CreateApexPCollectionView(PCollectionView view) {
- this.view = view;
- }
-
- public static CreateApexPCollectionView of(
- PCollectionView view) {
- return new CreateApexPCollectionView<>(view);
- }
-
- @Override
- public PCollection expand(PCollection input) {
- return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
- }
-
- public PCollectionView getView() {
- return view;
- }
- }
-
- private static class WrapAsList extends DoFn> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(Collections.singletonList(c.element()));
- }
- }
-
- private static class StreamingWrapSingletonInList
- extends PTransform, PCollection> {
- private static final long serialVersionUID = 1L;
- CreatePCollectionView transform;
-
- /** Builds an instance of this class from the overridden transform. */
- private StreamingWrapSingletonInList(CreatePCollectionView transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollection expand(PCollection input) {
- input
- .apply(ParDo.of(new WrapAsList<>()))
- .apply(CreateApexPCollectionView.of(transform.getView()));
- return input;
- }
-
- @Override
- protected String getKindString() {
- return "StreamingWrapSingletonInList";
- }
-
- static class Factory
- extends SingleInputOutputOverrideFactory<
- PCollection, PCollection, CreatePCollectionView> {
- @Override
- public PTransformReplacement, PCollection> getReplacementTransform(
- AppliedPTransform, PCollection, CreatePCollectionView>
- transform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- new StreamingWrapSingletonInList<>(transform.getTransform()));
- }
- }
- }
-
- private static class StreamingViewAsIterable
- extends PTransform, PCollection> {
- private static final long serialVersionUID = 1L;
- private final PCollectionView> view;
-
- private StreamingViewAsIterable(PCollectionView> view) {
- this.view = view;
- }
-
- @Override
- public PCollection expand(PCollection input) {
- return ((PCollection)
- input.apply(Combine.globally(new Concatenate()).withoutDefaults()))
- .apply(CreateApexPCollectionView.of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsIterable";
- }
-
- static class Factory
- extends SingleInputOutputOverrideFactory<
- PCollection, PCollection, CreatePCollectionView>> {
- @Override
- public PTransformReplacement, PCollection> getReplacementTransform(
- AppliedPTransform, PCollection, CreatePCollectionView>>
- transform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- new StreamingViewAsIterable<>(transform.getTransform().getView()));
- }
- }
- }
-
- /**
- * Combiner that combines {@code T}s into a single {@code List} containing all inputs. They
- * require the input {@link PCollection} fits in memory. For a large {@link PCollection} this is
- * expected to crash!
- *
- * @param the type of elements to concatenate.
- */
- private static class Concatenate extends Combine.CombineFn, List> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public List createAccumulator() {
- return new ArrayList<>();
- }
-
- @Override
- public List addInput(List accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public List mergeAccumulators(Iterable> accumulators) {
- List result = createAccumulator();
- for (List accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
-
- @Override
- public List extractOutput(List accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder> getAccumulatorCoder(CoderRegistry registry, Coder inputCoder) {
- return ListCoder.of(inputCoder);
- }
-
- @Override
- public Coder> getDefaultOutputCoder(CoderRegistry registry, Coder inputCoder) {
- return ListCoder.of(inputCoder);
- }
- }
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
deleted file mode 100644
index ad89723576ad..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex;
-
-import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the {@link
- * ApexRunner}.
- *
- *
{@link AutoService} will register Apex's implementations of the {@link PipelineRunner} and
- * {@link PipelineOptions} as available pipeline runner services.
- */
-public final class ApexRunnerRegistrar {
- private ApexRunnerRegistrar() {}
-
- /** Registers the {@link ApexRunner}. */
- @AutoService(PipelineRunnerRegistrar.class)
- public static class Runner implements PipelineRunnerRegistrar {
- @Override
- public Iterable>> getPipelineRunners() {
- return ImmutableList.of(ApexRunner.class, TestApexRunner.class);
- }
- }
-
- /** Registers the {@link ApexPipelineOptions}. */
- @AutoService(PipelineOptionsRegistrar.class)
- public static class Options implements PipelineOptionsRegistrar {
- @Override
- public Iterable> getPipelineOptions() {
- return ImmutableList.of(ApexPipelineOptions.class);
- }
- }
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
deleted file mode 100644
index e62a65965401..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex;
-
-import com.datatorrent.api.DAG;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.apex.api.Launcher.AppHandle;
-import org.apache.apex.api.Launcher.ShutdownMode;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.joda.time.Duration;
-
-/** Result of executing a {@link Pipeline} with Apex in embedded mode. */
-public class ApexRunnerResult implements PipelineResult {
- private final DAG apexDAG;
- private final AppHandle apexApp;
- private State state = State.UNKNOWN;
-
- public ApexRunnerResult(DAG dag, AppHandle apexApp) {
- this.apexDAG = dag;
- this.apexApp = apexApp;
- }
-
- @Override
- public State getState() {
- return state;
- }
-
- @Override
- public State cancel() throws IOException {
- apexApp.shutdown(ShutdownMode.KILL);
- cleanupOnCancelOrFinish();
- state = State.CANCELLED;
- return state;
- }
-
- @Override
- @Nullable
- public State waitUntilFinish(@Nullable Duration duration) {
- long timeout =
- (duration == null || duration.getMillis() < 1)
- ? Long.MAX_VALUE
- : System.currentTimeMillis() + duration.getMillis();
- try {
- while (!apexApp.isFinished() && System.currentTimeMillis() < timeout) {
- if (ApexRunner.ASSERTION_ERROR.get() != null) {
- throw ApexRunner.ASSERTION_ERROR.get();
- }
- Thread.sleep(500);
- }
- if (apexApp.isFinished()) {
- cleanupOnCancelOrFinish();
- return State.DONE;
- }
- return null;
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public State waitUntilFinish() {
- return waitUntilFinish(null);
- }
-
- @Override
- public MetricResults metrics() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Return the DAG executed by the pipeline.
- *
- * @return DAG from translation.
- */
- public DAG getApexDAG() {
- return apexDAG;
- }
-
- /** Opportunity for a subclass to perform cleanup, such as removing temporary files. */
- protected void cleanupOnCancelOrFinish() {}
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
deleted file mode 100644
index 15e3968914dc..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex;
-
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Attribute.AttributeMap;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.lang.reflect.AccessibleObject;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.FileSystem;
-import java.nio.file.FileSystems;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.jar.JarFile;
-import java.util.jar.Manifest;
-import org.apache.apex.api.EmbeddedAppLauncher;
-import org.apache.apex.api.Launcher;
-import org.apache.apex.api.Launcher.AppHandle;
-import org.apache.apex.api.Launcher.LaunchMode;
-import org.apache.apex.api.Launcher.LauncherException;
-import org.apache.apex.api.Launcher.ShutdownMode;
-import org.apache.apex.api.YarnAppLauncher;
-import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationUtils;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Proxy to launch the YARN application through the hadoop script to run in the pre-configured
- * environment (class path, configuration, native libraries etc.).
- *
- *
The proxy takes the DAG and communicates with the Hadoop services to launch it on the cluster.
- */
-public class ApexYarnLauncher {
- private static final Logger LOG = LoggerFactory.getLogger(ApexYarnLauncher.class);
-
- public AppHandle launchApp(StreamingApplication app, Properties configProperties)
- throws IOException {
-
- List jarsToShip = getYarnDeployDependencies();
- StringBuilder classpath = new StringBuilder();
- for (File path : jarsToShip) {
- if (path.isDirectory()) {
- File tmpJar = File.createTempFile("beam-runners-apex-", ".jar");
- createJar(path, tmpJar);
- tmpJar.deleteOnExit();
- path = tmpJar;
- }
- if (classpath.length() != 0) {
- classpath.append(':');
- }
- classpath.append(path.getAbsolutePath());
- }
-
- EmbeddedAppLauncher> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED);
- DAG dag = embeddedLauncher.getDAG();
- app.populateDAG(dag, new Configuration(false));
-
- Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
- launchAttributes.put(YarnAppLauncher.LIB_JARS, classpath.toString().replace(':', ','));
- LaunchParams lp = new LaunchParams(dag, launchAttributes, configProperties);
- lp.cmd = "hadoop " + ApexYarnLauncher.class.getName();
- HashMap env = new HashMap<>();
- env.put("HADOOP_USER_CLASSPATH_FIRST", "1");
- env.put("HADOOP_CLASSPATH", classpath.toString());
- lp.env = env;
- return launchApp(lp);
- }
-
- protected AppHandle launchApp(LaunchParams params) throws IOException {
- File tmpFile = File.createTempFile("beam-runner-apex", "params");
- tmpFile.deleteOnExit();
- try (FileOutputStream fos = new FileOutputStream(tmpFile)) {
- SerializationUtils.serialize(params, fos);
- }
- if (params.getCmd() == null) {
- ApexYarnLauncher.main(new String[] {tmpFile.getAbsolutePath()});
- } else {
- String cmd = params.getCmd() + " " + tmpFile.getAbsolutePath();
- ByteArrayOutputStream consoleOutput = new ByteArrayOutputStream();
- LOG.info("Executing: {} with {}", cmd, params.getEnv());
-
- ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd);
- Map env = pb.environment();
- env.putAll(params.getEnv());
- Process p = pb.start();
- ProcessWatcher pw = new ProcessWatcher(p);
- InputStream output = p.getInputStream();
- InputStream error = p.getErrorStream();
- while (!pw.isFinished()) {
- IOUtils.copy(output, consoleOutput);
- IOUtils.copy(error, consoleOutput);
- }
- if (pw.rc != 0) {
- String msg =
- "The Beam Apex runner in non-embedded mode requires the Hadoop client"
- + " to be installed on the machine from which you launch the job"
- + " and the 'hadoop' script in $PATH";
- LOG.error(msg);
- throw new RuntimeException(
- "Failed to run: "
- + cmd
- + " (exit code "
- + pw.rc
- + ")"
- + "\n"
- + consoleOutput.toString());
- }
- }
- return new AppHandle() {
- @Override
- public boolean isFinished() {
- // TODO (future PR): interaction with child process
- LOG.warn("YARN application runs asynchronously and status check not implemented.");
- return true;
- }
-
- @Override
- public void shutdown(ShutdownMode arg0) throws LauncherException {
- // TODO (future PR): interaction with child process
- throw new UnsupportedOperationException();
- }
- };
- }
-
- /**
- * From the current classpath, find the jar files that need to be deployed with the application to
- * run on YARN. Hadoop dependencies are provided through the Hadoop installation and the
- * application should not bundle them to avoid conflicts. This is done by removing the Hadoop
- * compile dependencies (transitively) by parsing the Maven dependency tree.
- *
- * @return list of jar files to ship
- * @throws IOException when dependency information cannot be read
- */
- public static List getYarnDeployDependencies() throws IOException {
- try (InputStream dependencyTree = ApexRunner.class.getResourceAsStream("dependency-tree")) {
- try (BufferedReader br =
- new BufferedReader(new InputStreamReader(dependencyTree, StandardCharsets.UTF_8))) {
- String line;
- List excludes = new ArrayList<>();
- int excludeLevel = Integer.MAX_VALUE;
- while ((line = br.readLine()) != null) {
- for (int i = 0; i < line.length(); i++) {
- char c = line.charAt(i);
- if (Character.isLetter(c)) {
- if (i > excludeLevel) {
- excludes.add(line.substring(i));
- } else {
- if (line.substring(i).startsWith("org.apache.hadoop")) {
- excludeLevel = i;
- excludes.add(line.substring(i));
- } else {
- excludeLevel = Integer.MAX_VALUE;
- }
- }
- break;
- }
- }
- }
-
- Set excludeJarFileNames = Sets.newHashSet();
- for (String exclude : excludes) {
- List strings = Splitter.on(':').splitToList(exclude);
- String[] mvnc = strings.toArray(new String[strings.size()]);
- String fileName = mvnc[1] + "-";
- if (mvnc.length == 6) {
- fileName += mvnc[4] + "-" + mvnc[3]; // with classifier
- } else {
- fileName += mvnc[3];
- }
- fileName += ".jar";
- excludeJarFileNames.add(fileName);
- }
-
- ClassLoader classLoader = ApexYarnLauncher.class.getClassLoader();
- URL[] urls = ((URLClassLoader) classLoader).getURLs();
- List dependencyJars = new ArrayList<>();
- for (URL url : urls) {
- File f = new File(url.getFile());
- // dependencies can also be directories in the build reactor,
- // the Apex client will automatically create jar files for those.
- if (f.exists() && !excludeJarFileNames.contains(f.getName())) {
- dependencyJars.add(f);
- }
- }
- return dependencyJars;
- }
- }
- }
-
- /**
- * Create a jar file from the given directory.
- *
- * @param dir source directory
- * @param jarFile jar file name
- * @throws IOException when file cannot be created
- */
- public static void createJar(File dir, File jarFile) throws IOException {
-
- final Map env = Collections.singletonMap("create", "true");
- if (jarFile.exists() && !jarFile.delete()) {
- throw new RuntimeException("Failed to remove " + jarFile);
- }
- URI uri = URI.create("jar:" + jarFile.toURI());
- try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env)) {
-
- File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
- Files.createDirectory(zipfs.getPath("META-INF"));
- try (final OutputStream out = Files.newOutputStream(zipfs.getPath(JarFile.MANIFEST_NAME))) {
- if (!manifestFile.exists()) {
- new Manifest().write(out);
- } else {
- Files.copy(manifestFile.toPath(), out);
- }
- }
-
- final Path root = dir.toPath();
- Files.walkFileTree(
- root,
- new java.nio.file.SimpleFileVisitor() {
- String relativePath;
-
- @Override
- public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
- throws IOException {
- relativePath = root.relativize(dir).toString();
- if (!relativePath.isEmpty()) {
- if (!relativePath.endsWith("/")) {
- relativePath += "/";
- }
- if (!"META-INF/".equals(relativePath)) {
- final Path dstDir = zipfs.getPath(relativePath);
- Files.createDirectory(dstDir);
- }
- }
- return super.preVisitDirectory(dir, attrs);
- }
-
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
- throws IOException {
- String name = relativePath + file.getFileName();
- if (!JarFile.MANIFEST_NAME.equals(name)) {
- try (final OutputStream out = Files.newOutputStream(zipfs.getPath(name))) {
- Files.copy(file, out);
- }
- }
- return super.visitFile(file, attrs);
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path dir, IOException exc)
- throws IOException {
- relativePath = root.relativize(dir.getParent()).toString();
- if (!relativePath.isEmpty() && !relativePath.endsWith("/")) {
- relativePath += "/";
- }
- return super.postVisitDirectory(dir, exc);
- }
- });
- }
- }
-
- /** Transfer the properties to the configuration object. */
- public static void addProperties(Configuration conf, Properties props) {
- for (final String propertyName : props.stringPropertyNames()) {
- String propertyValue = props.getProperty(propertyName);
- conf.set(propertyName, propertyValue);
- }
- }
-
- /**
- * The main method expects the serialized DAG and will launch the YARN application.
- *
- * @param args location of launch parameters
- * @throws IOException when parameters cannot be read
- */
- public static void main(String[] args) throws IOException {
- checkArgument(args.length == 1, "exactly one argument expected");
- File file = new File(args[0]);
- checkArgument(file.exists() && file.isFile(), "invalid file path %s", file);
- final LaunchParams params = SerializationUtils.deserialize(new FileInputStream(file));
- StreamingApplication apexApp = (dag, conf) -> copyShallow(params.dag, dag);
- Configuration conf = new Configuration(); // configuration from Hadoop client
- addProperties(conf, params.configProperties);
- AppHandle appHandle =
- params.getApexLauncher().launchApp(apexApp, conf, params.launchAttributes);
- if (appHandle == null) {
- throw new AssertionError("Launch returns null handle.");
- }
- // TODO (future PR)
- // At this point the application is running, but this process should remain active to
- // allow the parent to implement the runner result.
- }
-
- /** Launch parameters that will be serialized and passed to the child process. */
- @VisibleForTesting
- protected static class LaunchParams implements Serializable {
- private static final long serialVersionUID = 1L;
- private final DAG dag;
- private final Attribute.AttributeMap launchAttributes;
- private final Properties configProperties;
- private HashMap env;
- private String cmd;
-
- protected LaunchParams(DAG dag, AttributeMap launchAttributes, Properties configProperties) {
- this.dag = dag;
- this.launchAttributes = launchAttributes;
- this.configProperties = configProperties;
- }
-
- protected Launcher> getApexLauncher() {
- return Launcher.getLauncher(LaunchMode.YARN);
- }
-
- protected String getCmd() {
- return cmd;
- }
-
- protected Map getEnv() {
- return env;
- }
- }
-
- private static void copyShallow(DAG from, DAG to) {
- checkArgument(
- from.getClass() == to.getClass(),
- "must be same class %s %s",
- from.getClass(),
- to.getClass());
- Field[] fields = from.getClass().getDeclaredFields();
- AccessibleObject.setAccessible(fields, true);
- for (Field field : fields) {
- if (!Modifier.isStatic(field.getModifiers())) {
- try {
- field.set(to, field.get(from));
- } catch (IllegalArgumentException | IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- /** Starts a command and waits for it to complete. */
- public static class ProcessWatcher implements Runnable {
- private final Process p;
- private volatile boolean finished = false;
- private volatile int rc;
-
- public ProcessWatcher(Process p) {
- this.p = p;
- new Thread(this).start();
- }
-
- public boolean isFinished() {
- return finished;
- }
-
- @Override
- public void run() {
- try {
- rc = p.waitFor();
- } catch (Exception e) {
- // ignore
- }
- finished = true;
- }
- }
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
deleted file mode 100644
index c53f48f0ff93..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex;
-
-import com.datatorrent.api.DAG;
-import java.io.IOException;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.joda.time.Duration;
-
-/** Apex {@link PipelineRunner} for testing. */
-public class TestApexRunner extends PipelineRunner {
-
- private static final int RUN_WAIT_MILLIS = 20000;
- private final ApexRunner delegate;
-
- private TestApexRunner(ApexPipelineOptions options) {
- options.setEmbeddedExecution(true);
- // options.setEmbeddedExecutionDebugMode(false);
- this.delegate = ApexRunner.fromOptions(options);
- }
-
- public static TestApexRunner fromOptions(PipelineOptions options) {
- ApexPipelineOptions apexOptions =
- PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
- return new TestApexRunner(apexOptions);
- }
-
- public static DAG translate(Pipeline pipeline, ApexPipelineOptions options) {
- ApexRunner delegate = new ApexRunner(options);
- delegate.translateOnly = true;
- return delegate.run(pipeline).getApexDAG();
- }
-
- @Override
- @SuppressWarnings("Finally")
- public ApexRunnerResult run(Pipeline pipeline) {
- ApexRunnerResult result = delegate.run(pipeline);
- try {
- // this is necessary for tests that just call run() and not waitUntilFinish
- result.waitUntilFinish(Duration.millis(RUN_WAIT_MILLIS));
- return result;
- } finally {
- try {
- result.cancel();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java
deleted file mode 100644
index cbbea17abc94..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Implementation of the Beam runner for Apache Apex. */
-package org.apache.beam.runners.apex;
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
deleted file mode 100644
index 4cc33f4b838f..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation;
-
-import com.datatorrent.api.DAG;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
-import org.apache.beam.runners.apex.translation.operators.ApexProcessFnOperator;
-import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PrimitiveCreate;
-import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link ApexPipelineTranslator} translates {@link Pipeline} objects into Apex logical plan {@link
- * DAG}.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
- private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
-
- /**
- * A map from {@link PTransform} subclass to the corresponding {@link TransformTranslator} to use
- * to translate that transform.
- */
- private static final Map, TransformTranslator> transformTranslators =
- new HashMap<>();
-
- private final TranslationContext translationContext;
-
- static {
- // register TransformTranslators
- registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>());
- registerTransformTranslator(
- SplittableParDoViaKeyedWorkItems.ProcessElements.class,
- new ParDoTranslator.SplittableProcessElementsTranslator());
- registerTransformTranslator(GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator());
- registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
- registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
- registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
- registerTransformTranslator(Flatten.PCollections.class, new FlattenPCollectionTranslator());
- registerTransformTranslator(PrimitiveCreate.class, new CreateValuesTranslator());
- registerTransformTranslator(
- CreateApexPCollectionView.class, new CreateApexPCollectionViewTranslator());
- registerTransformTranslator(CreatePCollectionView.class, new CreatePCollectionViewTranslator());
- registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
- }
-
- public ApexPipelineTranslator(ApexPipelineOptions options) {
- this.translationContext = new TranslationContext(options);
- }
-
- public void translate(Pipeline pipeline, DAG dag) {
- pipeline.traverseTopologically(this);
- translationContext.populateDAG(dag);
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- LOG.debug("entering composite transform {}", node.getTransform());
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- LOG.debug("leaving composite transform {}", node.getTransform());
- }
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- LOG.debug("visiting transform {}", node.getTransform());
- PTransform transform = node.getTransform();
- TransformTranslator translator = getTransformTranslator(transform.getClass());
- if (null == translator) {
- throw new UnsupportedOperationException("no translator registered for " + transform);
- }
- translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
- translator.translate(transform, translationContext);
- }
-
- @Override
- public void visitValue(PValue value, TransformHierarchy.Node producer) {
- LOG.debug("visiting value {}", value);
- }
-
- /**
- * Records that instances of the specified PTransform class should be translated by default by the
- * corresponding {@link TransformTranslator}.
- */
- private static void registerTransformTranslator(
- Class transformClass,
- TransformTranslator extends TransformT> transformTranslator) {
- if (transformTranslators.put(transformClass, transformTranslator) != null) {
- throw new IllegalArgumentException("defining multiple translators for " + transformClass);
- }
- }
-
- /**
- * Returns the {@link TransformTranslator} to use for instances of the specified PTransform class,
- * or null if none registered.
- */
- private >
- TransformTranslator getTransformTranslator(Class transformClass) {
- return transformTranslators.get(transformClass);
- }
-
- private static class ReadBoundedTranslator implements TransformTranslator> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void translate(Read.Bounded transform, TranslationContext context) {
- // TODO: adapter is visibleForTesting
- BoundedToUnboundedSourceAdapter unboundedSource =
- new BoundedToUnboundedSourceAdapter<>(transform.getSource());
- ApexReadUnboundedInputOperator operator =
- new ApexReadUnboundedInputOperator<>(unboundedSource, true, context.getPipelineOptions());
- context.addOperator(operator, operator.output);
- }
- }
-
- private static class CreateApexPCollectionViewTranslator
- implements TransformTranslator> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void translate(
- CreateApexPCollectionView transform, TranslationContext context) {
- context.addView(transform.getView());
- LOG.debug("view {}", transform.getView().getName());
- }
- }
-
- private static class CreatePCollectionViewTranslator
- implements TransformTranslator> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void translate(
- CreatePCollectionView transform, TranslationContext context) {
- context.addView(transform.getView());
- LOG.debug("view {}", transform.getView().getName());
- }
- }
-
- private static class GBKIntoKeyedWorkItemsTranslator
- implements TransformTranslator> {
-
- @Override
- public void translate(GBKIntoKeyedWorkItems transform, TranslationContext context) {
- // https://issues.apache.org/jira/browse/BEAM-1850
- ApexProcessFnOperator> operator =
- ApexProcessFnOperator.toKeyedWorkItems(context.getPipelineOptions());
- context.addOperator(operator, operator.outputPort);
- context.addStream(context.getInput(), operator.inputPort);
- }
- }
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
deleted file mode 100644
index 025f3b17ad43..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation;
-
-import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.apex.translation.utils.ValuesSource;
-import org.apache.beam.runners.core.construction.PrimitiveCreate;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.values.PCollection;
-
-/** Wraps elements from Create.Values into an {@link UnboundedSource}. mainly used for testing */
-class CreateValuesTranslator implements TransformTranslator> {
- private static final long serialVersionUID = 1451000241832745629L;
-
- @Override
- public void translate(PrimitiveCreate transform, TranslationContext context) {
- UnboundedSource unboundedSource =
- new ValuesSource<>(
- transform.getElements(), ((PCollection) context.getOutput()).getCoder());
- ApexReadUnboundedInputOperator operator =
- new ApexReadUnboundedInputOperator<>(unboundedSource, context.getPipelineOptions());
- context.addOperator(operator, operator.output);
- }
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
deleted file mode 100644
index ff22e9d696aa..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation;
-
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.apex.translation.operators.ApexFlattenOperator;
-import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.apex.translation.utils.ValuesSource;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-
-/** {@link Flatten.PCollections} translation to Apex operator. */
-class FlattenPCollectionTranslator implements TransformTranslator> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void translate(Flatten.PCollections transform, TranslationContext context) {
- List> inputCollections = extractPCollections(context.getInputs());
-
- if (inputCollections.isEmpty()) {
- // create a dummy source that never emits anything
- @SuppressWarnings("unchecked")
- UnboundedSource unboundedSource =
- new ValuesSource<>(Collections.EMPTY_LIST, VoidCoder.of());
- ApexReadUnboundedInputOperator operator =
- new ApexReadUnboundedInputOperator<>(unboundedSource, context.getPipelineOptions());
- context.addOperator(operator, operator.output);
- } else if (inputCollections.size() == 1) {
- context.addAlias(context.getOutput(), inputCollections.get(0));
- } else {
- @SuppressWarnings("unchecked")
- PCollection output = (PCollection) context.getOutput();
- Map, Integer> unionTags = Collections.emptyMap();
- flattenCollections(inputCollections, unionTags, output, context);
- }
- }
-
- private List> extractPCollections(Map, PValue> inputs) {
- List> collections = Lists.newArrayList();
- for (PValue pv : inputs.values()) {
- checkArgument(
- pv instanceof PCollection,
- "Non-PCollection provided as input to flatten: %s of type %s",
- pv,
- pv.getClass().getSimpleName());
- collections.add((PCollection) pv);
- }
- return collections;
- }
-
- /**
- * Flatten the given collections into the given result collection. Translates into a cascading
- * merge with 2 input ports per operator. The optional union tags can be used to identify the
- * source in the result stream, used to channel multiple side inputs to a single Apex operator
- * port.
- *
- * @param collections
- * @param unionTags
- * @param finalCollection
- * @param context
- */
- static void flattenCollections(
- List> collections,
- Map, Integer> unionTags,
- PCollection finalCollection,
- TranslationContext context) {
- List> remainingCollections = Lists.newArrayList();
- PCollection firstCollection = null;
- while (!collections.isEmpty()) {
- for (PCollection collection : collections) {
- if (null == firstCollection) {
- firstCollection = collection;
- } else {
- ApexFlattenOperator operator = new ApexFlattenOperator<>();
- context.addStream(firstCollection, operator.data1);
- Integer unionTag = unionTags.get(firstCollection);
- operator.data1Tag = (unionTag != null) ? unionTag : 0;
- context.addStream(collection, operator.data2);
- unionTag = unionTags.get(collection);
- operator.data2Tag = (unionTag != null) ? unionTag : 0;
-
- if (!collection.getCoder().equals(firstCollection.getCoder())) {
- throw new UnsupportedOperationException("coders don't match");
- }
-
- if (collections.size() > 2) {
- PCollection intermediateCollection =
- PCollection.createPrimitiveOutputInternal(
- collection.getPipeline(),
- collection.getWindowingStrategy(),
- collection.isBounded(),
- collection.getCoder());
- context.addOperator(operator, operator.out, intermediateCollection);
- remainingCollections.add(intermediateCollection);
- } else {
- // final stream merge
- context.addOperator(operator, operator.out, finalCollection);
- }
- firstCollection = null;
- }
- }
- if (firstCollection != null) {
- // push to next merge level
- remainingCollections.add(firstCollection);
- firstCollection = null;
- }
- if (remainingCollections.size() > 1) {
- collections = remainingCollections;
- remainingCollections = Lists.newArrayList();
- } else {
- collections = Lists.newArrayList();
- }
- }
- }
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
deleted file mode 100644
index 2dd96f0503a5..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation;
-
-import org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/** {@link GroupByKey} translation to Apex operator. */
-class GroupByKeyTranslator implements TransformTranslator> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void translate(GroupByKey transform, TranslationContext context) {
- PCollection> input = context.getInput();
- ApexGroupByKeyOperator group =
- new ApexGroupByKeyOperator<>(
- context.getPipelineOptions(), input, context.getStateBackend());
- context.addOperator(group, group.output);
- context.addStream(input, group.input);
- }
-}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
deleted file mode 100644
index 7144a0eb3454..000000000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation;
-
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.OutputPort;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
-import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link ParDo.MultiOutput} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
- */
-class ParDoTranslator
- implements TransformTranslator> {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(ParDoTranslator.class);
-
- @Override
- public void translate(ParDo.MultiOutput transform, TranslationContext context) {
- DoFn doFn = transform.getFn();
-
- if (DoFnSignatures.isSplittable(doFn)) {
- throw new UnsupportedOperationException(
- String.format(
- "%s does not support splittable DoFn: %s", ApexRunner.class.getSimpleName(), doFn));
- }
- if (DoFnSignatures.requiresTimeSortedInput(doFn)) {
- throw new UnsupportedOperationException(
- String.format(
- "%s doesn't currently support @RequiresTimeSortedInput",
- ApexRunner.class.getSimpleName()));
- }
- if (DoFnSignatures.usesTimers(doFn)) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
- DoFn.TimerId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- ApexRunner.class.getSimpleName()));
- }
-
- Map, PValue> outputs = context.getOutputs();
- PCollection input = context.getInput();
- Iterable> sideInputs = transform.getSideInputs().values();
-
- DoFnSchemaInformation doFnSchemaInformation;
- doFnSchemaInformation = ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
-
- Map> sideInputMapping =
- ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
-
- Map, Coder>> outputCoders =
- outputs.entrySet().stream()
- .filter(e -> e.getValue() instanceof PCollection)
- .collect(
- Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
- ApexParDoOperator operator =
- new ApexParDoOperator<>(
- context.getPipelineOptions(),
- doFn,
- transform.getMainOutputTag(),
- transform.getAdditionalOutputTags().getAll(),
- input.getWindowingStrategy(),
- sideInputs,
- input.getCoder(),
- outputCoders,
- doFnSchemaInformation,
- sideInputMapping,
- context.getStateBackend());
-
- Map, OutputPort>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
- for (Entry, PValue> output : outputs.entrySet()) {
- checkArgument(
- output.getValue() instanceof PCollection,
- "%s %s outputs non-PCollection %s of type %s",
- ParDo.MultiOutput.class.getSimpleName(),
- context.getFullName(),
- output.getValue(),
- output.getValue().getClass().getSimpleName());
- PCollection> pc = (PCollection>) output.getValue();
- if (output.getKey().equals(transform.getMainOutputTag())) {
- ports.put(pc, operator.output);
- } else {
- int portIndex = 0;
- for (TupleTag> tag : transform.getAdditionalOutputTags().getAll()) {
- if (tag.equals(output.getKey())) {
- ports.put(pc, operator.additionalOutputPorts[portIndex]);
- break;
- }
- portIndex++;
- }
- }
- }
- context.addOperator(operator, ports);
- context.addStream(context.getInput(), operator.input);
- if (!Iterables.isEmpty(sideInputs)) {
- addSideInputs(operator.sideInput1, sideInputs, context);
- }
- }
-
- static class SplittableProcessElementsTranslator<
- InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
- implements TransformTranslator<
- ProcessElements> {
-
- @Override
- public void translate(
- ProcessElements
- transform,
- TranslationContext context) {
-
- Map, PValue> outputs = context.getOutputs();
- PCollection input = context.getInput();
- Iterable> sideInputs = transform.getSideInputs();
-
- Map, Coder>> outputCoders =
- outputs.entrySet().stream()
- .filter(e -> e.getValue() instanceof PCollection)
- .collect(
- Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- DoFn doFn = (DoFn) transform.newProcessFn(transform.getFn());
- ApexParDoOperator operator =
- new ApexParDoOperator<>(
- context.getPipelineOptions(),
- doFn,
- transform.getMainOutputTag(),
- transform.getAdditionalOutputTags().getAll(),
- input.getWindowingStrategy(),
- sideInputs,
- input.getCoder(),
- outputCoders,
- DoFnSchemaInformation.create(),
- Collections.emptyMap(),
- context.getStateBackend());
-
- Map, OutputPort>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
- for (Entry, PValue> output : outputs.entrySet()) {
- checkArgument(
- output.getValue() instanceof PCollection,
- "%s %s outputs non-PCollection %s of type %s",
- ParDo.MultiOutput.class.getSimpleName(),
- context.getFullName(),
- output.getValue(),
- output.getValue().getClass().getSimpleName());
- PCollection> pc = (PCollection>) output.getValue();
- if (output.getKey().equals(transform.getMainOutputTag())) {
- ports.put(pc, operator.output);
- } else {
- int portIndex = 0;
- for (TupleTag> tag : transform.getAdditionalOutputTags().getAll()) {
- if (tag.equals(output.getKey())) {
- ports.put(pc, operator.additionalOutputPorts[portIndex]);
- break;
- }
- portIndex++;
- }
- }
- }
-
- context.addOperator(operator, ports);
- context.addStream(context.getInput(), operator.input);
- if (!Iterables.isEmpty(sideInputs)) {
- addSideInputs(operator.sideInput1, sideInputs, context);
- }
- }
- }
-
- static void addSideInputs(
- Operator.InputPort> sideInputPort,
- Iterable> sideInputs,
- TranslationContext context) {
- Operator.InputPort>[] sideInputPorts = {sideInputPort};
- if (Iterables.size(sideInputs) > sideInputPorts.length) {
- PCollection> unionCollection = unionSideInputs(sideInputs, context);
- context.addStream(unionCollection, sideInputPorts[0]);
- } else {
- // the number of ports for side inputs is fixed and each port can only take one input.
- for (int i = 0; i < Iterables.size(sideInputs); i++) {
- context.addStream(context.getViewInput(Iterables.get(sideInputs, i)), sideInputPorts[i]);
- }
- }
- }
-
- private static PCollection> unionSideInputs(
- Iterable> sideInputs, TranslationContext context) {
- checkArgument(Iterables.size(sideInputs) > 1, "requires multiple side inputs");
- // flatten and assign union tag
- List> sourceCollections = new ArrayList<>();
- Map, Integer> unionTags = new HashMap<>();
- PCollection