From 7efd5940bdf2167a2dec4ce337a138a3c0c9dd6e Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Tue, 7 Jun 2022 15:02:01 -0700 Subject: [PATCH 1/3] [BEAM-14506] Adding testcases and examples for xlang Python RunInference --- .../beam/gradle/BeamModulePlugin.groovy | 9 +- .../google-cloud-dataflow-java/build.gradle | 1 + .../python/transforms/RunInference.java | 102 ++++++++++++++++++ .../transforms/RunInferenceTransformTest.java | 63 +++++++++++ .../portability/expansion_service_test.py | 30 +++++- 5 files changed, 198 insertions(+), 7 deletions(-) create mode 100644 sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java create mode 100644 sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index a0185939ebc1..2c97407cf7e8 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -351,6 +351,8 @@ class BeamModulePlugin implements Plugin { Integer numParallelTests = 1 // Whether the pipeline needs --sdk_location option boolean needsSdkLocation = false + // semi_persist_dir for SDK containers + String semiPersistDir = "/tmp" // classpath for running tests. FileCollection classpath } @@ -2309,8 +2311,10 @@ class BeamModulePlugin implements Plugin { throw new GradleException("unsupported java version.") } def setupTask = project.tasks.register(config.name+"Setup", Exec) { - dependsOn ':sdks:java:container:'+javaContainerSuffix+':docker' - dependsOn ':sdks:python:container:py'+pythonContainerSuffix+':docker' + if (!project.hasProperty('skipContainerBuilds')) { + dependsOn ':sdks:java:container:'+javaContainerSuffix+':docker' + dependsOn ':sdks:python:container:py'+pythonContainerSuffix+':docker' + } dependsOn ':sdks:java:testing:expansion-service:buildTestExpansionServiceJar' dependsOn ":sdks:python:installGcpTest" // setup test env @@ -2347,6 +2351,7 @@ class BeamModulePlugin implements Plugin { systemProperty "beamTestPipelineOptions", JsonOutput.toJson(config.javaPipelineOptions) systemProperty "expansionJar", expansionJar systemProperty "expansionPort", port + systemProperty "semiPersistDir", config.semiPersistDir classpath = config.classpath + project.files( project.project(":runners:core-construction-java").sourceSets.test.runtimeClasspath, project.project(":sdks:java:extensions:python").sourceSets.test.runtimeClasspath diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 84925cbd2bf2..9669a28cc2e9 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -413,6 +413,7 @@ createCrossLanguageValidatesRunnerTask( classpath: configurations.validatesRunner, numParallelTests: Integer.MAX_VALUE, needsSdkLocation: true, + semiPersistDir: "/var/opt/google", pythonPipelineOptions: [ "--runner=TestDataflowRunner", "--project=${dataflowProject}", diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java new file mode 100644 index 000000000000..2bae5b2c322a --- /dev/null +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.python.transforms; + +import java.util.Map; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.extensions.python.PythonExternalTransform; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.PythonCallableSource; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Wrapper for invoking external Python RunInference. */ +public class RunInference extends PTransform, PCollection> { + private final String modelLoader; + private final Schema schema; + private final Map kwargs; + private final String expansionService; + + /** + * Instantiates a multi-language wrapper for a Python RunInference with a given model loader. + * + * @param modelLoader A Python lambda function for a model loader class object. + * @param exampleType A schema field type for the example column in output rows. + * @param inferenceType A schema field type for the inference column in output rows. + * @return A {@link RunInference} for the given model loader. + */ + public static RunInference of( + String modelLoader, Schema.FieldType exampleType, Schema.FieldType inferenceType) { + Schema schema = + Schema.of( + Schema.Field.of("example", exampleType), Schema.Field.of("inference", inferenceType)); + return new RunInference(modelLoader, schema, ImmutableMap.of(), ""); + } + + /** + * Instantiates a multi-language wrapper for a Python RunInference with a given model loader. + * + * @param modelLoader A Python lambda function for a model loader class object. + * @param schema A schema for output rows. + * @return A {@link RunInference} for the given model loader. + */ + public static RunInference of(String modelLoader, Schema schema) { + return new RunInference(modelLoader, schema, ImmutableMap.of(), ""); + } + + /** + * Sets keyword arguments for RunInference constructor. + * + * @return A {@link RunInference} with keyword arguments. + */ + public RunInference withKwarg(String key, Object arg) { + ImmutableMap.Builder builder = + ImmutableMap.builder().putAll(kwargs).put(key, arg); + return new RunInference(modelLoader, schema, builder.build(), expansionService); + } + + /** + * Sets an expansion service endpoint for RunInference. + * + * @param expansionService A URL for a Python expansion service. + * @return A {@link RunInference} for the given expansion service endpoint. + */ + public RunInference withExpansionService(String expansionService) { + return new RunInference(modelLoader, schema, kwargs, expansionService); + } + + private RunInference( + String modelLoader, Schema schema, Map kwargs, String expansionService) { + this.modelLoader = modelLoader; + this.schema = schema; + this.kwargs = kwargs; + this.expansionService = expansionService; + } + + @Override + public PCollection expand(PCollection input) { + return input.apply( + PythonExternalTransform., PCollection>from( + "apache_beam.ml.inference.base.RunInference.create", expansionService) + .withKwarg("model_handler_provider", PythonCallableSource.of(modelLoader)) + .withKwargs(kwargs) + .withOutputCoder(RowCoder.of(schema))); + } +} diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest.java new file mode 100644 index 000000000000..7c10216e6dd7 --- /dev/null +++ b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.python.transforms; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.beam.runners.core.construction.BaseExternalTest; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.UsesPythonExpansionService; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RunInferenceTransformTest extends BaseExternalTest { + @Test + @Category({ValidatesRunner.class, UsesPythonExpansionService.class}) + public void testRunInference() { + String stagingLocation = + Optional.ofNullable(System.getProperty("semiPersistDir")).orElse("/tmp"); + Schema schema = + Schema.of( + Schema.Field.of("example", Schema.FieldType.array(Schema.FieldType.INT64)), + Schema.Field.of("inference", Schema.FieldType.INT32)); + Row row0 = Row.withSchema(schema).addArray(0L, 0L).addValue(0).build(); + Row row1 = Row.withSchema(schema).addArray(1L, 1L).addValue(1).build(); + PCollection col = + testPipeline + .apply(Create.>of(Arrays.asList(0L, 0L), Arrays.asList(1L, 1L))) + .setCoder(IterableCoder.of(VarLongCoder.of())) + .apply( + RunInference.of( + "apache_beam.ml.inference.sklearn_inference.SklearnModelHandlerNumpy", + schema) + .withKwarg( + "model_uri", String.format("%s/staged/sklearn_model", stagingLocation)) + .withExpansionService(expansionAddr)); + PAssert.that(col).containsInAnyOrder(row0, row1); + } +} diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_test.py b/sdks/python/apache_beam/runners/portability/expansion_service_test.py index 1af86898aae5..e99a7aa90c7c 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service_test.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service_test.py @@ -18,6 +18,7 @@ import argparse import logging +import pickle import signal import sys import typing @@ -33,6 +34,7 @@ from apache_beam.portability.api import external_transforms_pb2 from apache_beam.runners.portability import artifact_service from apache_beam.runners.portability import expansion_service +from apache_beam.runners.portability.stager import Stager from apache_beam.transforms import fully_qualified_named_transform from apache_beam.transforms import ptransform from apache_beam.transforms.environments import PyPIArtifactRegistry @@ -347,6 +349,24 @@ def parse_string_payload(input_byte): return RowCoder(payload.schema).decode(payload.payload)._asdict() +def create_test_sklearn_model(file_name): + from sklearn import svm + x = [[0, 0], [1, 1]] + y = [0, 1] + model = svm.SVC() + model.fit(x, y) + with open(file_name, 'wb') as file: + pickle.dump(model, file) + + +def update_sklearn_model_dependency(env): + model_file = "/tmp/sklearn_test_model" + staged_name = "sklearn_model" + create_test_sklearn_model(model_file) + env._artifacts.append( + Stager._create_file_stage_to_artifact(model_file, staged_name)) + + server = None @@ -367,12 +387,12 @@ def main(unused_argv): with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter( options.fully_qualified_name_glob): server = grpc.server(thread_pool_executor.shared_unbounded_instance()) + expansion_servicer = expansion_service.ExpansionServiceServicer( + PipelineOptions( + ["--experiments", "beam_fn_api", "--sdk_location", "container"])) + update_sklearn_model_dependency(expansion_servicer._default_environment) beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server( - expansion_service.ExpansionServiceServicer( - PipelineOptions( - ["--experiments", "beam_fn_api", "--sdk_location", - "container"])), - server) + expansion_servicer, server) beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server( artifact_service.ArtifactRetrievalService( artifact_service.BeamFilesystemHandler(None).file_reader), From d5cec82adbefc886e9569533118733fee2084b9f Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Mon, 11 Jul 2022 17:33:15 -0700 Subject: [PATCH 2/3] update --- .../sdk/extensions/python/transforms/RunInference.java | 8 ++++---- .../python/transforms/RunInferenceTransformTest.java | 5 +++++ sdks/python/apache_beam/ml/inference/base.py | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java index 2bae5b2c322a..209c7061d23d 100644 --- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java +++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java @@ -37,7 +37,7 @@ public class RunInference extends PTransform, PCollection> { /** * Instantiates a multi-language wrapper for a Python RunInference with a given model loader. * - * @param modelLoader A Python lambda function for a model loader class object. + * @param modelLoader A Python callable for a model loader class object. * @param exampleType A schema field type for the example column in output rows. * @param inferenceType A schema field type for the inference column in output rows. * @return A {@link RunInference} for the given model loader. @@ -53,7 +53,7 @@ public static RunInference of( /** * Instantiates a multi-language wrapper for a Python RunInference with a given model loader. * - * @param modelLoader A Python lambda function for a model loader class object. + * @param modelLoader A Python callable for a model loader class object. * @param schema A schema for output rows. * @return A {@link RunInference} for the given model loader. */ @@ -62,7 +62,7 @@ public static RunInference of(String modelLoader, Schema schema) { } /** - * Sets keyword arguments for RunInference constructor. + * Sets keyword arguments for the model loader. * * @return A {@link RunInference} with keyword arguments. */ @@ -94,7 +94,7 @@ private RunInference( public PCollection expand(PCollection input) { return input.apply( PythonExternalTransform., PCollection>from( - "apache_beam.ml.inference.base.RunInference.create", expansionService) + "apache_beam.ml.inference.base.RunInference.from_callable", expansionService) .withKwarg("model_handler_provider", PythonCallableSource.of(modelLoader)) .withKwargs(kwargs) .withOutputCoder(RowCoder.of(schema))); diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest.java index 7c10216e6dd7..2e875de347ef 100644 --- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest.java +++ b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/RunInferenceTransformTest.java @@ -56,6 +56,11 @@ public void testRunInference() { "apache_beam.ml.inference.sklearn_inference.SklearnModelHandlerNumpy", schema) .withKwarg( + // The test expansion service creates the test model and saves it to the + // returning external environment as a dependency. + // (sdks/python/apache_beam/runners/portability/expansion_service_test.py) + // The dependencies for Python SDK harness are supposed to be staged to + // $SEMI_PERSIST_DIR/staged directory. "model_uri", String.format("%s/staged/sklearn_model", stagingLocation)) .withExpansionService(expansionAddr)); PAssert.that(col).containsInAnyOrder(row0, row1); diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py index 9f1068eebd1a..8ca775602877 100644 --- a/sdks/python/apache_beam/ml/inference/base.py +++ b/sdks/python/apache_beam/ml/inference/base.py @@ -272,7 +272,7 @@ def __init__( # TODO(BEAM-14046): Add and link to help documentation. @classmethod - def create(cls, model_handler_provider, **kwargs): + def from_callable(cls, model_handler_provider, **kwargs): """Multi-language friendly constructor. This constructor can be used with fully_qualified_named_transform to From 1dc015fdf45c7a8587cb791885e99a18be5f0562 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Mon, 11 Jul 2022 17:45:29 -0700 Subject: [PATCH 3/3] update --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 2c97407cf7e8..2edd47621bd6 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -2311,10 +2311,8 @@ class BeamModulePlugin implements Plugin { throw new GradleException("unsupported java version.") } def setupTask = project.tasks.register(config.name+"Setup", Exec) { - if (!project.hasProperty('skipContainerBuilds')) { - dependsOn ':sdks:java:container:'+javaContainerSuffix+':docker' - dependsOn ':sdks:python:container:py'+pythonContainerSuffix+':docker' - } + dependsOn ':sdks:java:container:'+javaContainerSuffix+':docker' + dependsOn ':sdks:python:container:py'+pythonContainerSuffix+':docker' dependsOn ':sdks:java:testing:expansion-service:buildTestExpansionServiceJar' dependsOn ":sdks:python:installGcpTest" // setup test env