Skip to content

Commit

Permalink
[ROCKETMQ-107] Fix possible concurrency problem on ServiceState when …
Browse files Browse the repository at this point in the history
…consumer start/shutdown, closes apache#68
  • Loading branch information
Jaskey authored and dongeforever committed Jun 6, 2017
1 parent 15c2b55 commit 3c6260a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final RPCHook rpcHook;
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
private ServiceState serviceState = ServiceState.CREATE_JUST;
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private OffsetStore offsetStore;
Expand Down Expand Up @@ -161,7 +161,8 @@ public PullResult pull(MessageQueue mq, String subExpression, long offset, int m
return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
}

private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();

Expand Down Expand Up @@ -365,7 +366,8 @@ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums
pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
}

public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
}
Expand Down Expand Up @@ -449,7 +451,8 @@ public DefaultMQPullConsumer getDefaultMQPullConsumer() {
return defaultMQPullConsumer;
}

public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
Expand Down Expand Up @@ -510,7 +513,7 @@ public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerN
}
}

public void shutdown() {
public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
Expand All @@ -528,7 +531,7 @@ public void shutdown() {
}
}

public void start() throws MQClientException {
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
Expand Down Expand Up @@ -593,6 +596,7 @@ public void start() throws MQClientException {
default:
break;
}

}

private void checkConfig() throws MQClientException {
Expand Down Expand Up @@ -662,7 +666,8 @@ public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientExc
this.offsetStore.updateOffset(mq, offset, false);
}

public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public MessageExt viewMessage(String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
}
Expand Down Expand Up @@ -692,6 +697,8 @@ public ServiceState getServiceState() {
return serviceState;
}

//Don't use this deprecated setter, which will be removed soon.
@Deprecated
public void setServiceState(ServiceState serviceState) {
this.serviceState = serviceState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final RPCHook rpcHook;
private ServiceState serviceState = ServiceState.CREATE_JUST;
private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private volatile boolean pause = false;
Expand Down Expand Up @@ -515,7 +515,7 @@ private int getMaxReconsumeTimes() {
}
}

public void shutdown() {
public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
Expand All @@ -535,7 +535,7 @@ public void shutdown() {
}
}

public void start() throws MQClientException {
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
Expand Down Expand Up @@ -615,9 +615,7 @@ public void start() throws MQClientException {
}

this.updateTopicSubscribeInfoWhenSubscriptionChanged();

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

this.mQClientFactory.rebalanceImmediately();
}

Expand Down Expand Up @@ -855,7 +853,8 @@ public void updateCorePoolSize(int corePoolSize) {
this.consumeMessageService.updateCorePoolSize(corePoolSize);
}

public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public MessageExt viewMessage(String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
}

Expand Down Expand Up @@ -1014,7 +1013,9 @@ public ServiceState getServiceState() {
return serviceState;
}

public void setServiceState(ServiceState serviceState) {
//Don't use this deprecated setter, which will be removed soon.
@Deprecated
public synchronized void setServiceState(ServiceState serviceState) {
this.serviceState = serviceState;
}

Expand Down

0 comments on commit 3c6260a

Please sign in to comment.