Skip to content

Commit

Permalink
[ISSUE apache#7319] Optimize fault-tolerant mechanism for sending mes…
Browse files Browse the repository at this point in the history
…sages and hot update switch (apache#7320)
  • Loading branch information
GenerousMan authored Sep 7, 2023
1 parent 6280205 commit 6fd0073
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,7 @@ public void start(final boolean startFactory) throws MQClientException {
mQClientFactory.start();
}

if (this.mqFaultStrategy.isStartDetectorEnable()) {
this.mqFaultStrategy.startDetector();
}
this.mqFaultStrategy.startDetector();

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
Expand Down Expand Up @@ -311,9 +309,7 @@ public void shutdown(final boolean shutdownFactory) {
if (shutdownFactory) {
this.mQClientFactory.shutdown();
}
if (this.mqFaultStrategy.isStartDetectorEnable()) {
this.mqFaultStrategy.shutdown();
}
this.mqFaultStrategy.shutdown();
RequestFutureHolder.getInstance().shutdown(this);
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,18 @@ void updateFaultItem(final T name, final long currentLatency, final long notAvai
* @param detectInterval each broker's detecting interval
*/
void setDetectInterval(final int detectInterval);

/**
* Use it to set the detector work or not.
*
* @param startDetectorEnable set the detector's work status
*/
void setStartDetectorEnable(final boolean startDetectorEnable);

/**
* Use it to judge if the detector enabled.
*
* @return is the detector should be started.
*/
boolean isStartDetectorEnable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String>
private int detectTimeout = 200;
private int detectInterval = 2000;
private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

private volatile boolean startDetectorEnable = false;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Expand Down Expand Up @@ -80,7 +82,9 @@ public void startDetector() {
@Override
public void run() {
try {
detectByOneRound();
if (startDetectorEnable) {
detectByOneRound();
}
} catch (Exception e) {
log.warn("Unexpected exception raised while detecting service reachability", e);
}
Expand Down Expand Up @@ -137,6 +141,13 @@ public void remove(final String name) {
this.faultItemTable.remove(name);
}

public boolean isStartDetectorEnable() {
return startDetectorEnable;
}

public void setStartDetectorEnable(boolean startDetectorEnable) {
this.startDetectorEnable = startDetectorEnable;
}
@Override
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

public class MQFaultStrategy {
private LatencyFaultTolerance<String> latencyFaultTolerance;
private boolean sendLatencyFaultEnable;
private boolean startDetectorEnable;
private volatile boolean sendLatencyFaultEnable;
private volatile boolean startDetectorEnable;
private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L};

Expand Down Expand Up @@ -64,11 +64,11 @@ public void setLastBrokerName(String lastBrokerName) {


public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) {
this.setStartDetectorEnable(cc.isStartDetectorEnable());
this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector);
this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
this.setStartDetectorEnable(cc.isStartDetectorEnable());
this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
}

// For unit test.
Expand Down Expand Up @@ -123,21 +123,15 @@ public boolean isStartDetectorEnable() {

public void setStartDetectorEnable(boolean startDetectorEnable) {
this.startDetectorEnable = startDetectorEnable;
this.latencyFaultTolerance.setStartDetectorEnable(startDetectorEnable);
}

public void startDetector() {
// user should start the detector
// and the thread should not be in running state.
if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
// start the detector.
this.latencyFaultTolerance.startDetector();
}
this.latencyFaultTolerance.startDetector();
}

public void shutdown() {
if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
this.latencyFaultTolerance.shutdown();
}
this.latencyFaultTolerance.shutdown();
}

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class MessageQueueView {
private final MessageQueueSelector readSelector;
private final MessageQueueSelector writeSelector;
private final TopicRouteWrapper topicRouteWrapper;
private MQFaultStrategy mqFaultStrategy;

public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) {
this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic);
Expand Down Expand Up @@ -67,12 +66,4 @@ public String toString() {
.add("topicRouteWrapper", topicRouteWrapper)
.toString();
}

public MQFaultStrategy getMQFaultStrategy() {
return mqFaultStrategy;
}

public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
this.mqFaultStrategy = mqFaultStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public boolean detect(String endpoint, long timeoutMillis) {
@Override
public String resolve(String name) {
try {
String brokerAddr = getBrokerAddr(null, name);
String brokerAddr = getBrokerAddr(ProxyContext.createForInner("MQFaultStrategy"), name);
return brokerAddr;
} catch (Exception e) {
return null;
Expand Down Expand Up @@ -175,9 +175,17 @@ public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig)

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation,
boolean reachable) {
checkSendFaultToleranceEnable();
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable);
}

public void checkSendFaultToleranceEnable() {
boolean hotLatencySwitch = ConfigurationManager.getProxyConfig().isSendLatencyEnable();
boolean hotDetectorSwitch = ConfigurationManager.getProxyConfig().isStartDetectorEnable();
this.mqFaultStrategy.setSendLatencyFaultEnable(hotLatencySwitch);
this.mqFaultStrategy.setStartDetectorEnable(hotDetectorSwitch);
}

public MQFaultStrategy getMqFaultStrategy() {
return this.mqFaultStrategy;
}
Expand Down

0 comments on commit 6fd0073

Please sign in to comment.