Skip to content

Commit

Permalink
[ROCKETMQ-324] Expose an interface for client to specify the async ca…
Browse files Browse the repository at this point in the history
…ll back executor
  • Loading branch information
zhouxinyu committed Dec 6, 2017
1 parent 8c30310 commit 37cf2a7
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,10 @@ public void endTransaction(
this.defaultMQProducer.getSendMsgTimeout());
}

public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
}

public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
Expand All @@ -34,6 +35,7 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;

/**
* This class is the entry point for applications intending to send messages.
Expand Down Expand Up @@ -630,6 +632,16 @@ public SendResult send(Collection<Message> msgs, MessageQueue messageQueue,
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
}

/**
* Sets an Executor to be used for executing callback methods.
* If the Executor is not set, {@link NettyRemotingClient#publicExecutor} will be used.
*
* @param callbackExecutor the instance of Executor
*/
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor);
}

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
Expand All @@ -39,6 +41,7 @@
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -195,6 +198,22 @@ public void run() {
}
}

@Test
public void testSetCallbackExecutor() throws MQClientException {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
producer = new DefaultMQProducer(producerGroupTemp);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

ExecutorService customized = Executors.newCachedThreadPool();
producer.setCallbackExecutor(customized);

NettyRemotingClient remotingClient = (NettyRemotingClient) producer.getDefaultMQProducerImpl()
.getmQClientFactory().getMQClientAPIImpl().getRemotingClient();

assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}

public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,7 @@ void invokeOneway(final String addr, final RemotingCommand request, final long t
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);

void setCallbackExecutor(final ExecutorService callbackExecutor);

boolean isChannelWritable(final String addr);
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final Lock lockNamesrvChannel = new ReentrantLock();

private final ExecutorService publicExecutor;

/**
* Invoke the callback methods in this executor when process response.
*/
private ExecutorService callbackExecutor;
private final ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
Expand Down Expand Up @@ -582,7 +587,12 @@ public RPCHook getRPCHook() {

@Override
public ExecutorService getCallbackExecutor() {
return this.publicExecutor;
return callbackExecutor != null ? callbackExecutor : publicExecutor;
}

@Override
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

static class ChannelWrapper {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;

import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(MockitoJUnitRunner.class)
public class NettyRemotingClientTest {
private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig());

@Test
public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {
Field field = NettyRemotingClient.class.getDeclaredField("publicExecutor");
field.setAccessible(true);
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(field.get(remotingClient));

ExecutorService customized = Executors.newCachedThreadPool();
remotingClient.setCallbackExecutor(customized);

assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}
}

0 comments on commit 37cf2a7

Please sign in to comment.