[FLINK-28655][sql-gateway] Support SHOW JOBS syntax in SqlGateway
This closes apache#21582
link3280 authored Jan 12, 2023
1 parent f611ea8 commit f177089
Showing 10 changed files with 223 additions and 5 deletions.
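For context: the commit threads the new statement through the whole stack — a JOBS keyword and SqlShowJobs node in the parser, a ShowJobsOperation, conversion in SqlToOperationConverter, validation pass-through in FlinkPlannerImpl, and execution in the gateway's OperationExecutor. A minimal usage sketch against the gateway service, borrowing service, defaultSessionEnvironment, and the fetchAllResults helper from the ITCase in this diff (everything else around them is assumed, and this snippet is not part of the commit):

    // Sketch only — mirrors SqlGatewayServiceITCase further down.
    SessionHandle session = service.openSession(defaultSessionEnvironment);
    OperationHandle op =
            service.executeStatement(session, "SHOW JOBS", -1, new Configuration());
    // One row per cluster job, with the schema declared in callShowJobsOperation:
    //   job id STRING, job name STRING, status STRING,
    //   start time TIMESTAMP WITH LOCAL TIME ZONE
    for (RowData row : fetchAllResults(session, op)) {
        System.out.printf(
                "%s | %s | %s%n",
                row.getString(0), row.getString(1), row.getString(2));
    }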
@@ -29,6 +29,7 @@
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
@@ -64,10 +65,12 @@
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.command.ShowJobsOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.ddl.AlterOperation;
import org.apache.flink.table.operations.ddl.CreateOperation;
import org.apache.flink.table.operations.ddl.DropOperation;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -77,6 +80,7 @@

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -89,9 +93,12 @@

import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_CANDIDATES;
import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
import static org.apache.flink.table.gateway.service.utils.Constants.JOB_NAME;
import static org.apache.flink.table.gateway.service.utils.Constants.SAVEPOINT_PATH;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_VALUE;
import static org.apache.flink.table.gateway.service.utils.Constants.START_TIME;
import static org.apache.flink.table.gateway.service.utils.Constants.STATUS;
import static org.apache.flink.util.Preconditions.checkArgument;

/** An executor to execute the {@link Operation}. */
@@ -316,6 +323,8 @@ private ResultFetcher executeOperation(
return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
} else if (op instanceof StopJobOperation) {
return callStopJobOperation(handle, (StopJobOperation) op);
} else if (op instanceof ShowJobsOperation) {
return callShowJobsOperation(handle, (ShowJobsOperation) op);
} else {
return callOperation(tableEnv, handle, op);
}
@@ -524,6 +533,46 @@ private ResultFetcher buildOkResultFetcher(OperationHandle handle) {
TableResultInternal.TABLE_RESULT_OK.collectInternal()));
}

public ResultFetcher callShowJobsOperation(
OperationHandle operationHandle, ShowJobsOperation showJobsOperation)
throws SqlExecutionException {
Duration clientTimeout =
Configuration.fromMap(sessionContext.getConfigMap())
.get(ClientOptions.CLIENT_TIMEOUT);
Collection<JobStatusMessage> jobs =
runClusterAction(
operationHandle,
clusterClient -> {
try {
return clusterClient
.listJobs()
.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new SqlExecutionException(
"Failed to list jobs in the cluster.", e);
}
});
List<RowData> resultRows =
jobs.stream()
.map(
job ->
GenericRowData.of(
StringData.fromString(job.getJobId().toString()),
StringData.fromString(job.getJobName()),
StringData.fromString(job.getJobState().toString()),
DateTimeUtils.toTimestampData(
job.getStartTime(), 3)))
.collect(Collectors.toList());
return new ResultFetcher(
operationHandle,
ResolvedSchema.of(
Column.physical(JOB_ID, DataTypes.STRING()),
Column.physical(JOB_NAME, DataTypes.STRING()),
Column.physical(STATUS, DataTypes.STRING()),
Column.physical(START_TIME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
resultRows);
}
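The job's start time arrives as epoch milliseconds from JobStatusMessage.getStartTime(); a small sketch of the conversion used above, with a made-up value (only the two-argument toTimestampData form already used in this method is assumed):

    // Wraps epoch milliseconds into a TimestampData with millisecond precision,
    // matching the TIMESTAMP WITH LOCAL TIME ZONE column in the schema above.
    long startMillis = 1673519000123L; // example value, made up
    TimestampData startTs = DateTimeUtils.toTimestampData(startMillis, 3);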

/**
* Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
* against it.
@@ -536,7 +585,7 @@ private ResultFetcher buildOkResultFetcher(OperationHandle handle) {
*/
private <ClusterID, Result> Result runClusterAction(
OperationHandle handle, ClusterAction<ClusterID, Result> clusterAction)
-            throws FlinkException {
+            throws SqlExecutionException {
final Configuration configuration = Configuration.fromMap(sessionContext.getConfigMap());
final ClusterClientFactory<ClusterID> clusterClientFactory =
clusterClientServiceLoader.getClusterClientFactory(configuration);
@@ -549,6 +598,8 @@ private <ClusterID, Result> Result runClusterAction(
final ClusterClient<ClusterID> clusterClient =
clusterDescriptor.retrieve(clusterId).getClusterClient()) {
return clusterAction.runAction(clusterClient);
} catch (FlinkException e) {
throw new SqlExecutionException("Failed to run cluster action.", e);
}
}
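The change above makes runClusterAction wrap any FlinkException in an unchecked SqlExecutionException, so callers such as callShowJobsOperation need no checked-exception plumbing. For reference, the ClusterAction callback presumably has this shape — it is defined elsewhere in this class and not shown in the diff, so the exact declaration is an inference from the call sites:

    // Assumed shape of the ClusterAction callback; not part of this diff.
    @FunctionalInterface
    private interface ClusterAction<ClusterID, Result> {
        Result runAction(ClusterClient<ClusterID> clusterClient) throws FlinkException;
    }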

@@ -22,9 +22,11 @@
public class Constants {

public static final String JOB_ID = "job id";
public static final String JOB_NAME = "job name";
public static final String STATUS = "status";
public static final String START_TIME = "start time";
public static final String SET_KEY = "key";
public static final String SET_VALUE = "value";
public static final String COMPLETION_CANDIDATES = "candidates";

-    public static final String SAVEPOINT_PATH = "savepoint_path";
+    public static final String SAVEPOINT_PATH = "savepoint path";
}
@@ -22,6 +22,7 @@
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -121,7 +122,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
public static final MiniClusterExtension MINI_CLUSTER =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
-                        .setNumberTaskManagers(1)
+                        .setNumberTaskManagers(2)
.build());

@RegisterExtension
@@ -456,6 +457,53 @@ public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
}

@Test
public void testShowJobsOperation(@InjectClusterClient RestClusterClient<?> restClusterClient)
throws Exception {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());

String pipelineName = "test-job";
configuration.setString(PipelineOptions.NAME, pipelineName);

// Submit a job that stays RUNNING: unbounded datagen source into a blackhole sink.
String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
String insertSql = "INSERT INTO sink SELECT * FROM source;";

service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
service.executeStatement(sessionHandle, sinkDdl, -1, configuration);

long timeOpStart = System.currentTimeMillis();
OperationHandle insertsOperationHandle =
service.executeStatement(sessionHandle, insertSql, -1, configuration);
String jobId =
fetchAllResults(sessionHandle, insertsOperationHandle)
.get(0)
.getString(0)
.toString();

TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
long timeOpSucceed = System.currentTimeMillis();

OperationHandle showJobsOperationHandle1 =
service.executeStatement(sessionHandle, "SHOW JOBS", -1, configuration);

List<RowData> result = fetchAllResults(sessionHandle, showJobsOperationHandle1);
RowData jobRow =
result.stream()
.filter(row -> jobId.equals(row.getString(0).toString()))
.findFirst()
.orElseThrow(
() ->
new IllegalStateException(
"Test job " + jobId + " not found."));
assertThat(jobRow.getString(1).toString()).isEqualTo(pipelineName);
assertThat(jobRow.getString(2).toString()).isEqualTo("RUNNING");
assertThat(jobRow.getTimestamp(3, 3).getMillisecond())
.isBetween(timeOpStart, timeOpSucceed);
}

// --------------------------------------------------------------------------------------------
// Catalog API tests
// --------------------------------------------------------------------------------------------
@@ -107,6 +107,7 @@
"org.apache.flink.sql.parser.dql.SqlUnloadModule"
"org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
"org.apache.flink.sql.parser.ddl.SqlStopJob"
"org.apache.flink.sql.parser.dql.SqlShowJobs"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -147,6 +148,7 @@
"JAR"
"JARS"
"JOB"
"JOBS"
"LOAD"
"METADATA"
"MODIFY"
@@ -294,8 +296,9 @@
"ISOYEAR"
"JAR"
"JARS"
"JOB"
"JAVA"
"JOB"
"JOBS"
"JSON"
"K"
"KEY"
@@ -569,6 +572,7 @@
"SqlReset()"
"SqlAnalyzeTable()"
"SqlStopJob()"
"SqlShowJobs()"
]

# List of methods for parsing custom literals.
@@ -2365,6 +2365,19 @@ SqlNode SqlAnalyzeTable():
}
}

/**
* Parse a "SHOW JOBS" statement.
*/
SqlShowJobs SqlShowJobs() :
{
}
{
<SHOW> <JOBS>
{
return new SqlShowJobs(getPos());
}
}

/**
* Parses a STOP JOB statement:
* STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** The command to list running Flink jobs in a cluster. */
public class SqlShowJobs extends SqlCall {

public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("SHOW JOBS", SqlKind.OTHER);

public SqlShowJobs(SqlParserPos pos) {
super(pos);
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return Collections.emptyList();
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("SHOW JOBS");
}
}
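Because unparse writes the bare keyword, the node round-trips to its own statement text — exactly what the parser test further down asserts with sql("show jobs").ok("SHOW JOBS"). An illustrative check, assuming Calcite's SqlNode.toSqlString and AnsiSqlDialect are available:

    // Illustrative only; not part of the commit.
    SqlShowJobs showJobs = new SqlShowJobs(SqlParserPos.ZERO);
    String sql = showJobs.toSqlString(AnsiSqlDialect.DEFAULT).getSql(); // "SHOW JOBS"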
@@ -2087,6 +2087,11 @@ void testCreateTableAsSelectWithPartitionKey() {
"CREATE TABLE AS SELECT syntax does not support to create partitioned table yet."));
}

@Test
void testShowJobs() {
sql("show jobs").ok("SHOW JOBS");
}

@Test
void testStopJob() {
sql("STOP JOB 'myjob'").ok("STOP JOB 'myjob'");
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.command;

import org.apache.flink.table.operations.ShowOperation;

/** Operation to describe a SHOW JOBS statement. */
public class ShowJobsOperation implements ShowOperation {

@Override
public String asSummaryString() {
return "SHOW JOBS";
}
}
@@ -83,6 +83,7 @@
import org.apache.flink.sql.parser.dql.SqlShowDatabases;
import org.apache.flink.sql.parser.dql.SqlShowFunctions;
import org.apache.flink.sql.parser.dql.SqlShowJars;
import org.apache.flink.sql.parser.dql.SqlShowJobs;
import org.apache.flink.sql.parser.dql.SqlShowModules;
import org.apache.flink.sql.parser.dql.SqlShowPartitions;
import org.apache.flink.sql.parser.dql.SqlShowTables;
@@ -159,6 +160,7 @@
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.command.ShowJarsOperation;
import org.apache.flink.table.operations.command.ShowJobsOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
@@ -358,6 +360,8 @@ private static Optional<Operation> convertValidatedSqlNode(
return Optional.of(converter.convertRemoveJar((SqlRemoveJar) validated));
} else if (validated instanceof SqlShowJars) {
return Optional.of(converter.convertShowJars((SqlShowJars) validated));
} else if (validated instanceof SqlShowJobs) {
return Optional.of(converter.convertShowJobs((SqlShowJobs) validated));
} else if (validated instanceof RichSqlInsert) {
return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
} else if (validated instanceof SqlBeginStatementSet) {
@@ -1472,6 +1476,10 @@ private ValueLiteralExpression getPartitionValueExpr(
return new ValueLiteralExpression(value, dataType.notNull());
}

private Operation convertShowJobs(SqlShowJobs sqlShowJobs) {
return new ShowJobsOperation();
}

private Operation convertStopJob(SqlStopJob sqlStopJob) {
return new StopJobOperation(
sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
@@ -140,6 +140,7 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowViews]
|| sqlNode.isInstanceOf[SqlShowColumns]
|| sqlNode.isInstanceOf[SqlShowPartitions]
|| sqlNode.isInstanceOf[SqlShowJobs]
|| sqlNode.isInstanceOf[SqlRichDescribeTable]
|| sqlNode.isInstanceOf[SqlUnloadModule]
|| sqlNode.isInstanceOf[SqlUseModules]
