Skip to content

Commit

Permalink
[streaming] State backend configurable from flink-conf.yaml
Browse files Browse the repository at this point in the history
Closes apache#676
  • Loading branch information
gyfora committed May 19, 2015
1 parent 2d3e69a commit 1479973
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,18 @@ public final class ConfigConstants {
*/
public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";

// ----------------------------- Streaming --------------------------------

/**
* Configuration key selecting the state backend used to store streaming
* checkpoints. Supported values: "jobmanager" and "filesystem".
*/
public static final String STATE_BACKEND = "state.backend";

/**
* Configuration key for the directory in which streaming checkpoints are saved.
* Read when the "filesystem" state backend is selected.
*/
public static final String STATE_BACKEND_FS_DIR = "state.backend.fs.checkpointdir";

// ----------------------------- Miscellaneous ----------------------------

/**
Expand Down Expand Up @@ -624,6 +636,9 @@ public final class ConfigConstants {

public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";

// ----------------------------- Streaming Values --------------------------

/**
* Default value for the "state.backend" key: checkpoints are kept with the
* "jobmanager" backend when nothing else is configured.
*
* NOTE(review): this field is not declared final, so it can be reassigned at
* runtime - confirm whether that is intended.
*/
public static String DEFAULT_STATE_BACKEND = "jobmanager";

// ----------------------------- LocalExecution ----------------------------

Expand Down
15 changes: 15 additions & 0 deletions flink-dist/src/main/resources/flink-conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ jobmanager.web.port: 8081

webclient.port: 8080

#==============================================================================
# Streaming state checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends: jobmanager, filesystem

state.backend: jobmanager

# Directory for storing checkpoints in a Flink-supported file system.
# Only used when state.backend is set to filesystem.
#
# state.backend.fs.checkpointdir: hdfs://namenode-host:port/flink-checkpoints

#==============================================================================
# Advanced
#==============================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -78,7 +77,7 @@ public class StreamGraph extends StreamingPlan {

private Map<Integer, StreamLoop> streamLoops;
protected Map<Integer, StreamLoop> vertexIDtoLoop;
private StateHandleProvider<?> stateHandleProvider = LocalStateHandle.createProvider();
private StateHandleProvider<?> stateHandleProvider;

public StreamGraph(StreamExecutionEnvironment environment) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.functors.NotNullPredicate;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.util.event.EventListener;
Expand Down Expand Up @@ -76,7 +80,7 @@ public void registerInputOutput() {
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
this.context = createRuntimeContext(getEnvironment().getTaskName());
this.stateHandleProvider = configuration.getStateHandleProvider(userClassLoader);
this.stateHandleProvider = getStateHandleProvider();

outputHandler = new OutputHandler<OUT>(this);

Expand All @@ -98,6 +102,52 @@ public StreamingRuntimeContext createRuntimeContext(String taskName) {
return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
getExecutionConfig());
}

/**
 * Resolves the {@link StateHandleProvider} used for this task's state checkpoints.
 *
 * <p>A provider stored in the task's stream configuration takes precedence. If none
 * is set, the backend is chosen by the {@code state.backend} entry of the global
 * configuration (falling back to {@link ConfigConstants#DEFAULT_STATE_BACKEND}).
 * The {@code filesystem} backend additionally requires
 * {@code state.backend.fs.checkpointdir} to be set.
 *
 * @return the user-defined provider if present, otherwise the provider created
 *         for the configured backend
 * @throws RuntimeException if the configured backend name is not a known backend,
 *         or if the filesystem backend is selected without a checkpoint directory
 */
private StateHandleProvider<Serializable> getStateHandleProvider() {

	StateHandleProvider<Serializable> provider = configuration
			.getStateHandleProvider(userClassLoader);

	// A provider specified in the program always wins over the config file.
	if (provider != null) {
		LOG.info("Using user defined state backend for streaming checkpoints.");
		return provider;
	}

	// Upper-case with a fixed locale: with the JVM default locale, e.g. Turkish,
	// "filesystem" would become "FİLESYSTEM" and never match the enum constant.
	String backendName = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND,
			ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase(java.util.Locale.ROOT);

	StateBackend backend;

	try {
		backend = StateBackend.valueOf(backendName);
	} catch (IllegalArgumentException e) {
		// valueOf signals an unknown name with IllegalArgumentException; keep it as the cause.
		throw new RuntimeException(backendName
				+ " is not a valid state backend.\nSupported backends: jobmanager, filesystem.", e);
	}

	switch (backend) {
		case JOBMANAGER:
			LOG.info("State backend for state checkpoints is set to jobmanager.");
			return LocalStateHandle.createProvider();
		case FILESYSTEM:
			String checkpointDir = GlobalConfiguration.getString(
					ConfigConstants.STATE_BACKEND_FS_DIR, null);
			if (checkpointDir == null) {
				// Build the hint from the constant so it always names the key that is actually read.
				throw new RuntimeException(
						"For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \""
								+ ConfigConstants.STATE_BACKEND_FS_DIR
								+ ": hdfs://namenode-host:port/flink-checkpoints\"");
			}
			LOG.info("State backend for state checkpoints is set to filesystem with directory: "
					+ checkpointDir);
			return FileStateHandle.createProvider(checkpointDir);
		default:
			throw new RuntimeException("Backend " + backend + " is not supported yet.");
	}
}

/**
 * State backends that can be selected through the "state.backend" configuration
 * entry. The configured value is upper-cased and matched against these constant
 * names, so the names must stay in sync with the values accepted in the config.
 */
private enum StateBackend {
// JOBMANAGER: provider from LocalStateHandle.createProvider().
// FILESYSTEM: provider from FileStateHandle.createProvider(checkpointDir).
JOBMANAGER, FILESYSTEM
}

protected void openOperator() throws Exception {
streamOperator.open(getTaskConfiguration());
Expand Down

0 comments on commit 1479973

Please sign in to comment.