Skip to content

Commit

Permalink
[ROCKETMQ-51] Add unit tests for ClientManageProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouxinyu committed Jan 21, 2017
1 parent 6729967 commit b29c318
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;

import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class ClientManageProcessorTest {
private ClientManageProcessor clientManageProcessor;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
@Mock
private ChannelHandlerContext handlerContext;
@Mock
private Channel channel;

private ClientChannelInfo clientChannelInfo;
private String clientId = UUID.randomUUID().toString();
private String group = "FooBarGroup";
private String topic = "FooBar";

@Before
public void init() {
when(handlerContext.channel()).thenReturn(channel);
clientManageProcessor = new ClientManageProcessor(brokerController);
clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
brokerController.getProducerManager().registerProducer(group, clientChannelInfo);

ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(),
clientChannelInfo,
consumerData.getConsumeType(),
consumerData.getMessageModel(),
consumerData.getConsumeFromWhere(),
consumerData.getSubscriptionDataSet(),
false);
}

@Test
public void processRequest_UnRegisterProducer() throws Exception {
brokerController.getProducerManager().registerProducer(group, clientChannelInfo);
HashMap<Channel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo);

RemotingCommand request = createUnRegisterProducerCommand();
RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);

channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
assertThat(channelMap).isNull();
}

@Test
public void processRequest_UnRegisterConsumer() throws RemotingCommandException {
ConsumerGroupInfo consumerGroupInfo = brokerController.getConsumerManager().getConsumerGroupInfo(group);
assertThat(consumerGroupInfo).isNotNull();

RemotingCommand request = createUnRegisterConsumerCommand();
RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);

consumerGroupInfo = brokerController.getConsumerManager().getConsumerGroupInfo(group);
assertThat(consumerGroupInfo).isNull();
}

private RemotingCommand createUnRegisterProducerCommand() {
UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
requestHeader.setClientID(clientId);
requestHeader.setProducerGroup(group);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
request.setLanguage(LanguageCode.JAVA);
request.setVersion(100);
request.makeCustomHeaderToNet();
return request;
}

private RemotingCommand createUnRegisterConsumerCommand() {
UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
requestHeader.setClientID(clientId);
requestHeader.setConsumerGroup(group);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
request.setLanguage(LanguageCode.JAVA);
request.setVersion(100);
request.makeCustomHeaderToNet();
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
Expand Down Expand Up @@ -78,7 +82,7 @@ public void init() {
when(handlerContext.channel()).thenReturn(mockChannel);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig());
clientChannelInfo = new ClientChannelInfo(mockChannel);
ConsumerData consumerData = createConsumerData();
ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(),
clientChannelInfo,
Expand Down Expand Up @@ -130,6 +134,36 @@ public void testProcessRequest_Found() throws RemotingCommandException {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testProcessRequest_FoundWithHook() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
final ConsumeMessageContext[] messageContext = new ConsumeMessageContext[1];
ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() {
@Override public String hookName() {
return "TestHook";
}

@Override public void consumeMessageBefore(ConsumeMessageContext context) {
messageContext[0] = context;
}

@Override public void consumeMessageAfter(ConsumeMessageContext context) {
}
};
consumeMessageHookList.add(consumeMessageHook);
pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(messageContext[0]).isNotNull();
assertThat(messageContext[0].getConsumerGroup()).isEqualTo(group);
assertThat(messageContext[0].getTopic()).isEqualTo(topic);
assertThat(messageContext[0].getQueueId()).isEqualTo(1);
}

@Test
public void testProcessRequest_MsgWasRemoving() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
Expand Down Expand Up @@ -170,7 +204,7 @@ private RemotingCommand createPullMsgCommand(int requestCode) {
return request;
}

private ConsumerData createConsumerData() {
static ConsumerData createConsumerData(String group, String topic) {
ConsumerData consumerData = new ConsumerData();
consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
Expand Down Expand Up @@ -64,6 +68,9 @@ public class SendMessageProcessorTest {
@Mock
private MessageStore messageStore;

private String topic = "FooBar";
private String group = "FooBarGroup";

@Before
public void init() {
brokerController.setMessageStore(messageStore);
Expand All @@ -72,6 +79,7 @@ public void init() {
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel);
when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
sendMessageProcessor = new SendMessageProcessor(brokerController);
}

@Test
Expand All @@ -80,6 +88,33 @@ public void testProcessRequest() throws RemotingCommandException {
assertPutResult(ResponseCode.SUCCESS);
}

@Test
public void testProcessRequest_WithHook() throws RemotingCommandException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
List<SendMessageHook> sendMessageHookList = new ArrayList<>();
final SendMessageContext[] sendMessageContext = new SendMessageContext[1];
SendMessageHook sendMessageHook = new SendMessageHook() {
@Override public String hookName() {
return null;
}

@Override public void sendMessageBefore(SendMessageContext context) {
sendMessageContext[0] = context;
}

@Override public void sendMessageAfter(SendMessageContext context) {

}
};
sendMessageHookList.add(sendMessageHook);
sendMessageProcessor.registerSendMessageHook(sendMessageHookList);
assertPutResult(ResponseCode.SUCCESS);
System.out.println(sendMessageContext[0]);
assertThat(sendMessageContext[0]).isNotNull();
assertThat(sendMessageContext[0].getTopic()).isEqualTo(topic);
assertThat(sendMessageContext[0].getProducerGroup()).isEqualTo(group);
}


@Test
public void testProcessRequest_FlushTimeOut() throws RemotingCommandException {
Expand Down Expand Up @@ -142,8 +177,8 @@ public void testProcessRequest_WithMsgBack() throws RemotingCommandException {

private RemotingCommand createSendMsgCommand(int requestCode) {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup("FooBar_PID");
requestHeader.setTopic("FooBar");
requestHeader.setProducerGroup(group);
requestHeader.setTopic(topic);
requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
requestHeader.setDefaultTopicQueueNums(3);
requestHeader.setQueueId(1);
Expand All @@ -153,6 +188,7 @@ private RemotingCommand createSendMsgCommand(int requestCode) {
requestHeader.setReconsumeTimes(0);

RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.setBody(new byte[] {'a'});
request.makeCustomHeaderToNet();
return request;
}
Expand All @@ -162,7 +198,7 @@ private RemotingCommand createSendMsgBackCommand(int requestCode) {

requestHeader.setMaxReconsumeTimes(3);
requestHeader.setDelayLevel(4);
requestHeader.setGroup("FooBar_PID");
requestHeader.setGroup(group);
requestHeader.setOffset(123L);

RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
Expand All @@ -179,8 +215,6 @@ private void assertPutResult(int responseCode) throws RemotingCommandException {
return null;
}
}).when(handlerContext).writeAndFlush(any(Object.class));

sendMessageProcessor = new SendMessageProcessor(brokerController);
RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
if (responseToReturn != null) {
assertThat(response[0]).isNull();
Expand Down

0 comments on commit b29c318

Please sign in to comment.