Skip to content

Commit

Permalink
[ISSUE apache#4832] Remove innerProducer and innerConsumer in EscapeB…
Browse files Browse the repository at this point in the history
…ridge (apache#4834)

* remove innerProducer and innerConsumer in Escape
 apache#4832

* format code to pass code-style check

* Move topic route management logic to TopicRouteInfoManager

* Use ThreadPoolExecutor instead of creating a new ForkJoinPool

* remove retry logic

* remove unnecessary name server list

* remove unnecessary invokeId

* remove AssignmentManager and make maintenance of topic subscribe data lazy-initialized

* simply code

* remove unnecessary code
  • Loading branch information
caigy authored Aug 24, 2022
1 parent cf3c1ef commit 10d2918
Show file tree
Hide file tree
Showing 10 changed files with 597 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
Expand Down Expand Up @@ -95,6 +94,7 @@
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
Expand Down Expand Up @@ -171,7 +171,6 @@ public class BrokerController {
protected final ConsumerOrderInfoManager consumerOrderInfoManager;
protected final ProducerManager producerManager;
protected final ScheduleMessageService scheduleMessageService;
protected final AssignmentManager assignmentManager;
protected final ClientHousekeepingService clientHousekeepingService;
protected final PullMessageProcessor pullMessageProcessor;
protected final PeekMessageProcessor peekMessageProcessor;
Expand All @@ -191,6 +190,7 @@ public class BrokerController {
protected final ConsumerIdsChangeListener consumerIdsChangeListener;
protected final EndTransactionProcessor endTransactionProcessor;
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
private final TopicRouteInfoManager topicRouteInfoManager;
protected BrokerOuterAPI brokerOuterAPI;
protected ScheduledExecutorService scheduledExecutorService;
protected ScheduledExecutorService syncBrokerMemberGroupExecutorService;
Expand Down Expand Up @@ -322,7 +322,6 @@ public BrokerController(

this.filterServerManager = new FilterServerManager(this);

this.assignmentManager = new AssignmentManager(this);
this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
this.clientManageProcessor = new ClientManageProcessor(this);
this.slaveSynchronize = new SlaveSynchronize(this);
Expand Down Expand Up @@ -386,6 +385,8 @@ public boolean online(String instanceId, String group, String topic) {

this.escapeBridge = new EscapeBridge(this);

this.topicRouteInfoManager = new TopicRouteInfoManager(this);

if (this.brokerConfig.isEnableSlaveActingMaster() && !this.brokerConfig.isSkipPreOnline()) {
this.brokerPreOnlineService = new BrokerPreOnlineService(this);
}
Expand Down Expand Up @@ -1238,10 +1239,6 @@ protected void shutdownBasicService() {
this.ackMessageProcessor.shutdownPopReviveService();
}

if (this.assignmentManager != null) {
this.assignmentManager.shutdown();
}

if (this.notificationProcessor != null) {
this.notificationProcessor.shutdown();
}
Expand Down Expand Up @@ -1353,6 +1350,10 @@ protected void shutdownBasicService() {
escapeBridge.shutdown();
}

if (this.topicRouteInfoManager != null) {
this.topicRouteInfoManager.shutdown();
}

if (this.brokerPreOnlineService != null && !this.brokerPreOnlineService.isStopped()) {
this.brokerPreOnlineService.shutdown();
}
Expand Down Expand Up @@ -1448,10 +1449,6 @@ protected void startBasicService() throws Exception {
this.ackMessageProcessor.startPopReviveService();
}

if (this.assignmentManager != null) {
this.assignmentManager.start();
}

if (this.topicQueueMappingCleanService != null) {
this.topicQueueMappingCleanService.start();
}
Expand Down Expand Up @@ -1484,6 +1481,10 @@ protected void startBasicService() throws Exception {
this.escapeBridge.start();
}

if (this.topicRouteInfoManager != null) {
this.topicRouteInfoManager.start();
}

if (this.brokerPreOnlineService != null) {
this.brokerPreOnlineService.start();
}
Expand Down Expand Up @@ -1779,16 +1780,6 @@ private boolean needRegister(final String clusterName,
return needRegister;
}

public String getNameServerList() {
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
return this.brokerConfig.getNamesrvAddr();
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
return this.brokerOuterAPI.fetchNameServerAddr();
}
return null;
}

public void startService(long minBrokerId, String minBrokerAddr) {
BrokerController.LOG.info("{} start service, min broker id is {}, min broker addr: {}",
this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
Expand Down Expand Up @@ -2153,14 +2144,6 @@ public ExecutorService getSendMessageExecutor() {
return sendMessageExecutor;
}

public AssignmentManager getAssignmentManager() {
return assignmentManager;
}

public ClientManageProcessor getClientManageProcessor() {
return clientManageProcessor;
}

public SendMessageProcessor getSendMessageProcessor() {
return sendMessageProcessor;
}
Expand Down Expand Up @@ -2253,4 +2236,8 @@ public TimerCheckpoint getTimerCheckpoint() {
return timerCheckpoint;
}

public TopicRouteInfoManager getTopicRouteInfoManager() {
return this.topicRouteInfoManager;
}

}
Loading

0 comments on commit 10d2918

Please sign in to comment.