Skip to content

Commit

Permalink
[ISSUE apache#3579]:Fix spelling mistake in getter/setter method of …
Browse files Browse the repository at this point in the history
…mQClientFactory SSUE

Co-authored-by: wangfan <[email protected]>
  • Loading branch information
ferrirW and wangfan authored Dec 7, 2021
1 parent 4d49551 commit 5694db1
Show file tree
Hide file tree
Showing 18 changed files with 35 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public boolean sendMessageBack(final MessageExt msg) {
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getMqClientFactory().getDefaultMQProducer().send(newMsg);
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private void initRebalanceImpl() {
this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.rebalanceImpl.setMqClientFactory(this.mQClientFactory);
}

private void initPullAPIWrapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ public synchronized void start() throws MQClientException {
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.rebalanceImpl.setMqClientFactory(this.mQClientFactory);

this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ public synchronized void start() throws MQClientException {
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.rebalanceImpl.setMqClientFactory(this.mQClientFactory);

this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
Expand Down Expand Up @@ -1089,11 +1089,11 @@ public ConsumerRunningInfo consumerRunningInfo() {
return info;
}

public MQClientInstance getmQClientFactory() {
public MQClientInstance getMqClientFactory() {
return mQClientFactory;
}

public void setmQClientFactory(MQClientInstance mQClientFactory) {
public void setMqClientFactory(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,11 @@ public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocat
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}

public MQClientInstance getmQClientFactory() {
public MQClientInstance getMqClientFactory() {
return mQClientFactory;
}

public void setmQClientFactory(MQClientInstance mQClientFactory) {
public void setMqClientFactory(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<Messa
}

// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
this.getMqClientFactory().sendHeartbeatToAllBrokerWithLock();
}

@Override
Expand Down Expand Up @@ -114,7 +114,7 @@ private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {

if (pq.hasTempMessage()) {
log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
this.defaultMQPushConsumerImpl.getMqClientFactory().getScheduledExecutorService().schedule(new Runnable() {
@Override
public void run() {
log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ public void updateFaultItem(final String brokerName, final long currentLatency,
}

private void validateNameServerSetting() throws MQClientException {
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
List<String> nsList = this.getMqClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
Expand Down Expand Up @@ -896,7 +896,7 @@ private SendResult sendKernelImpl(final Message msg,
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

public MQClientInstance getmQClientFactory() {
public MQClientInstance getMqClientFactory() {
return mQClientFactory;
}

Expand Down Expand Up @@ -1568,16 +1568,16 @@ private void requestFail(final String correlationId) {

private void prepareSendRequest(final Message msg, long timeout) {
String correlationId = CorrelationIdUtil.createCorrelationId();
String requestClientId = this.getmQClientFactory().getClientId();
String requestClientId = this.getMqClientFactory().getClientId();
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, requestClientId);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));

boolean hasRouteData = this.getmQClientFactory().getTopicRouteTable().containsKey(msg.getTopic());
boolean hasRouteData = this.getMqClientFactory().getTopicRouteTable().containsKey(msg.getTopic());
if (!hasRouteData) {
long beginTimestamp = System.currentTimeMillis();
this.tryToFindTopicPublishInfo(msg.getTopic());
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
this.getMqClientFactory().sendHeartbeatToAllBrokerWithLock();
long cost = System.currentTimeMillis() - beginTimestamp;
if (cost > 500) {
log.warn("prepare send request for <{}> cost {} ms", msg.getTopic(), cost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer,
TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
producer.getMqClientFactory().updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void endTransaction(EndTransactionContext context) {
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setMsgType(MessageType.Trans_msg_Commit);
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getMqClientFactory().getClientId());
traceBean.setMsgId(context.getMsgId());
traceBean.setTransactionState(context.getTransactionState());
traceBean.setTransactionId(context.getTransactionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
pushConsumer.subscribe(topic, "*");
pushConsumer.start();

mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
mQClientFactory = spy(pushConsumerImpl.getMqClientFactory());
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
Expand All @@ -116,7 +116,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);

pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setMqClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);

when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,14 @@
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
Expand Down Expand Up @@ -105,7 +102,7 @@ public void init() throws Exception {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);

producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());

when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
Expand Down Expand Up @@ -361,7 +358,7 @@ public void testSetCallbackExecutor() throws MQClientException {
producer.setCallbackExecutor(customized);

NettyRemotingClient remotingClient = (NettyRemotingClient) producer.getDefaultMQProducerImpl()
.getmQClientFactory().getMQClientAPIImpl().getRemotingClient();
.getMqClientFactory().getMQClientAPIImpl().getRemotingClient();

assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

pushConsumer.start();

mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
mQClientTraceFactory = spy(pushConsumerImpl.getmQClientFactory());
mQClientFactory = spy(pushConsumerImpl.getMqClientFactory());
mQClientTraceFactory = spy(pushConsumerImpl.getMqClientFactory());

field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
Expand All @@ -178,7 +178,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);

pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setMqClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);

when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
Expand Down Expand Up @@ -214,7 +214,7 @@ public void terminate() {

@Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());

final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicReference<MessageExt> messageAtomic = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQAdminImpl;
Expand Down Expand Up @@ -219,7 +218,7 @@ private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsume
field.setAccessible(true);
field.set(litePullConsumerImpl, offsetStore);

traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());

when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void init() throws Exception {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);

producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());

when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
Expand All @@ -112,7 +112,7 @@ public void init() throws Exception {

@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.send(message);
assertThat(tracer.finishedSpans().size()).isEqualTo(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void init() throws Exception {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);

producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());

when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
Expand All @@ -121,7 +121,7 @@ public void init() throws Exception {

@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);

producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());

when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
Expand All @@ -130,7 +130,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {

@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.sendMessageInTransaction(message, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);

producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());

Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList");
fieldHooks.setAccessible(true);
Expand All @@ -141,7 +141,7 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {

@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
AtomicReference<EndTransactionContext> context = new AtomicReference<>();
doAnswer(mock -> {
Expand Down
Loading

0 comments on commit 5694db1

Please sign in to comment.