[FLINK-29014][streaming-java][table] Improve end-to-end story about PipelineOptions.JARS

This closes apache#20633.
twalthr committed Aug 23, 2022
1 parent 56ac1dc commit eb69b11
Showing 7 changed files with 123 additions and 80 deletions.
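
In short: jar resources registered on the table side (e.g. via CREATE FUNCTION ... USING JAR) are now merged into PipelineOptions.JARS by a dedicated ResourceManager#addJarConfiguration(TableConfig) right before each translation or execution step, while StreamExecutionEnvironment#configure() stops merging the option and simply overwrites it with the incoming value. The generic ConfigUtils#mergeCollectionsToConfig helper thereby becomes unused and is removed.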
ConfigUtils.java
@@ -155,33 +155,5 @@ public static Set<ConfigOption<?>> getAllConfigOptions(Class<?> configOptionsClass
return options;
}

/**
* Merge a {@link Collection} of values of type {@code T} and option {@link ConfigOption} value
* of type {@link List} of type {@code T} from current {@link Configuration}, then put it to
* {@link Configuration}.
*
* @param configuration the configuration object to get the value out and write
* @param key the {@link ConfigOption option} to serve as the key for the list in the
* configuration
* @param values the collection of values to merge as value for the {@code key}
* @param decodeMapper the decode transformation function.
* @param encodeMapper the encode transformation function.
*/
public static <T, E extends Throwable> void mergeCollectionsToConfig(
final Configuration configuration,
final ConfigOption<List<T>> key,
final Collection<T> values,
final FunctionWithException<T, T, E> decodeMapper,
final Function<T, T> encodeMapper)
throws E {
// decode option value from current configuration
Set<T> valueInConfig =
new HashSet<>(decodeListFromConfig(configuration, key, decodeMapper));
// merge provided option value
valueInConfig.addAll(values);
// encode the merged value to current WritableConfig
encodeCollectionToConfig(configuration, key, valueInConfig, encodeMapper);
}

private ConfigUtils() {}
}
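
For reference, a minimal sketch of how the removed helper was used for pipeline.jars (hypothetical call site and class name; the identity String::toString mappers mirror the call sites deleted elsewhere in this commit):

import java.util.Arrays;
import java.util.Collections;

import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class MergeJarsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(PipelineOptions.JARS, Arrays.asList("file:/tmp/a.jar"));
        // Decode the current list, merge both sides through a HashSet
        // (deduplicated, but without a guaranteed order), write it back.
        ConfigUtils.mergeCollectionsToConfig(
                conf,
                PipelineOptions.JARS,
                Collections.singletonList("file:/tmp/b.jar"),
                String::toString, // decode mapper (identity for strings)
                String::toString); // encode mapper (identity for strings)
        System.out.println(conf.get(PipelineOptions.JARS)); // both jars, any order
    }
}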
StreamExecutionEnvironment.java
@@ -52,7 +52,6 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
@@ -1040,18 +1039,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
flag));

// merge PipelineOptions.JARS, user maybe set this option in high level such as table
// module, so here need to merge the jars from both configuration object
configuration
.getOptional(PipelineOptions.JARS)
.ifPresent(
jars ->
ConfigUtils.mergeCollectionsToConfig(
this.configuration,
PipelineOptions.JARS,
Collections.unmodifiableCollection(jars),
String::toString,
String::toString));
.ifPresent(jars -> this.configuration.set(PipelineOptions.JARS, jars));

config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
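
The practical effect of this hunk: configure() no longer unions pipeline.jars with the value already present. A sketch of the difference (hypothetical class name; paths taken from the test this commit deletes below):

import java.util.Arrays;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OverwriteVsMergeSketch {
    public static void main(String[] args) {
        Configuration base = new Configuration();
        base.set(PipelineOptions.JARS, Arrays.asList("/tmp/test1.jar", "/tmp/test2.jar"));
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(base);

        Configuration update = new Configuration();
        update.set(PipelineOptions.JARS, Arrays.asList("/tmp/test2.jar", "/tmp/test3.jar"));
        env.configure(update, Thread.currentThread().getContextClassLoader());

        // Before this commit: all three jars (HashSet-based merge, order not guaranteed).
        // After this commit: only /tmp/test2.jar and /tmp/test3.jar (overwritten).
        System.out.println(env.getConfiguration().get(PipelineOptions.JARS));
    }
}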
StreamExecutionEnvironmentTest.java
@@ -240,27 +240,6 @@ public void testLocalEnvironmentExplicitParallelism() {
assertThat(env.getStateBackend(), instanceOf(MemoryStateBackend.class));
}

@Test
public void testMergePipelineJarsWithConfiguration() {
Configuration configuration = new Configuration();
configuration.set(PipelineOptions.JARS, Arrays.asList("/tmp/test1.jar", "/tmp/test2.jar"));
StreamExecutionEnvironment envFromConfiguration =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);

// user configuration with different jars
Configuration userConfiguration = new Configuration();
userConfiguration.set(
PipelineOptions.JARS, Arrays.asList("/tmp/test2.jar", "/tmp/test3.jar"));

// test pipeline.jars merge
envFromConfiguration.configure(
userConfiguration, Thread.currentThread().getContextClassLoader());

assertEquals(
envFromConfiguration.getConfiguration().get(PipelineOptions.JARS),
Arrays.asList("/tmp/test1.jar", "/tmp/test2.jar", "/tmp/test3.jar"));
}
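
With merging gone, the assertion above no longer holds; jar propagation is instead covered end-to-end by the new testResourcePropagation added further down.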

/** JobSubmitted counter listener for unit test. */
public static class BasicJobSubmittedCounter implements JobListener {
private int count = 0;
AbstractStreamTableEnvironmentImpl.java
@@ -226,6 +226,8 @@ protected <T> DataStream<T> toStreamInternal(Table table, ModifyOperation modifyOperation
final Transformation<T> transformation = getTransformation(table, transformations);
executionEnvironment.addOperator(transformation);

resourceManager.addJarConfiguration(tableConfig);

// Reconfigure whenever planner transformations are added
// We pass only the configuration to avoid reconfiguration with the rootConfiguration
executionEnvironment.configure(tableConfig.getConfiguration());
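
Note the ordering: because configure() now overwrites pipeline.jars instead of merging it, the jars tracked by the ResourceManager must be folded into the TableConfig before the environment is reconfigured, which is why addJarConfiguration(tableConfig) directly precedes executionEnvironment.configure(...).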
TableEnvironmentImpl.java
@@ -22,9 +22,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.DataTypes;
@@ -179,7 +177,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -199,7 +196,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
private static final boolean IS_STREAM_TABLE = true;
private final CatalogManager catalogManager;
private final ModuleManager moduleManager;
private final ResourceManager resourceManager;
protected final ResourceManager resourceManager;
private final OperationTreeBuilder operationTreeBuilder;

protected final TableConfig tableConfig;
@@ -829,9 +826,7 @@ private TableResultInternal executeInternal(
List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);

// Merge user jars to table configuration
mergePipelineJarsToConfig(
resourceManager.getLocalJarResources(), tableConfig.getConfiguration());
resourceManager.addJarConfiguration(tableConfig);

// We pass only the configuration to avoid reconfiguration with the rootConfiguration
Pipeline pipeline =
@@ -865,9 +860,7 @@ private TableResultInternal executeQueryOperation(QueryOperation operation) {
translate(Collections.singletonList(sinkOperation));
final String defaultJobName = "collect";

// Merge user jars to table configuration
mergePipelineJarsToConfig(
resourceManager.getLocalJarResources(), tableConfig.getConfiguration());
resourceManager.addJarConfiguration(tableConfig);

// We pass only the configuration to avoid reconfiguration with the rootConfiguration
Pipeline pipeline =
@@ -1928,15 +1921,6 @@ private TableResultInternal dropSystemFunction(DropTempSystemFunctionOperation operation
}
}

private void mergePipelineJarsToConfig(Set<URL> jarUrls, Configuration configuration) {
ConfigUtils.mergeCollectionsToConfig(
configuration,
PipelineOptions.JARS,
jarUrls.stream().map(URL::toString).collect(Collectors.toSet()),
String::toString,
String::toString);
}

@VisibleForTesting
public TableImpl createTable(QueryOperation tableOperation) {
return TableImpl.createTable(
ResourceManager.java
@@ -20,9 +20,11 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.util.ExceptionUtils;
@@ -42,8 +44,10 @@
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -152,6 +156,26 @@ public Set<URL> getLocalJarResources() {
.collect(Collectors.toSet());
}

/**
* Adds the local jar resources to the given {@link TableConfig}. It implicitly considers the
* {@link TableConfig#getRootConfiguration()} and stores the merged result into {@link
* TableConfig#getConfiguration()}.
*/
public void addJarConfiguration(TableConfig tableConfig) {
final List<String> jars =
getLocalJarResources().stream().map(URL::toString).collect(Collectors.toList());
if (jars.isEmpty()) {
return;
}
final Set<String> jarFiles =
tableConfig
.getOptional(PipelineOptions.JARS)
.map(LinkedHashSet::new)
.orElseGet(LinkedHashSet::new);
jarFiles.addAll(jars);
tableConfig.set(PipelineOptions.JARS, new ArrayList<>(jarFiles));
}
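
The LinkedHashSet preserves the order of any jars already configured while deduplicating against the locally registered ones, and TableConfig#getOptional falls back to the root configuration, which is what the Javadoc means by "implicitly considers" it. A standalone sketch of the same merge semantics (hypothetical class and method names, decoupled from Flink types for illustration):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class JarListMergeSketch {
    static List<String> merge(Optional<List<String>> configuredJars, List<String> localJars) {
        if (localJars.isEmpty()) {
            return configuredJars.orElseGet(ArrayList::new); // nothing to add
        }
        Set<String> merged = configuredJars.map(LinkedHashSet::new).orElseGet(LinkedHashSet::new);
        merged.addAll(localJars); // duplicates dropped, insertion order kept
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        System.out.println(merge(
                Optional.of(Arrays.asList("file:/a.jar", "file:/b.jar")),
                Arrays.asList("file:/b.jar", "file:/c.jar")));
        // -> [file:/a.jar, file:/b.jar, file:/c.jar]
    }
}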

@Override
public void close() throws IOException {
resourceInfos.clear();
DataStreamJavaITCase.java
@@ -31,6 +31,7 @@
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -44,6 +45,7 @@
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
@@ -67,9 +69,11 @@
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.UserClassLoaderJarTestUtils;

import org.assertj.core.api.recursive.comparison.RecursiveComparisonConfiguration;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -101,6 +105,10 @@
import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.sourceWatermark;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for connecting to the {@link DataStream} API. */
@@ -121,8 +129,35 @@ public static ObjectReuse[] objectReuse() {

@Parameter public ObjectReuse objectReuse;

private static String udfClassName1;
private static String jarPath1;
private static String udfClassName2;
private static String jarPath2;

@BeforeClass
public static void beforeClass() throws IOException {
udfClassName1 = GENERATED_LOWER_UDF_CLASS;
jarPath1 =
UserClassLoaderJarTestUtils.createJarFile(
TEMPORARY_FOLDER.newFolder("test-jar1"),
"test-classloader-udf1.jar",
udfClassName1,
String.format(GENERATED_LOWER_UDF_CODE, udfClassName1))
.toURI()
.toString();
udfClassName2 = GENERATED_UPPER_UDF_CLASS;
jarPath2 =
UserClassLoaderJarTestUtils.createJarFile(
TEMPORARY_FOLDER.newFolder("test-jar2"),
"test-classloader-udf2.jar",
udfClassName2,
String.format(GENERATED_UPPER_UDF_CODE, udfClassName2))
.toURI()
.toString();
}

@Before
public void before() {
public void before() throws IOException {
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setParallelism(4);
@@ -131,6 +166,9 @@ public void before() {
} else if (objectReuse == ObjectReuse.DISABLED) {
env.getConfig().disableObjectReuse();
}
final Configuration defaultConfig = new Configuration();
defaultConfig.set(PipelineOptions.JARS, Collections.emptyList());
env.configure(defaultConfig);
}
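
The added reset of pipeline.jars to an empty list presumably keeps the parameterized runs independent, so jars picked up by one run do not leak into the next through the environment configuration.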

@Test
@@ -764,10 +802,64 @@ public void testMultiChangelogStreamUpsert() throws Exception {
resultStream, 0, Row.of(2, null, null, null), Row.of(1, 11.0, "1", "A"));
}

@Test
public void testResourcePropagation() throws Exception {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
assertStreamJarsOf(0);
assertTableJarsOf(tableEnv, 0);

tableEnv.executeSql(
String.format(
"CREATE TEMPORARY SYSTEM FUNCTION myLower AS '%s' USING JAR '%s'",
udfClassName1, jarPath1));
assertStreamJarsOf(0);
assertTableJarsOf(tableEnv, 0);

// This is not recommended, usually this option should be set before
// but just for testing proper merging.
final Configuration customConfig = new Configuration();
customConfig.set(PipelineOptions.JARS, Collections.singletonList(jarPath2));
env.configure(customConfig);
assertStreamJarsOf(1);
assertTableJarsOf(tableEnv, 1);

final DataStream<String> resultStream =
tableEnv.toDataStream(
tableEnv.sqlQuery(
"SELECT myLower(s) FROM (VALUES ('Bob'), ('Alice')) AS T(s)"),
String.class);
assertStreamJarsOf(2);
assertTableJarsOf(tableEnv, 2);

testResult(resultStream, "bob", "alice");

tableEnv.executeSql(
String.format(
"CREATE TEMPORARY SYSTEM FUNCTION myUpper AS '%s' USING JAR '%s'",
udfClassName2, jarPath2));
assertStreamJarsOf(2);
assertTableJarsOf(tableEnv, 2);

final TableResult tableResult =
tableEnv.executeSql("SELECT myUpper(s) FROM (VALUES ('Bob'), ('Alice')) AS T(s)");
assertStreamJarsOf(2);
assertTableJarsOf(tableEnv, 2);

testResult(tableResult, Row.of("BOB"), Row.of("ALICE"));
}
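
The jar counts trace the propagation path: both configs start empty; setting pipeline.jars explicitly on the environment is immediately visible on the table side too (1); translating the first query forces the jar backing myLower to be merged in (2); the later statements reuse already-registered jars, so the count stays at 2.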

// --------------------------------------------------------------------------------------------
// Helper methods
// --------------------------------------------------------------------------------------------

private static void assertTableJarsOf(TableEnvironment tableEnv, int size) {
assertThat(tableEnv.getConfig().get(PipelineOptions.JARS)).hasSize(size);
}

private void assertStreamJarsOf(int size) {
assertThat(env.getConfiguration().get(PipelineOptions.JARS)).hasSize(size);
}

private Table getComplexUnifiedPipeline(StreamExecutionEnvironment env) {

final DataStream<String> allowedNamesStream = env.fromElements("Bob", "Alice");