Skip to content

Commit

Permalink
[FLINK-12732] [state-processing-api] Miscellaneous cleanup for read p…
Browse files Browse the repository at this point in the history
…art of State Processing API

This closes apache#8618.
  • Loading branch information
tzulitai committed Jun 26, 2019
1 parent 05a45c3 commit 861bf73
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
@PublicEvolving
public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {

private static final long serialVersionUID = 3873843034140417407L;

/**
* Initialization method for the function. It is called before {@link #readKey(Object,
* Context, Collector)} and thus suitable for one time setup work.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
@Internal
public class KeyedStateInputFormat<K, OUT> extends RichInputFormat<OUT, KeyGroupRangeInputSplit> implements KeyContext {

private static final long serialVersionUID = 8230460226049597182L;

private final OperatorState operatorState;

private final StateBackend stateBackend;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.util.Preconditions;

import java.util.Iterator;
import java.util.List;
Expand All @@ -44,9 +45,9 @@ final class MultiStateKeyIterator<K> implements Iterator<K> {
private K currentKey;

MultiStateKeyIterator(List<? extends StateDescriptor<?, ?>> descriptors, AbstractKeyedStateBackend<K> backend) {
this.descriptors = descriptors;
this.descriptors = Preconditions.checkNotNull(descriptors);

this.backend = backend;
this.backend = Preconditions.checkNotNull(backend);

this.internal = descriptors
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
@Internal
public final class KeyGroupRangeInputSplit implements InputSplit {

private static final long serialVersionUID = -3715297712294815706L;

private final List<KeyedStateHandle> managedKeyedState;

private final List<KeyedStateHandle> rawKeyedState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -86,10 +87,13 @@ private SavepointEnvironment(RuntimeContext ctx, Configuration configuration, in
this.jobID = new JobID();
this.vertexID = new JobVertexID();
this.attemptID = new ExecutionAttemptID();
this.ctx = ctx;
this.configuration = configuration;
this.ctx = Preconditions.checkNotNull(ctx);
this.configuration = Preconditions.checkNotNull(configuration);

Preconditions.checkArgument(maxParallelism > 0 && indexOfSubtask < maxParallelism);
this.maxParallelism = maxParallelism;
this.indexOfSubtask = indexOfSubtask;

this.registry = new KvStateRegistry().createTaskRegistry(jobID, vertexID);
this.taskStateManager = new SavepointTaskStateManager(prioritizedOperatorSubtaskState);
this.ioManager = new IOManagerAsync();
Expand Down Expand Up @@ -256,7 +260,9 @@ public static class Builder {
private PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskState;

public Builder(RuntimeContext ctx, int maxParallelism) {
this.ctx = ctx;
this.ctx = Preconditions.checkNotNull(ctx);

Preconditions.checkArgument(maxParallelism > 0);
this.maxParallelism = maxParallelism;

this.prioritizedOperatorSubtaskState = PrioritizedOperatorSubtaskState.emptyNotRestored();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
* required to run the {@code state-processor-api}.
*/
final class SavepointLocalRecoveryProvider implements LocalRecoveryDirectoryProvider {

private static final long serialVersionUID = 9205635927790634162L;

private static final String MSG = "This method should never be called";

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down Expand Up @@ -71,8 +72,8 @@ public final class SavepointRuntimeContext implements RuntimeContext {
private boolean stateRegistrationAllowed;

public SavepointRuntimeContext(RuntimeContext ctx, KeyedStateStore keyedStateStore) {
this.ctx = ctx;
this.keyedStateStore = keyedStateStore;
this.ctx = Preconditions.checkNotNull(ctx);
this.keyedStateStore = Preconditions.checkNotNull(keyedStateStore);
this.stateRegistrationAllowed = true;

this.registeredDescriptors = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Preconditions;

/**
* A minimally implemented {@link TaskManagerRuntimeInfo} that provides the functionality required
Expand All @@ -30,7 +31,7 @@ class SavepointTaskManagerRuntimeInfo implements TaskManagerRuntimeInfo {
private final IOManager ioManager;

SavepointTaskManagerRuntimeInfo(IOManager ioManager) {
this.ioManager = ioManager;
this.ioManager = Preconditions.checkNotNull(ioManager);
}

@Override
Expand Down

0 comments on commit 861bf73

Please sign in to comment.