Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouxinyu committed Aug 29, 2017
2 parents a8333a7 + 08a0c40 commit 1b853e8
Show file tree
Hide file tree
Showing 259 changed files with 2,393 additions and 2,819 deletions.
13 changes: 0 additions & 13 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,13 @@ matrix:
# On Linux, run with specific JDKs only.
- os: linux
env: CUSTOM_JDK="oraclejdk8"
- os: linux
env: CUSTOM_JDK="oraclejdk7"
- os: linux
env: CUSTOM_JDK="openjdk7"

before_install:
- echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
- cat ~/.mavenrc
- if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
- if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi

#os:
# - linux
# - osx
#jdk:
# - oraclejdk8
# - oraclejdk7
# - openjdk7


script:
- travis_retry mvn -B clean apache-rat:check
- travis_retry mvn -B package jacoco:report coveralls:report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ public class BrokerController {
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;

public BrokerController(//
final BrokerConfig brokerConfig, //
final NettyServerConfig nettyServerConfig, //
final NettyClientConfig nettyClientConfig, //
final MessageStoreConfig messageStoreConfig //
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
Expand Down Expand Up @@ -255,7 +255,6 @@ public boolean initialize() throws CloneNotSupportedException {

this.registerProcessor();

// TODO remove in future
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.File;


public class BrokerPathConfigHelper {
private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "config" + File.separator + "broker.properties";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ public static BrokerController createBrokerController(String[] args) {
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);

final BrokerController controller = new BrokerController(//
brokerConfig, //
nettyServerConfig, //
nettyClientConfig, //
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
return r1 || r2;
}

public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) {
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public boolean tryLock(final String group, final MessageQueue mq, final String c
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
group, //
clientId, //
log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",
group,
clientId,
mq);
}

Expand All @@ -69,19 +69,19 @@ public boolean tryLock(final String group, final MessageQueue mq, final String c
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
"tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
group, //
oldClientId, //
clientId, //
"tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
return true;
}

log.warn(
"tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
group, //
oldClientId, //
clientId, //
"tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
return false;
} finally {
Expand Down Expand Up @@ -144,9 +144,9 @@ public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info(
"tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
group, //
clientId, //
"tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
group,
clientId,
mq);
}

Expand All @@ -162,20 +162,20 @@ public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
"tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
group, //
oldClientId, //
clientId, //
"tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
lockedMqs.add(mq);
continue;
}

log.warn(
"tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
group, //
oldClientId, //
clientId, //
"tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ public long howLongAfterDeath() {

/**
* Check this filter data has been used to calculate bit map when msg was stored in server.
*
* @param msgStoreTime
* @return
*/
public boolean isMsgInLive(long msgStoreTime) {
return msgStoreTime > getBornTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,11 @@ public ConsumerFilterManager(BrokerController brokerController) {
/**
* Build consumer filter data.Be care, bloom filter data is not included.
*
* @param topic
* @param consumerGroup
* @param expression
* @param type
* @param clientVersion
* @return maybe null
*/
public static ConsumerFilterData build(final String topic, final String consumerGroup,
final String expression, final String type,
final long clientVersion) {
final String expression, final String type,
final long clientVersion) {
if (ExpressionType.isTagType(type)) {
return null;
}
Expand Down Expand Up @@ -140,7 +135,7 @@ public void register(final String consumerGroup, final Collection<SubscriptionDa
}

public boolean register(final String topic, final String consumerGroup, final String expression,
final String type, final long clientVersion) {
final String type, final long clientVersion) {
if (ExpressionType.isTagType(type)) {
return false;
}
Expand Down Expand Up @@ -357,7 +352,8 @@ public void unRegister(String consumerGroup) {
data.setDeadTime(now);
}

public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData, long clientVersion) {
public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,
long clientVersion) {
ConsumerFilterData old = this.groupFilterData.get(consumerGroup);

if (old == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.rocketmq.broker.filter;


import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageConst;
Expand All @@ -32,7 +31,8 @@
* <br>It will decode properties first in order to get real topic.
*/
public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, ConsumerFilterManager consumerFilterManager) {
public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
ConsumerFilterManager consumerFilterManager) {
super(subscriptionData, consumerFilterData, consumerFilterManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ExpressionMessageFilter implements MessageFilter {
protected final boolean bloomDataValid;

public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
ConsumerFilterManager consumerFilterManager) {
ConsumerFilterManager consumerFilterManager) {
this.subscriptionData = subscriptionData;
this.consumerFilterData = consumerFilterData;
this.consumerFilterManager = consumerFilterManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ public void registerFilterServer(final Channel channel, final String filterServe
}
}

/**
*/
public void scanNotActiveChannel() {

Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private void cleanExpiredRequest() {
}
}
}

public void shutdown() {
this.scheduledExecutorService.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,28 @@
import java.util.concurrent.TimeUnit;

public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory,
final RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHol

@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
msgStoreTime, filterBitMap, properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void notifyMessageArriving(final String topic, final int queueId, final l
}

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public Set<String> whichGroupByTopic(final String topic) {
return groups;
}

public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ public void unregisterBroker(
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException,
public TopicConfigSerializeWrapper getAllTopicConfig(
final String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);

Expand All @@ -248,7 +249,8 @@ public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws R
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException,
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
final String addr) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
Expand All @@ -264,7 +266,8 @@ public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) th
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
public String getAllDelayOffset(
final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
Expand All @@ -280,7 +283,8 @@ public String getAllDelayOffset(final String addr) throws InterruptedException,
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException,
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
final String addr) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
private final ByteBuffer byteBufferHeader;
private final GetMessageResult getMessageResult;

/** Bytes which were transferred already. */
/**
* Bytes which were transferred already.
*/
private long transferred;

public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
private final ByteBuffer byteBufferHeader;
private final SelectMappedBufferResult selectMappedBufferResult;

/** Bytes which were transferred already. */
/**
* Bytes which were transferred already.
*/
private long transferred;

public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
private final ByteBuffer byteBufferHeader;
private final QueryMessageResult queryMessageResult;

/** Bytes which were transferred already. */
/**
* Bytes which were transferred already.
*/
private long transferred;

public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) {
Expand Down
Loading

0 comments on commit 1b853e8

Please sign in to comment.