Skip to content

Commit

Permalink
[FLINK-3779] [runtime] Add QueryableStateClient
Browse files Browse the repository at this point in the history
- Adds a client, which works with the network client and location lookup service
  to query KvState instances.

- Furthermore, location information is cached.
  • Loading branch information
uce committed Aug 9, 2016
1 parent 775a787 commit 329610d
Show file tree
Hide file tree
Showing 12 changed files with 975 additions and 17 deletions.
14 changes: 14 additions & 0 deletions docs/setup/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,20 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use

- `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. It has to be an absolute path.

## Queryable State

### Server

- `query.server.port`: Port to bind queryable state server to (Default: `0`, binds to random port).
- `query.server.network-threads`: Number of network (Netty's event loop) Threads for queryable state server (Default: `0`, picks number of slots).
- `query.server.query-threads`: Number of query Threads for queryable state server (Default: `0`, picks number of slots).

### Client

- `query.client.network-threads`: Number of network (Netty's event loop) Threads for queryable state client (Default: `0`, picks number of available cores as returned by `Runtime.getRuntime().availableProcessors()`).
- `query.client.lookup.num-retries`: Number of retries on KvState lookup failure due to unavailable JobManager (Default: `3`).
- `query.client.lookup.retry-delay`: Retry delay in milliseconds on KvState lookup failure due to unavailable JobManager (Default: `1000`).

## Metrics

- `metrics.reporters`: The list of named reporters, i.e. "foo,bar".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,44 @@ public final class ConfigConstants {
/** ZooKeeper default leader port. */
public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;

// ------------------------- Queryable state ------------------------------

/** Port to bind KvState server to. */
public static final String QUERYABLE_STATE_SERVER_PORT = "query.server.port";

/** Number of network (event loop) threads for the KvState server. */
public static final String QUERYABLE_STATE_SERVER_NETWORK_THREADS = "query.server.network-threads";

/** Number of query threads for the KvState server. */
public static final String QUERYABLE_STATE_SERVER_QUERY_THREADS = "query.server.query-threads";

/** Default port to bind KvState server to (0 => pick random free port). */
public static final int DEFAULT_QUERYABLE_STATE_SERVER_PORT = 0;

/** Default Number of network (event loop) threads for the KvState server (0 => #slots). */
public static final int DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS = 0;

/** Default number of query threads for the KvState server (0 => #slots). */
public static final int DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS = 0;

/** Number of network (event loop) threads for the KvState client. */
public static final String QUERYABLE_STATE_CLIENT_NETWORK_THREADS = "query.client.network-threads";

/** Number of retries on location lookup failures. */
public static final String QUERYABLE_STATE_CLIENT_LOOKUP_RETRIES = "query.client.lookup.num-retries";

/** Retry delay on location lookup failures (millis). */
public static final String QUERYABLE_STATE_CLIENT_LOOKUP_RETRY_DELAY = "query.client.lookup.retry-delay";

/** Default number of query threads for the KvState client (0 => #cores) */
public static final int DEFAULT_QUERYABLE_STATE_CLIENT_NETWORK_THREADS = 0;

/** Default number of retries on location lookup failures. */
public static final int DEFAULT_QUERYABLE_STATE_CLIENT_LOOKUP_RETRIES = 3;

/** Default retry delay on location lookup failures. */
public static final int DEFAULT_QUERYABLE_STATE_CLIENT_LOOKUP_RETRY_DELAY = 1000;

// ----------------------------- Environment Variables ----------------------------

/** The environment variable name which contains the location of the configuration directory */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -242,21 +242,33 @@ public void associateWithTaskManagerAndJobManager(
try {
kvStateRegistry = new KvStateRegistry();

kvStateServer = new KvStateServer(
connectionInfo.address(),
0,
1,
10,
kvStateRegistry,
new AtomicKvStateRequestStats());
if (nettyConfig.isDefined()) {
int numNetworkThreads = configuration.queryServerNetworkThreads();
if (numNetworkThreads == 0) {
numNetworkThreads = nettyConfig.get().getNumberOfSlots();
}

kvStateServer.start();
int numQueryThreads = configuration.queryServerNetworkThreads();
if (numQueryThreads == 0) {
numQueryThreads = nettyConfig.get().getNumberOfSlots();
}

KvStateRegistryListener listener = new JobManagerKvStateRegistryListener(
jobManagerGateway,
kvStateServer.getAddress());
kvStateServer = new KvStateServer(
connectionInfo.address(),
configuration.queryServerPort(),
numNetworkThreads,
numQueryThreads,
kvStateRegistry,
new DisabledKvStateRequestStats());

kvStateRegistry.registerListener(listener);
kvStateServer.start();

KvStateRegistryListener listener = new JobManagerKvStateRegistryListener(
jobManagerGateway,
kvStateServer.getAddress());

kvStateRegistry.registerListener(listener);
}
} catch (Throwable t) {
throw new IOException("Failed to instantiate KvState management components: "
+ t.getMessage(), t);
Expand Down
Loading

0 comments on commit 329610d

Please sign in to comment.