Skip to content

Commit

Permalink
[FLINK-35201][table] Support the execution of drop materialized table…
Browse files Browse the repository at this point in the history
… in full refresh mode
  • Loading branch information
hackergin authored and lsyldliu committed Jun 6, 2024
1 parent 9708f9f commit 42289bd
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.workflow.CreatePeriodicRefreshWorkflow;
import org.apache.flink.table.workflow.CreateRefreshWorkflow;
import org.apache.flink.table.workflow.DeleteRefreshWorkflow;
import org.apache.flink.table.workflow.ModifyRefreshWorkflow;
import org.apache.flink.table.workflow.ResumeRefreshWorkflow;
import org.apache.flink.table.workflow.SuspendRefreshWorkflow;
Expand Down Expand Up @@ -779,38 +780,16 @@ private ResultFetcher callDropMaterializedTableOperation(

CatalogMaterializedTable materializedTable =
getCatalogMaterializedTable(operationExecutor, tableIdentifier);

if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
== materializedTable.getRefreshStatus()) {
ContinuousRefreshHandler refreshHandler =
deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler());
// get job running status
JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler);
if (!jobStatus.isTerminalState()) {
try {
cancelJob(operationExecutor, handle, refreshHandler.getJobId());
} catch (Exception e) {
jobStatus = getJobStatus(operationExecutor, handle, refreshHandler);
if (!jobStatus.isTerminalState()) {
throw new SqlExecutionException(
String.format(
"Failed to drop the materialized table %s because the continuous refresh job %s could not be canceled."
+ " The current status of the continuous refresh job is %s.",
tableIdentifier, refreshHandler.getJobId(), jobStatus),
e);
} else {
LOG.warn(
"An exception occurred while canceling the continuous refresh job {} for materialized table {},"
+ " but since the job is in a terminal state, skip the cancel operation.",
refreshHandler.getJobId(),
tableIdentifier);
}
}
} else {
LOG.info(
"No need to cancel the continuous refresh job {} for materialized table {} as it is not currently running.",
refreshHandler.getJobId(),
tableIdentifier);
CatalogMaterializedTable.RefreshMode refreshMode = materializedTable.getRefreshMode();
CatalogMaterializedTable.RefreshStatus refreshStatus = materializedTable.getRefreshStatus();
if (CatalogMaterializedTable.RefreshStatus.ACTIVATED == refreshStatus
|| CatalogMaterializedTable.RefreshStatus.SUSPENDED == refreshStatus) {
if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode) {
deleteRefreshWorkflow(tableIdentifier, materializedTable);
} else if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == refreshMode
&& CatalogMaterializedTable.RefreshStatus.ACTIVATED == refreshStatus) {
cancelContinuousRefreshJob(
operationExecutor, handle, tableIdentifier, materializedTable);
}
} else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
== materializedTable.getRefreshStatus()) {
Expand All @@ -825,6 +804,71 @@ private ResultFetcher callDropMaterializedTableOperation(
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private void cancelContinuousRefreshJob(
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier tableIdentifier,
CatalogMaterializedTable materializedTable) {
ContinuousRefreshHandler refreshHandler =
deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler());
// get job running status
JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler);
if (!jobStatus.isTerminalState()) {
try {
cancelJob(operationExecutor, handle, refreshHandler.getJobId());
} catch (Exception e) {
jobStatus = getJobStatus(operationExecutor, handle, refreshHandler);
if (!jobStatus.isTerminalState()) {
throw new SqlExecutionException(
String.format(
"Failed to drop the materialized table %s because the continuous refresh job %s could not be canceled."
+ " The current status of the continuous refresh job is %s.",
tableIdentifier, refreshHandler.getJobId(), jobStatus),
e);
} else {
LOG.warn(
"An exception occurred while canceling the continuous refresh job {} for materialized table {},"
+ " but since the job is in a terminal state, skip the cancel operation.",
refreshHandler.getJobId(),
tableIdentifier);
}
}
} else {
LOG.info(
"No need to cancel the continuous refresh job {} for materialized table {} as it is not currently running.",
refreshHandler.getJobId(),
tableIdentifier);
}
}

private void deleteRefreshWorkflow(
ObjectIdentifier tableIdentifier, CatalogMaterializedTable catalogMaterializedTable) {
if (workflowScheduler == null) {
throw new SqlExecutionException(
"The workflow scheduler must be configured when dropping materialized table in full refresh mode.");
}
try {
RefreshHandlerSerializer<?> refreshHandlerSerializer =
workflowScheduler.getRefreshHandlerSerializer();
RefreshHandler refreshHandler =
refreshHandlerSerializer.deserialize(
catalogMaterializedTable.getSerializedRefreshHandler(),
userCodeClassLoader);
DeleteRefreshWorkflow deleteRefreshWorkflow = new DeleteRefreshWorkflow(refreshHandler);
workflowScheduler.deleteRefreshWorkflow(deleteRefreshWorkflow);
} catch (Exception e) {
LOG.error(
"Failed to delete the refresh workflow for materialized table {}.",
tableIdentifier,
e);
throw new SqlExecutionException(
String.format(
"Failed to delete the refresh workflow for materialized table %s.",
tableIdentifier),
e);
}
}

/**
* Retrieves the session configuration for initializing the periodic refresh job. The function
* filters out default context configurations and removes unnecessary configurations such as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,6 @@ public void deleteScheduleWorkflow(String workflowName, String workflowGroup)
JobKey jobKey = JobKey.jobKey(workflowName, workflowGroup);
lock.writeLock().lock();
try {
String errorMsg =
String.format(
"Failed to delete a non-existent quartz schedule job: %s.", jobKey);
checkJobExists(jobKey, errorMsg);

quartzScheduler.deleteJob(jobKey);
} catch (org.quartz.SchedulerException e) {
LOG.error("Failed to delete quartz schedule job: {}.", jobKey, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.TableInfo;
Expand Down Expand Up @@ -196,20 +195,14 @@ void after() throws Exception {
ResolvedCatalogBaseTable<?> resolvedTable =
service.getTable(sessionHandle, tableInfo.getIdentifier());
if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == resolvedTable.getTableKind()) {
ResolvedCatalogMaterializedTable catalogMaterializedTable =
(ResolvedCatalogMaterializedTable) resolvedTable;
if (catalogMaterializedTable.getRefreshMode()
== ResolvedCatalogMaterializedTable.RefreshMode.CONTINUOUS) {
// drop materialized table (batch refresh job will be dropped automatically
String dropTableDDL =
String.format(
"DROP MATERIALIZED TABLE %s",
tableInfo.getIdentifier().asSerializableString());
OperationHandle dropTableHandle =
service.executeStatement(
sessionHandle, dropTableDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, dropTableHandle);
}
String dropTableDDL =
String.format(
"DROP MATERIALIZED TABLE %s",
tableInfo.getIdentifier().asSerializableString());
OperationHandle dropTableHandle =
service.executeStatement(
sessionHandle, dropTableDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, dropTableHandle);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,6 @@ void testCreateMaterializedTableInFullMode() throws Exception {
.containsKey("table.catalog-store.file.path")
.doesNotContainKey(WORKFLOW_SCHEDULER_TYPE.key())
.doesNotContainKey(RESOURCES_DOWNLOAD_DIR.key());

// delete the workflow
embeddedWorkflowScheduler.deleteScheduleWorkflow(jobKey.getName(), jobKey.getGroup());
}

@Test
Expand Down Expand Up @@ -751,9 +748,6 @@ void testAlterMaterializedTableSuspendAndResumeInFullMode() throws Exception {
fromJson((String) jobDetail.getJobDataMap().get(WORKFLOW_INFO), WorkflowInfo.class);
assertThat(workflowInfo.getDynamicOptions())
.containsEntry("debezium-json.ignore-parse-errors", "true");

// delete the workflow
embeddedWorkflowScheduler.deleteScheduleWorkflow(jobKey.getName(), jobKey.getGroup());
}

@Test
Expand Down Expand Up @@ -891,6 +885,96 @@ void testDropMaterializedTable() throws Exception {
.asSerializableString()));
}

@Test
void testDropMaterializedTableInFullMode() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
"users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL);

JobKey jobKey =
JobKey.jobKey(
"quartz_job_"
+ ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")
.asSerializableString(),
"default_group");
EmbeddedQuartzScheduler embeddedWorkflowScheduler =
SQL_GATEWAY_REST_ENDPOINT_EXTENSION
.getSqlGatewayRestEndpoint()
.getQuartzScheduler();

// verify refresh workflow is created
assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue();

// drop materialized table
String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS users_shops";
OperationHandle dropMaterializedTableHandle =
service.executeStatement(
sessionHandle, dropMaterializedTableDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, dropMaterializedTableHandle);

// verify materialized table metadata is removed
assertThatThrownBy(
() ->
service.getTable(
sessionHandle,
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")))
.isInstanceOf(SqlGatewayException.class)
.hasMessageContaining("Failed to getTable.");

// verify refresh workflow is removed
assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isFalse();
}

@Test
void testDropMaterializedTableWithDeletedRefreshWorkflowInFullMode() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
"users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL);

JobKey jobKey =
JobKey.jobKey(
"quartz_job_"
+ ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")
.asSerializableString(),
"default_group");
EmbeddedQuartzScheduler embeddedWorkflowScheduler =
SQL_GATEWAY_REST_ENDPOINT_EXTENSION
.getSqlGatewayRestEndpoint()
.getQuartzScheduler();

// verify refresh workflow is created
assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue();

// delete the workflow
embeddedWorkflowScheduler.deleteScheduleWorkflow(jobKey.getName(), jobKey.getGroup());

// drop materialized table
String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS users_shops";
OperationHandle dropMaterializedTableHandle =
service.executeStatement(
sessionHandle, dropMaterializedTableDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, dropMaterializedTableHandle);

// verify materialized table metadata is removed
assertThatThrownBy(
() ->
service.getTable(
sessionHandle,
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")))
.isInstanceOf(SqlGatewayException.class)
.hasMessageContaining("Failed to getTable.");
}

@Test
void testRefreshMaterializedTableWithStaticPartition() throws Exception {
List<Row> data = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,7 @@ void testDeleteNonExistsWorkflow() throws Exception {
EmptyMessageParameters.getInstance(),
nonExistsWorkflow);

assertThatFuture(suspendFuture)
.failsWithin(5, TimeUnit.SECONDS)
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(RestClientException.class)
.withMessageContaining(
"Failed to delete a non-existent quartz schedule job: default_group.non-exists.")
.satisfies(
e ->
assertThat(
((RestClientException) e.getCause())
.getHttpResponseStatus())
.isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR));
assertThatFuture(suspendFuture).succeedsWithin(5, TimeUnit.SECONDS);
}

@Test
Expand Down Expand Up @@ -304,7 +293,7 @@ void testModifyWorkflowWithUnsupportedTypeByWorkflowSchedulerInterface() {
}

@Test
void testNonExistsWorkflowByWorkflowSchedulerInterface() {
void testNonExistsWorkflowByWorkflowSchedulerInterface() throws WorkflowException {
// suspend case
assertThatThrownBy(
() ->
Expand All @@ -331,16 +320,8 @@ void testNonExistsWorkflowByWorkflowSchedulerInterface() {
+ "}.");

// delete case
assertThatThrownBy(
() ->
embeddedWorkflowScheduler.deleteRefreshWorkflow(
new DeleteRefreshWorkflow<>(nonExistsHandler)))
.isInstanceOf(WorkflowException.class)
.hasMessage(
"Failed to delete refresh workflow {\n"
+ " workflowName: non-exits,\n"
+ " workflowGroup: default_group\n"
+ "}.");
embeddedWorkflowScheduler.deleteRefreshWorkflow(
new DeleteRefreshWorkflow<>(nonExistsHandler));
}

/** Just used for test. */
Expand Down

0 comments on commit 42289bd

Please sign in to comment.