Skip to content

Commit

Permalink
[FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
Browse files Browse the repository at this point in the history
[streaming-java]

- Adds a KvStateRegistry per TaskManager at which created KvState instances are
  registered/unregistered.

- Registered KvState instances are reported to the JobManager, whcih can be
  queried for KvStateLocation.
  • Loading branch information
uce committed Aug 9, 2016
1 parent a909adb commit 63c9b8e
Show file tree
Hide file tree
Showing 27 changed files with 2,062 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
Expand Down Expand Up @@ -311,6 +312,11 @@ private static Environment getMockEnvironment(File[] tempDirs) {
when(env.getJobID()).thenReturn(new JobID());
when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader());
when(env.getIOManager()).thenReturn(ioMan);

TaskInfo taskInfo = mock(TaskInfo.class);
when(env.getTaskInfo()).thenReturn(taskInfo);

when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
return env;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;

Expand Down Expand Up @@ -147,6 +149,13 @@ public interface Environment {
*/
AccumulatorRegistry getAccumulatorRegistry();

/**
* Returns the registry for {@link KvState} instances.
*
* @return KvState registry
*/
TaskKvStateRegistry getTaskKvStateRegistry();

/**
* Confirms that the invokable has successfully completed all steps it needed to
* to for the checkpoint with the give checkpoint-ID. This method does not include
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
Expand Down Expand Up @@ -224,6 +225,9 @@ public class ExecutionGraph {
/** The execution context which is used to execute futures. */
private ExecutionContext executionContext;

/** Registered KvState instances reported by the TaskManagers. */
private transient KvStateLocationRegistry kvStateLocationRegistry;

// ------ Fields that are only relevant for archived execution graphs ------------
private String jsonPlan;

Expand Down Expand Up @@ -304,6 +308,8 @@ public ExecutionGraph(
this.restartStrategy = restartStrategy;

metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());

this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -445,6 +451,10 @@ public SavepointCoordinator getSavepointCoordinator() {
return savepointCoordinator;
}

public KvStateLocationRegistry getKvStateLocationRegistry() {
return kvStateLocationRegistry;
}

public RestartStrategy getRestartStrategy() {
return restartStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
Expand All @@ -35,11 +36,21 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
import org.apache.flink.runtime.messages.TaskMessages.FailTask;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
Expand Down Expand Up @@ -84,6 +95,12 @@ public class NetworkEnvironment {

private PartitionStateChecker partitionStateChecker;

/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
private KvStateServer kvStateServer;

/** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */
private KvStateRegistry kvStateRegistry;

private boolean isShutdown;

/**
Expand All @@ -92,17 +109,21 @@ public class NetworkEnvironment {
*/
private final ExecutionContext executionContext;

private final InstanceConnectionInfo connectionInfo;

/**
* Initializes all network I/O components.
*/
public NetworkEnvironment(
ExecutionContext executionContext,
FiniteDuration jobManagerTimeout,
NetworkEnvironmentConfiguration config) throws IOException {
ExecutionContext executionContext,
FiniteDuration jobManagerTimeout,
NetworkEnvironmentConfiguration config,
InstanceConnectionInfo connectionInfo) throws IOException {

this.executionContext = executionContext;
this.configuration = checkNotNull(config);
this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
this.connectionInfo = checkNotNull(connectionInfo);

// create the network buffers - this is the operation most likely to fail upon
// mis-configuration, so we do this first
Expand Down Expand Up @@ -151,6 +172,10 @@ public Tuple2<Integer, Integer> getPartitionRequestInitialAndMaxBackoff() {
return configuration.partitionRequestInitialAndMaxBackoff();
}

public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) {
return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
}

// --------------------------------------------------------------------------------------------
// Association / Disassociation with JobManager / TaskManager
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -183,7 +208,9 @@ public void associateWithTaskManagerAndJobManager(
if (this.partitionConsumableNotifier == null &&
this.partitionManager == null &&
this.taskEventDispatcher == null &&
this.connectionManager == null)
this.connectionManager == null &&
this.kvStateRegistry == null &&
this.kvStateServer == null)
{
// good, not currently associated. start the individual components

Expand Down Expand Up @@ -211,6 +238,29 @@ public void associateWithTaskManagerAndJobManager(
catch (Throwable t) {
throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
}

try {
kvStateRegistry = new KvStateRegistry();

kvStateServer = new KvStateServer(
connectionInfo.address(),
0,
1,
10,
kvStateRegistry,
new AtomicKvStateRequestStats());

kvStateServer.start();

KvStateRegistryListener listener = new JobManagerKvStateRegistryListener(
jobManagerGateway,
kvStateServer.getAddress());

kvStateRegistry.registerListener(listener);
} catch (Throwable t) {
throw new IOException("Failed to instantiate KvState management components: "
+ t.getMessage(), t);
}
}
else {
throw new IllegalStateException(
Expand All @@ -227,6 +277,19 @@ public void disassociate() throws IOException {

LOG.debug("Disassociating NetworkEnvironment from TaskManager. Cleaning all intermediate results.");

// Shut down KvStateRegistry
kvStateRegistry = null;

// Shut down KvStateServer
if (kvStateServer != null) {
try {
kvStateServer.shutDown();
} catch (Throwable t) {
throw new IOException("Cannot shutdown KvStateNettyServer", t);
}
kvStateServer = null;
}

// terminate all network connections
if (connectionManager != null) {
try {
Expand Down Expand Up @@ -511,4 +574,58 @@ public void triggerPartitionStateCheck(
jobManager.tell(msg, taskManager);
}
}

/**
* Simple {@link KvStateRegistry} listener, which forwards registrations to
* the JobManager.
*/
private static class JobManagerKvStateRegistryListener implements KvStateRegistryListener {

private ActorGateway jobManager;

private KvStateServerAddress kvStateServerAddress;

public JobManagerKvStateRegistryListener(
ActorGateway jobManager,
KvStateServerAddress kvStateServerAddress) {

this.jobManager = Preconditions.checkNotNull(jobManager, "JobManager");
this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
}

@Override
public void notifyKvStateRegistered(
JobID jobId,
JobVertexID jobVertexId,
int keyGroupIndex,
String registrationName,
KvStateID kvStateId) {

Object msg = new KvStateMessage.NotifyKvStateRegistered(
jobId,
jobVertexId,
keyGroupIndex,
registrationName,
kvStateId,
kvStateServerAddress);

jobManager.tell(msg);
}

@Override
public void notifyKvStateUnregistered(
JobID jobId,
JobVertexID jobVertexId,
int keyGroupIndex,
String registrationName) {

Object msg = new KvStateMessage.NotifyKvStateUnregistered(
jobId,
jobVertexId,
keyGroupIndex,
registrationName);

jobManager.tell(msg);
}
}
}
Loading

0 comments on commit 63c9b8e

Please sign in to comment.