Skip to content

Commit

Permalink
[FLINK-33713][core] Deprecate RuntimeContext#getExecutionConfig
Browse files Browse the repository at this point in the history
This closes apache#23848.
  • Loading branch information
JunRuiLee authored and zhuzhurk committed Dec 11, 2023
1 parent 0e515dc commit 3aa70df
Show file tree
Hide file tree
Showing 49 changed files with 461 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ public void open(OpenContext openContext) throws Exception {
Class<KEYIN> inKeyClass =
(Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
TypeSerializer<KEYIN> keySerializer =
TypeExtractor.getForClass(inKeyClass)
.createSerializer(getRuntimeContext().getExecutionConfig());
getRuntimeContext().createSerializer(TypeExtractor.getForClass(inKeyClass));
this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer);
this.combineCollector = new HadoopOutputCollector<>();
this.reduceCollector = new HadoopOutputCollector<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public void open(OpenContext openContext) throws Exception {
Class<KEYIN> inKeyClass =
(Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
TypeSerializer<KEYIN> keySerializer =
TypeExtractor.getForClass(inKeyClass)
.createSerializer(getRuntimeContext().getExecutionConfig());
getRuntimeContext().createSerializer(TypeExtractor.getForClass(inKeyClass));
this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.groups.OperatorMetricGroup;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -120,9 +123,39 @@ public interface RuntimeContext {
/**
* Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
* job.
*
* @deprecated This method has been deprecated and will be removed in the upcoming FLINK major
* version FLINK-2.0. Users relying on this method should migrate to alternative getter
* methods, such as {@link #getGlobalJobParameters()} or {@link #isObjectReuseEnabled()}.
*/
@Deprecated
ExecutionConfig getExecutionConfig();

/**
* Create a serializer for a given type.
*
* @param typeInformation the type information of the object to be serialized
* @return the serializer for the given type
*/
@PublicEvolving
<T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation);

/**
* Get global job parameters.
*
* @return the global job parameters
*/
@PublicEvolving
Map<String, String> getGlobalJobParameters();

/**
* Check if object reuse is enabled.
*
* @return true if object reuse is enabled, false otherwise
*/
@PublicEvolving
boolean isObjectReuseEnabled();

/**
* Gets the ClassLoader to load classes that are not in system's classpath, but are part of the
* jar file of a user job.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

/** A factory for {@link TypeSerializer} implementations. */
@Internal
@FunctionalInterface
public interface SerializerFactory {

/**
* Creates a serializer for the specified type of data.
*
* @param typeInformation the type of data that the serializer will be used to serialize or
* deserialize
* @return an instance of a class that implements the {@link TypeSerializer} interface
*/
<T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.util.UserCodeClassLoader;

import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -83,10 +86,26 @@ public AbstractRuntimeUDFContext(
}

@Override
@Deprecated
public ExecutionConfig getExecutionConfig() {
return executionConfig;
}

@Override
public <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {
return typeInformation.createSerializer(executionConfig);
}

@Override
public Map<String, String> getGlobalJobParameters() {
return Collections.unmodifiableMap(executionConfig.getGlobalJobParameters().toMap());
}

@Override
public boolean isObjectReuseEnabled() {
return executionConfig.isObjectReuseEnabled();
}

@Override
public String getTaskName() {
return taskInfo.getTaskName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.SerializerFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
Expand Down Expand Up @@ -314,10 +315,22 @@ public boolean isSerializerInitialized() {
* @param executionConfig The execution config to use when creating the serializer.
*/
public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
initializeSerializerUnlessSet(
new SerializerFactory() {
@Override
public <T> TypeSerializer<T> createSerializer(
TypeInformation<T> typeInformation) {
return typeInformation.createSerializer(executionConfig);
}
});
}

@Internal
public void initializeSerializerUnlessSet(SerializerFactory serializerFactory) {
if (serializerAtomicReference.get() == null) {
checkState(typeInfo != null, "no serializer and no type info");
// try to instantiate and set the serializer
TypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig);
TypeSerializer<T> serializer = serializerFactory.createSerializer(typeInfo);
// use cas to assure the singleton
if (!serializerAtomicReference.compareAndSet(null, serializer)) {
LOG.debug("Someone else beat us at initializing the serializer.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.groups.OperatorMetricGroup;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -105,10 +108,26 @@ public String getTaskNameWithSubtasks() {
}

@Override
@Deprecated
public ExecutionConfig getExecutionConfig() {
return runtimeContext.getExecutionConfig();
}

@Override
public <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {
return runtimeContext.createSerializer(typeInformation);
}

@Override
public Map<String, String> getGlobalJobParameters() {
return runtimeContext.getGlobalJobParameters();
}

@Override
public boolean isObjectReuseEnabled() {
return runtimeContext.isObjectReuseEnabled();
}

@Override
public ClassLoader getUserCodeClassLoader() {
return runtimeContext.getUserCodeClassLoader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.cep.operator;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.AggregateFunction;
Expand All @@ -45,6 +44,7 @@

import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -114,9 +114,11 @@ public void testCepRuntimeContext() {
final int indexOfSubtask = 43;
final int attemptNumber = 1337;
final String taskNameWithSubtask = "barfoo";
final ExecutionConfig executionConfig = mock(ExecutionConfig.class);
final Map<String, String> globalJobParameters = new HashMap<>();
globalJobParameters.put("k1", "v1");
final ClassLoader userCodeClassLoader = mock(ClassLoader.class);
final DistributedCache distributedCache = mock(DistributedCache.class);
final boolean isObjectReused = true;

RuntimeContext mockedRuntimeContext = mock(RuntimeContext.class);

Expand All @@ -127,7 +129,8 @@ public void testCepRuntimeContext() {
when(mockedRuntimeContext.getIndexOfThisSubtask()).thenReturn(indexOfSubtask);
when(mockedRuntimeContext.getAttemptNumber()).thenReturn(attemptNumber);
when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask);
when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig);
when(mockedRuntimeContext.getGlobalJobParameters()).thenReturn(globalJobParameters);
when(mockedRuntimeContext.isObjectReuseEnabled()).thenReturn(isObjectReused);
when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
when(mockedRuntimeContext.getDistributedCache()).thenReturn(distributedCache);

Expand All @@ -139,7 +142,8 @@ public void testCepRuntimeContext() {
assertEquals(indexOfSubtask, runtimeContext.getIndexOfThisSubtask());
assertEquals(attemptNumber, runtimeContext.getAttemptNumber());
assertEquals(taskNameWithSubtask, runtimeContext.getTaskNameWithSubtasks());
assertEquals(executionConfig, runtimeContext.getExecutionConfig());
assertEquals(globalJobParameters, runtimeContext.getGlobalJobParameters());
assertEquals(isObjectReused, runtimeContext.isObjectReuseEnabled());
assertEquals(userCodeClassLoader, runtimeContext.getUserCodeClassLoader());
assertEquals(distributedCache, runtimeContext.getDistributedCache());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ private <K, T, OUT> DataSet<OUT> readWindowOperator(
metadata.getOperatorState(uid),
stateBackend,
env.getConfiguration(),
operator);
operator,
env.getConfig());

return env.createInput(format, outputType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ private <K, T, OUT> DataStream<OUT> readWindowOperator(
metadata.getOperatorState(OperatorIdentifier.forUid(uid)),
stateBackend,
MutableConfig.of(env.getConfiguration()),
operator);
operator,
env.getConfig());

return SourceBuilder.fromFormat(env, format, outputType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ public <T> DataSource<T> readListState(String uid, String name, TypeInformation<
ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, typeInfo);
ListStateInputFormat<T> inputFormat =
new ListStateInputFormat<>(
operatorState, env.getConfiguration(), stateBackend, descriptor);
operatorState,
env.getConfiguration(),
stateBackend,
descriptor,
env.getConfig());
return env.createInput(inputFormat, typeInfo);
}

Expand All @@ -141,7 +145,11 @@ public <T> DataSource<T> readListState(
ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, serializer);
ListStateInputFormat<T> inputFormat =
new ListStateInputFormat<>(
operatorState, env.getConfiguration(), stateBackend, descriptor);
operatorState,
env.getConfiguration(),
stateBackend,
descriptor,
env.getConfig());
return env.createInput(inputFormat, typeInfo);
}

Expand All @@ -161,7 +169,11 @@ public <T> DataSource<T> readUnionState(String uid, String name, TypeInformation
ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, typeInfo);
UnionStateInputFormat<T> inputFormat =
new UnionStateInputFormat<>(
operatorState, env.getConfiguration(), stateBackend, descriptor);
operatorState,
env.getConfiguration(),
stateBackend,
descriptor,
env.getConfig());
return env.createInput(inputFormat, typeInfo);
}

Expand All @@ -186,7 +198,11 @@ public <T> DataSource<T> readUnionState(
ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, serializer);
UnionStateInputFormat<T> inputFormat =
new UnionStateInputFormat<>(
operatorState, env.getConfiguration(), stateBackend, descriptor);
operatorState,
env.getConfiguration(),
stateBackend,
descriptor,
env.getConfig());
return env.createInput(inputFormat, typeInfo);
}

Expand Down Expand Up @@ -214,7 +230,11 @@ public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
new MapStateDescriptor<>(name, keyTypeInfo, valueTypeInfo);
BroadcastStateInputFormat<K, V> inputFormat =
new BroadcastStateInputFormat<>(
operatorState, env.getConfiguration(), stateBackend, descriptor);
operatorState,
env.getConfiguration(),
stateBackend,
descriptor,
env.getConfig());
return env.createInput(inputFormat, new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo));
}

Expand Down Expand Up @@ -248,7 +268,11 @@ public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
new MapStateDescriptor<>(name, keySerializer, valueSerializer);
BroadcastStateInputFormat<K, V> inputFormat =
new BroadcastStateInputFormat<>(
operatorState, env.getConfiguration(), stateBackend, descriptor);
operatorState,
env.getConfiguration(),
stateBackend,
descriptor,
env.getConfig());
return env.createInput(inputFormat, new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo));
}

Expand Down Expand Up @@ -325,7 +349,8 @@ public <K, OUT> DataSource<OUT> readKeyedState(
operatorState,
stateBackend,
env.getConfiguration(),
new KeyedStateReaderOperator<>(function, keyTypeInfo));
new KeyedStateReaderOperator<>(function, keyTypeInfo),
env.getConfig());

return env.createInput(inputFormat, outTypeInfo);
}
Expand Down
Loading

0 comments on commit 3aa70df

Please sign in to comment.