Skip to content

Commit

Permalink
[FLINK-33266][sql-gateway] Support plan cache for DQL in SQL Gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
zoudan authored and libenchao committed Dec 7, 2023
1 parent 978bff3 commit b6380d5
Show file tree
Hide file tree
Showing 21 changed files with 730 additions and 28 deletions.
9 changes: 5 additions & 4 deletions docs/content.zh/docs/dev/table/olap_quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ Session Cluster 和 SQL Gateway 都依赖连接器来获取表的元信息同时

#### SQL&Table 参数

| 参数名称 | 默认值 | 推荐值 |
|:---------------------------------------------------------------------------------------------------------------|:------|:-----|
| [table.optimizer.join-reorder-enabled]({{<ref "docs/dev/table/config#table-optimizer-join-reorder-enabled">}}) | false | true |
| [pipeline.object-reuse]({{< ref "docs/deployment/config#pipeline-object-reuse" >}}) | false | true |
| 参数名称 | 默认值 | 推荐值 |
|:----------------------------------------------------------------------------------------------------------------------------------|:------|:-----|
| [table.optimizer.join-reorder-enabled]({{<ref "docs/dev/table/config#table-optimizer-join-reorder-enabled">}}) | false | true |
| [pipeline.object-reuse]({{< ref "docs/deployment/config#pipeline-object-reuse" >}}) | false | true |
| [sql-gateway.session.plan-cache.enabled]({{<ref "docs/dev/table/sql-gateway/overview#sql-gateway-session-plan-cache-enabled">}}) | false | true |

#### Runtime 参数

Expand Down
18 changes: 18 additions & 0 deletions docs/content.zh/docs/dev/table/sql-gateway/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,24 @@ $ ./sql-gateway -Dkey=value
<td>Integer</td>
<td>SQL Gateway 服务中存活 session 的最大数量。</td>
</tr>
<tr>
<td><h5>sql-gateway.session.plan-cache.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>设置为 true 的时候,SQL Gateway 会在一个 session 内部缓存并复用 plan。</td>
</tr>
<tr>
<td><h5>sql-gateway.session.plan-cache.size</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>Plan cache 的大小, 当且仅当 `table.optimizer.plan-cache.enabled` 为 true 的时候生效。</td>
</tr>
<tr>
<td><h5>sql-gateway.session.plan-cache.ttl</h5></td>
<td style="word-wrap: break-word;">1 hour</td>
<td>Duration</td>
<td>Plan cache 的 TTL, 控制 cache 在写入之后多久过期, 当且仅当 `table.optimizer.plan-cache.enabled` 为 true 的时候生效。</td>
</tr>
<tr>
<td><h5>sql-gateway.worker.keepalive-time</h5></td>
<td style="word-wrap: break-word;">5 min</td>
Expand Down
11 changes: 6 additions & 5 deletions docs/content/docs/dev/table/olap_quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ In OLAP scenario, appropriate configurations that can greatly help users improve

#### SQL&Table Options

| Parameters | Default | Recommended |
|:---------------------------------------------------------------------------------------------------------------|:--------|:------------|
| [table.optimizer.join-reorder-enabled]({{<ref "docs/dev/table/config#table-optimizer-join-reorder-enabled">}}) | false | true |
| [pipeline.object-reuse]({{< ref "docs/deployment/config#pipeline-object-reuse" >}}) | false | true |
| Parameters | Default | Recommended |
|:----------------------------------------------------------------------------------------------------------------------------------|:----------|:------------|
| [table.optimizer.join-reorder-enabled]({{<ref "docs/dev/table/config#table-optimizer-join-reorder-enabled">}}) | false | true |
| [pipeline.object-reuse]({{< ref "docs/deployment/config#pipeline-object-reuse" >}}) | false | true |
| [sql-gateway.session.plan-cache.enabled]({{<ref "docs/dev/table/sql-gateway/overview#sql-gateway-session-plan-cache-enabled">}}) | false | true |

#### Runtime Options

Expand Down Expand Up @@ -205,4 +206,4 @@ You can configure `slotmanager.number-of-slots.min` to a proper value as the res
## Future Work
Flink OLAP is now part of [Apache Flink Roadmap](https://flink.apache.org/what-is-flink/roadmap/), which means the community will keep putting efforts to improve Flink OLAP, both in usability and query performance. Relevant work are traced in underlying tickets:
- https://issues.apache.org/jira/browse/FLINK-25318
- https://issues.apache.org/jira/browse/FLINK-32898
- https://issues.apache.org/jira/browse/FLINK-32898
18 changes: 18 additions & 0 deletions docs/content/docs/dev/table/sql-gateway/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,24 @@ $ ./sql-gateway -Dkey=value
<td>Integer</td>
<td>The maximum number of the active session for sql gateway service.</td>
</tr>
<tr>
<td><h5>sql-gateway.session.plan-cache.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When it is true, sql gateway will cache and reuse plans for queries per session.</td>
</tr>
<tr>
<td><h5>sql-gateway.session.plan-cache.size</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>Plan cache size, it takes effect iff `table.optimizer.plan-cache.enabled` is true.</td>
</tr>
<tr>
<td><h5>sql-gateway.session.plan-cache.ttl</h5></td>
<td style="word-wrap: break-word;">1 hour</td>
<td>Duration</td>
<td>TTL for plan cache, it controls how long will the cache expire after write, it takes effect iff `table.optimizer.plan-cache.enabled` is true.</td>
</tr>
<tr>
<td><h5>sql-gateway.worker.keepalive-time</h5></td>
<td style="word-wrap: break-word;">5 min</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,23 @@ public class CollectResultIterator<T> implements CloseableIterator<T> {
private final CollectResultFetcher<T> fetcher;
private T bufferedResult;

private CompletableFuture<OperatorID> operatorIdFuture;
private TypeSerializer<T> serializer;
private String accumulatorName;
private CheckpointConfig checkpointConfig;
private long resultFetchTimeout;

public CollectResultIterator(
CompletableFuture<OperatorID> operatorIdFuture,
TypeSerializer<T> serializer,
String accumulatorName,
CheckpointConfig checkpointConfig,
long resultFetchTimeout) {
this.operatorIdFuture = operatorIdFuture;
this.serializer = serializer;
this.accumulatorName = accumulatorName;
this.checkpointConfig = checkpointConfig;
this.resultFetchTimeout = resultFetchTimeout;
AbstractCollectResultBuffer<T> buffer = createBuffer(serializer, checkpointConfig);
this.fetcher =
new CollectResultFetcher<>(
Expand Down Expand Up @@ -131,4 +142,13 @@ private AbstractCollectResultBuffer<T> createBuffer(
return new UncheckpointedCollectResultBuffer<>(serializer, false);
}
}

public CollectResultIterator<T> copy() {
return new CollectResultIterator<>(
this.operatorIdFuture,
this.serializer,
this.accumulatorName,
this.checkpointConfig,
this.resultFetchTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,30 @@ public class SqlGatewayServiceConfigOptions {
.withDescription(
"The maximum number of the active session for sql gateway service.");

public static final ConfigOption<Boolean> SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED =
key("sql-gateway.session.plan-cache.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"When it is true, sql gateway will cache and reuse plans for queries per session.");

public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_PLAN_CACHE_SIZE =
key("sql-gateway.session.plan-cache.size")
.intType()
.defaultValue(100)
.withDescription(
"Plan cache size, it takes effect iff "
+ "`table.optimizer.plan-cache.enabled` is true.");

public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_PLAN_CACHE_TTL =
key("sql-gateway.session.plan-cache.ttl")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDescription(
"TTL for plan cache, it controls how long will the "
+ "cache expire after write, it takes effect iff "
+ "`table.optimizer.plan-cache.enabled` is true.");

public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX =
key("sql-gateway.worker.threads.max")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.PlanCacheManager;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogStoreHolder;
Expand All @@ -50,18 +51,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutorService;

import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED;
import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_SIZE;
import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_PLAN_CACHE_TTL;

/**
* Context describing a session, it's mainly used for user to open a new session in the backend. If
* client request to open a new session, the backend {@code Executor} will maintain the session
Expand All @@ -85,6 +93,8 @@ public class SessionContext {
private boolean isStatementSetState;
private final List<ModifyOperation> statementSetOperations;

@Nullable private final PlanCacheManager planCacheManager;

protected SessionContext(
DefaultContext defaultContext,
SessionHandle sessionId,
Expand All @@ -102,6 +112,18 @@ protected SessionContext(
this.operationManager = operationManager;
this.isStatementSetState = false;
this.statementSetOperations = new ArrayList<>();
this.planCacheManager = createPlanCacheManager(sessionConf);
}

@Nullable
private static PlanCacheManager createPlanCacheManager(ReadableConfig readableConfig) {
boolean planCacheEnabled = readableConfig.get(SQL_GATEWAY_SESSION_PLAN_CACHE_ENABLED);
if (planCacheEnabled) {
int planCacheSize = readableConfig.get(SQL_GATEWAY_SESSION_PLAN_CACHE_SIZE);
Duration ttl = readableConfig.get(SQL_GATEWAY_SESSION_PLAN_CACHE_TTL);
return new PlanCacheManager(planCacheSize, ttl);
}
return null;
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -136,6 +158,11 @@ public URLClassLoader getUserClassloader() {
return userClassloader;
}

@Nullable
public PlanCacheManager getPlanCacheManager() {
return planCacheManager;
}

public void set(String key, String value) {
try {
// Test whether the key value will influence the creation of the Executor.
Expand All @@ -147,6 +174,7 @@ public void set(String key, String value) {
String.format("Failed to set key %s with value %s.", key, value), e);
}
sessionConf.setString(key, value);
invalidatePlanCacheIfExist();
}

public synchronized void reset(String key) {
Expand All @@ -159,13 +187,15 @@ public synchronized void reset(String key) {
} else {
sessionConf.removeConfig(option);
}
invalidatePlanCacheIfExist();
}

public synchronized void reset() {
for (String key : sessionConf.keySet()) {
sessionConf.removeConfig(ConfigOptions.key(key).stringType().noDefaultValue());
}
sessionConf.addAll(defaultContext.getFlinkConfig());
invalidatePlanCacheIfExist();
}

// --------------------------------------------------------------------------------------------
Expand All @@ -176,6 +206,12 @@ public OperationExecutor createOperationExecutor(Configuration executionConfig)
return new OperationExecutor(this, executionConfig);
}

private void invalidatePlanCacheIfExist() {
if (planCacheManager != null) {
planCacheManager.invalidateAll();
}
}

// --------------------------------------------------------------------------------------------
// Begin statement set
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.CachedPlan;
import org.apache.flink.table.api.internal.ExecutableOperationContextImpl;
import org.apache.flink.table.api.internal.PlanCacheManager;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
Expand Down Expand Up @@ -193,28 +195,44 @@ public ResultFetcher executeStatement(OperationHandle handle, String statement)
// Instantiate the TableEnvironment lazily
ResourceManager resourceManager = sessionContext.getSessionState().resourceManager.copy();
TableEnvironmentInternal tableEnv = getTableEnvironment(resourceManager);
List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
if (parsedOperations.size() > 1) {
throw new UnsupportedOperationException(
"Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+ "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
PlanCacheManager planCacheManager = sessionContext.getPlanCacheManager();
CachedPlan cachedPlan = null;
Operation op = null;

if (planCacheManager != null) {
Optional<CachedPlan> cachedPlanOptional = planCacheManager.getPlan(statement);
if (cachedPlanOptional.isPresent()) {
cachedPlan = cachedPlanOptional.get();
op = cachedPlan.getOperation();
}
}
Operation op = parsedOperations.get(0);
if (op == null) {
List<Operation> parsedOperations = tableEnv.getParser().parse(statement);
if (parsedOperations.size() > 1) {
throw new UnsupportedOperationException(
"Unsupported SQL statement! Execute statement only accepts a single SQL statement or "
+ "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
}
op = parsedOperations.get(0);
}

if (op instanceof CallProcedureOperation) {
// if the operation is CallProcedureOperation, we need to set the stream environment
// context to it since the procedure will use the stream environment
try {
SqlGatewayStreamExecutionEnvironment.setAsContext(
sessionContext.getUserClassloader());
return executeOperation(tableEnv, handle, op).withResourceManager(resourceManager);
return executeOperation(tableEnv, handle, op, statement, cachedPlan)
.withResourceManager(resourceManager);
} finally {
SqlGatewayStreamExecutionEnvironment.unsetAsContext();
}
} else {
return sessionContext.isStatementSetState()
? executeOperationInStatementSetState(tableEnv, handle, op)
.withResourceManager(resourceManager)
: executeOperation(tableEnv, handle, op).withResourceManager(resourceManager);
: executeOperation(tableEnv, handle, op, statement, cachedPlan)
.withResourceManager(resourceManager);
}
}

Expand Down Expand Up @@ -426,7 +444,11 @@ private ResultFetcher executeOperationInStatementSetState(
}

private ResultFetcher executeOperation(
TableEnvironmentInternal tableEnv, OperationHandle handle, Operation op) {
TableEnvironmentInternal tableEnv,
OperationHandle handle,
Operation op,
String statement,
CachedPlan cachedPlan) {
if (op instanceof SetOperation) {
return callSetOperation(tableEnv, handle, (SetOperation) op);
} else if (op instanceof ResetOperation) {
Expand All @@ -446,7 +468,14 @@ private ResultFetcher executeOperation(
return callModifyOperations(
tableEnv, handle, ((StatementSetOperation) op).getOperations());
} else if (op instanceof QueryOperation) {
TableResultInternal result = tableEnv.executeInternal(op);
TableResultInternal result =
cachedPlan != null
? tableEnv.executeCachedPlanInternal(cachedPlan)
: tableEnv.executeInternal(op);
PlanCacheManager planCacheManager = sessionContext.getPlanCacheManager();
if (cachedPlan == null && planCacheManager != null && result.getCachedPlan() != null) {
planCacheManager.putPlan(statement, result.getCachedPlan());
}
return ResultFetcher.fromTableResult(handle, result, true);
} else if (op instanceof StopJobOperation) {
return callStopJobOperation(tableEnv, handle, (StopJobOperation) op);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.flink.table.gateway.service.session;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.internal.PlanCacheManager;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.table.gateway.service.operation.OperationExecutor;
import org.apache.flink.table.gateway.service.operation.OperationManager;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.util.Map;

Expand Down Expand Up @@ -74,6 +77,11 @@ public OperationExecutor createExecutor(Configuration executionConfig) {
return sessionContext.createOperationExecutor(executionConfig);
}

@Nullable
public PlanCacheManager getPlanCacheManager() {
return sessionContext.getPlanCacheManager();
}

@Override
public void close() {
sessionContext.close();
Expand Down
Loading

0 comments on commit b6380d5

Please sign in to comment.