Skip to content

Commit

Permalink
ExecuteProcessEngine.finish (apache#9706)
Browse files Browse the repository at this point in the history
* ExecuteProcessEngine.finish

* fix test

* rename parameters
  • Loading branch information
tristaZero authored Mar 17, 2021
1 parent 881d0e4 commit 42528f7
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;

/**
Expand All @@ -29,12 +29,12 @@
public final class GovernanceExecuteProcessReporter implements ExecuteProcessReporter {

@Override
public void report(final SQLStatementContext<?> context, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final ExecuteProcessStatus status) {
public void report(final SQLStatementContext<?> context, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final ExecuteProcessConstants constants) {
// TODO :Call API of configCenter
}

@Override
public void report(final String executionID, final SQLExecutionUnit executionUnit, final ExecuteProcessStatus status) {
public void report(final String executionID, final SQLExecutionUnit executionUnit, final ExecuteProcessConstants constants) {
// TODO :Call API of configCenter
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;

import java.sql.DatabaseMetaData;
Expand Down Expand Up @@ -81,6 +84,7 @@ private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTru
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread, dataMap);
T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode());
sqlExecutionHook.finishSuccess();
finishReport(dataMap, jdbcExecutionUnit);
return result;
} catch (final SQLException ex) {
if (!isTrunkThread) {
Expand All @@ -106,6 +110,12 @@ private DataSourceMetaData getDataSourceMetaData(final DatabaseMetaData metaData
return result;
}

private void finishReport(final Map<String, Object> dataMap, final SQLExecutionUnit executionUnit) {
if (dataMap.containsKey(ExecuteProcessConstants.EXECUTE_ID.name())) {
ExecuteProcessEngine.finish(dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString(), executionUnit);
}
}

protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode) throws SQLException;

protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;

Expand All @@ -49,19 +50,20 @@ public final class ExecuteProcessEngine {
*/
public static void initialize(final SQLStatementContext<?> context, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
if (!HANDLERS.isEmpty() && ExecuteProcessStrategyEvaluator.evaluate(context, executionGroupContext)) {
HANDLERS.iterator().next().report(context, executionGroupContext, ExecuteProcessStatus.DOING);
ExecutorDataMap.getValue().put(ExecuteProcessConstants.EXECUTE_ID.name(), executionGroupContext.getExecutionID());
HANDLERS.iterator().next().report(context, executionGroupContext, ExecuteProcessConstants.EXECUTE_STATUS_START);
}
}

/**
* Complete.
* Finish.
*
* @param executionID execution ID
* @param executionUnit execution unit
*/
public static void complete(final String executionID, final SQLExecutionUnit executionUnit) {
public static void finish(final String executionID, final SQLExecutionUnit executionUnit) {
if (!HANDLERS.isEmpty()) {
HANDLERS.iterator().next().report(executionID, executionUnit, ExecuteProcessStatus.DONE);
HANDLERS.iterator().next().report(executionID, executionUnit, ExecuteProcessConstants.EXECUTE_STATUS_DONE);
}
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.shardingsphere.infra.executor.sql.process.model;

/**
* Execute process status.
* Execute process constants.
*/
public enum ExecuteProcessStatus {
public enum ExecuteProcessConstants {

DOING, DONE
EXECUTE_ID, EXECUTE_STATUS_START, EXECUTE_STATUS_DONE
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public final class ExecuteProcessUnit {

private final String unitID;

private final ExecuteProcessStatus status;
private final ExecuteProcessConstants status;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;

/**
* Execute process report.
Expand All @@ -31,15 +31,15 @@ public interface ExecuteProcessReporter {
* Report the summary of this task.
* @param context context
* @param executionGroupContext execution group context
* @param status status
* @param constants constants
*/
void report(SQLStatementContext<?> context, ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, ExecuteProcessStatus status);
void report(SQLStatementContext<?> context, ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, ExecuteProcessConstants constants);

/**
* Report a unit of this task.
* @param executionID execution ID
* @param executionUnit execution unit
* @param status status
* @param constants constants
*/
void report(String executionID, SQLExecutionUnit executionUnit, ExecuteProcessStatus status);
void report(String executionID, SQLExecutionUnit executionUnit, ExecuteProcessConstants constants);
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ protected Optional<Integer> getSaneResult(final SQLStatement sqlStatement) {
field.setAccessible(true);
Map<String, DataSourceMetaData> cachedDataSourceMetaData = (Map<String, DataSourceMetaData>) field.get(jdbcExecutorCallback);
assertThat(cachedDataSourceMetaData.size(), is(0));
jdbcExecutorCallback.execute(units, true, null);
jdbcExecutorCallback.execute(units, true, Collections.emptyMap());
assertThat(cachedDataSourceMetaData.size(), is(1));
jdbcExecutorCallback.execute(units, true, null);
jdbcExecutorCallback.execute(units, true, Collections.emptyMap());
assertThat(cachedDataSourceMetaData.size(), is(1));
}
}

0 comments on commit 42528f7

Please sign in to comment.