Skip to content

Commit

Permalink
add ShardingEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Aug 15, 2018
1 parent 1214cf9 commit 97568b5
Show file tree
Hide file tree
Showing 32 changed files with 209 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.routing.event;
package io.shardingsphere.core.event;

import com.google.common.base.Optional;
import lombok.AccessLevel;
Expand All @@ -24,40 +24,40 @@
import java.util.UUID;

/**
* SQL routing event.
* Sharding event.
*
* @author chenqingyang
* @author zhangliang
*/
@Getter
public abstract class AbstractRoutingEvent {
public class ShardingEvent {

private final String id = UUID.randomUUID().toString();

private EventRoutingType eventRoutingType = EventRoutingType.BEFORE_ROUTE;
private ShardingEventType eventType = ShardingEventType.BEFORE_EXECUTE;

@Getter(AccessLevel.NONE)
private Exception exception;

/**
* Set route success.
* Set execute success.
*/
public void setExecuteSuccess() {
eventRoutingType = EventRoutingType.ROUTE_SUCCESS;
eventType = ShardingEventType.EXECUTE_SUCCESS;
}

/**
* Set route failure.
*
* Set execute failure.
*
* @param cause fail cause
*/
public void setExecuteFailure(final Exception cause) {
eventRoutingType = EventRoutingType.ROUTE_FAILURE;
eventType = ShardingEventType.EXECUTE_FAILURE;
exception = cause;
}

/**
* Get exception.
*
*
* @return exception
*/
public final Optional<? extends Exception> getException() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@
import lombok.NoArgsConstructor;

/**
* Event bus for singleton instance.
* Sharding event bus for singleton instance.
*
* @author zhangliang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class EventBusInstance {
public final class ShardingEventBusInstance {

private static final EventBus INSTANCE = new EventBus();

/**
* Get event bus instance.
* Get sharding event bus instance.
*
* @return event bus instance
* @return sharding event bus instance
*/
public static EventBus getInstance() {
return INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,14 @@
* </p>
*/

package io.shardingsphere.core.merger.event;

import com.google.common.base.Optional;
package io.shardingsphere.core.event;

/**
* Result set merge event.
* Sharding event type.
*
* @author chenqingyang
* @author zhangliang
*/
public class ResultSetMergeEvent extends AbstractMergeEvent {

public enum ShardingEventType {

/**
* Get exception.
*
* @return exception
*/
public Optional<Exception> getException() {
Optional<? extends Exception> ex = super.getException();
if (ex.isPresent()) {
return Optional.of(ex.get());
}
return Optional.absent();
}
BEFORE_EXECUTE, EXECUTE_SUCCESS, EXECUTE_FAILURE
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,12 @@

package io.shardingsphere.core.merger.event;

import io.shardingsphere.core.event.ShardingEvent;

/**
* Event merge type.
*
* Merge event.
*
* @author chenqingyang
*/
public enum EventMergeType {

/**
* Before result set merge.
*/
BEFORE_MERGE,

/**
* result set merge success.
*/
MERGE_SUCCESS,

/**
* result set merge failure.
*/
MERGE_FAILURE
public final class MergeEvent extends ShardingEvent {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

package io.shardingsphere.core.routing.event;

import io.shardingsphere.core.event.ShardingEvent;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* SQL routing event.
* Routing event.
*
* @author chenqingyang
*/
@RequiredArgsConstructor
@Getter
public final class SQLRoutingEvent extends AbstractRoutingEvent {
public final class RoutingEvent extends ShardingEvent {

private final String sql;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.event.EventBusInstance;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.transaction.api.config.SoftTransactionConfiguration;
import io.shardingsphere.transaction.bed.BEDSoftTransaction;
import io.shardingsphere.transaction.bed.sync.BestEffortsDeliveryListener;
Expand Down Expand Up @@ -56,7 +56,7 @@ public final class SoftTransactionManager {
* @throws SQLException SQL exception
*/
public void init() throws SQLException {
EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
ShardingEventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
createTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void listen(final DMLExecutionEvent event) {
SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
switch (event.getEventExecutionType()) {
switch (event.getEventType()) {
case BEFORE_EXECUTE:
//TODO for batch SQL need split to 2-level records
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
Expand Down Expand Up @@ -98,7 +98,7 @@ public void listen(final DMLExecutionEvent event) {
}
return;
default:
throw new UnsupportedOperationException(event.getEventExecutionType().toString());
throw new UnsupportedOperationException(event.getEventType().toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.shardingsphere.core.executor.event.OverallExecutionEvent;
import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.event.EventBusInstance;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -78,17 +78,17 @@ public <T> List<T> execute(
return Collections.emptyList();
}
OverallExecutionEvent event = new OverallExecutionEvent(sqlType, baseStatementUnits.size() > 1);
EventBusInstance.getInstance().post(event);
ShardingEventBusInstance.getInstance().post(event);
try {
List<T> result = getExecuteResults(sqlType, baseStatementUnits, executeCallback);
event.setExecuteSuccess();
EventBusInstance.getInstance().post(event);
ShardingEventBusInstance.getInstance().post(event);
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
event.setExecuteFailure(ex);
EventBusInstance.getInstance().post(event);
ShardingEventBusInstance.getInstance().post(event);
ExecutorExceptionHandler.handleException(ex);
return null;
}
Expand All @@ -106,21 +106,21 @@ protected final <T> T executeInternal(final SQLType sqlType, final BaseStatement
events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
}
for (ExecutionEvent event : events) {
EventBusInstance.getInstance().post(event);
ShardingEventBusInstance.getInstance().post(event);
}
try {
result = executeCallback.execute(baseStatementUnit);
} catch (final SQLException ex) {
for (ExecutionEvent each : events) {
each.setExecuteFailure(ex);
EventBusInstance.getInstance().post(each);
ShardingEventBusInstance.getInstance().post(each);
ExecutorExceptionHandler.handleException(ex);
}
return null;
}
for (ExecutionEvent each : events) {
each.setExecuteSuccess();
EventBusInstance.getInstance().post(each);
ShardingEventBusInstance.getInstance().post(each);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,12 @@

package io.shardingsphere.core.executor.event;

import com.google.common.base.Optional;
import lombok.AccessLevel;
import lombok.Getter;

import java.util.UUID;
import io.shardingsphere.core.event.ShardingEvent;

/**
* Execution event.
*
* @author zhangliang
*/
@Getter
public abstract class ExecutionEvent {

private final String id = UUID.randomUUID().toString();

private EventExecutionType eventExecutionType = EventExecutionType.BEFORE_EXECUTE;

@Getter(AccessLevel.NONE)
private Exception exception;

/**
* Set execute success.
*/
public void setExecuteSuccess() {
eventExecutionType = EventExecutionType.EXECUTE_SUCCESS;
}

/**
* Set execute failure.
*
* @param cause fail cause
*/
public void setExecuteFailure(final Exception cause) {
eventExecutionType = EventExecutionType.EXECUTE_FAILURE;
exception = cause;
}

/**
* Get exception.
*
* @return exception
*/
public final Optional<? extends Exception> getException() {
return Optional.fromNullable(exception);
}
public class ExecutionEvent extends ShardingEvent {
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
*
* @author gaohongtao
*/
@Getter
@RequiredArgsConstructor
@Getter
public final class OverallExecutionEvent extends ExecutionEvent {

private final SQLType sqlType;
Expand Down
Loading

0 comments on commit 97568b5

Please sign in to comment.