Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
lollipopjin committed Dec 23, 2016
2 parents 0014a4c + 4b446d7 commit c92332e
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 200 deletions.
8 changes: 1 addition & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<executions>
Expand Down Expand Up @@ -348,12 +347,7 @@
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
</resource>
</resources>
<!-- We are not suppose to setup the customer resources here-->
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.rocketmq.broker;

import com.alibaba.rocketmq.common.BrokerConfig;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;

/**
* @author shtykh_roman
*/
public class BrokerControllerTest {
private static final int RESTART_NUM = 3;

/**
* Tests if the controller can be properly stopped and started.
*
* @throws Exception If fails.
*/
@Test
public void testRestart() throws Exception {

for (int i = 0; i < RESTART_NUM; i++) {
BrokerController brokerController = new BrokerController(//
new BrokerConfig(), //
new NettyServerConfig(), //
new NettyClientConfig(), //
new MessageStoreConfig());
boolean initResult = brokerController.initialize();
System.out.println("initialize " + initResult);
brokerController.start();

brokerController.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,19 @@

package com.alibaba.rocketmq.client;

import com.alibaba.rocketmq.client.exception.MQClientException;
import org.junit.Assert;
import org.junit.Test;


public class ValidatorsTest {

@Test
public void topicValidatorTest() {
try {
Validators.checkTopic("Hello");
Validators.checkTopic("%RETRY%Hello");
Validators.checkTopic("_%RETRY%Hello");
Validators.checkTopic("-%RETRY%Hello");
Validators.checkTopic("223-%RETRY%Hello");
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(false);
}
public void topicValidatorTest() throws MQClientException {
Validators.checkTopic("Hello");
Validators.checkTopic("%RETRY%Hello");
Validators.checkTopic("_%RETRY%Hello");
Validators.checkTopic("-%RETRY%Hello");
Validators.checkTopic("223-%RETRY%Hello");
}
}
18 changes: 14 additions & 4 deletions rocketmq-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,28 @@
<name>rocketmq-store ${project.version}</name>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public AtomicInteger getConnectionCount() {
// this.groupTransferService.notifyTransferSome();
// }

public void start() {
public void start() throws Exception {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
Expand Down Expand Up @@ -181,20 +181,26 @@ public AcceptSocketService(final int port) {
}


public void beginAccept() {
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

@Override
public void shutdown(final boolean interrupt) {
super.shutdown(interrupt);
try {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
log.error("beginAccept exception", e);
serverSocketChannel.close();
}
catch (IOException e) {
log.error("AcceptSocketService shutdown exception", e);
}
}


@Override
public void run() {
log.info(this.getServiceName() + " service started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package com.alibaba.rocketmq.store;

import com.alibaba.rocketmq.common.BrokerConfig;
import com.alibaba.rocketmq.store.config.FlushDiskType;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand All @@ -35,6 +38,8 @@
* @author shijia.wxr
*/
public class DefaultMessageStoreTest {
private static final Logger logger = LoggerFactory.getLogger(DefaultMessageStoreTest.class);

private static final String StoreMessage = "Once, there was a chance for me!";

private static int QUEUE_TOTAL = 100;
Expand All @@ -59,7 +64,7 @@ public static void tearDownAfterClass() throws Exception {

@Test
public void test_write_read() throws Exception {
System.out.println("================================================================");
logger.debug("================================================================");
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
Expand All @@ -69,34 +74,32 @@ public void test_write_read() throws Exception {
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());

boolean load = master.load();
assertTrue(load);

master.start();
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
}

for (long i = 0; i < totalMsgs; i++) {
try {
try {
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
}

for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
if (result == null) {
System.out.println("result == null " + i);
logger.debug("result == null " + i);
}
assertTrue(result != null);
result.release();
System.out.println("read " + i + " OK");
} catch (Exception e) {
e.printStackTrace();
logger.debug("read " + i + " OK");
}

} finally {
master.shutdown();
master.destroy();
}
master.shutdown();
master.destroy();
System.out.println("================================================================");
logger.debug("================================================================");
}

public MessageExtBrokerInner buildMessage() {
Expand All @@ -116,39 +119,46 @@ public MessageExtBrokerInner buildMessage() {

@Test
public void test_group_commit() throws Exception {
System.out.println("================================================================");
logger.debug("================================================================");
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
boolean load = master.load();
assertTrue(load);

master.start();
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
}

for (long i = 0; i < totalMsgs; i++) {
try {
try {
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
}

for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
if (result == null) {
System.out.println("result == null " + i);
logger.debug("result == null " + i);
}
assertTrue(result != null);
result.release();
System.out.println("read " + i + " OK");
} catch (Exception e) {
e.printStackTrace();
logger.debug("read " + i + " OK");

}

} finally {
master.shutdown();
master.destroy();
}
logger.debug("================================================================");
}

private class MyMessageArrivingListener implements MessageArrivingListener {

@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
// Do nothing here
}
master.shutdown();
master.destroy();
System.out.println("================================================================");
}
}
Loading

0 comments on commit c92332e

Please sign in to comment.