Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/qunarcorp/qmq
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyijq committed Feb 17, 2019
2 parents 432a0b5 + 5d82f7e commit 82048b8
Show file tree
Hide file tree
Showing 32 changed files with 332 additions and 22 deletions.
2 changes: 1 addition & 1 deletion docs/cn/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ min.group.num=2
*valid-api-tokens.properties*
```
# metaserver的管理工具token列表,用于控制权限。下面的tools.sh工具使用时需要该token
# 每行一个token
# 每行一个token,等号左边是token,在命令中使用;等号右边是描述,只起提示作用,无实际用途
<token 1>=<token 1 desc>
<token 2>=<token 2 desc>
```
Expand Down
6 changes: 6 additions & 0 deletions docs/cn/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ $ tools.sh MarkReadonly --metaserver=<metaserver address> --token=<token> --brok
$ tools.sh UnMarkReadonly --metaserver=<metaserver address> --token=<token> --brokerGroup=<groupName>
```

## reset offset
调整指定消费组的消费进度

```
$ tools.sh ResetOffset --metaserver=<metaserver address> --token=<token> --subject=<subject> --group=<consumer group> --action=<
```

[上一页](debug.md)
[回目录](../../README.md)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
<packaging>pom</packaging>

<name>qmq</name>
Expand Down
2 changes: 1 addition & 1 deletion qmq-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-api</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-client</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-common</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-delay-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-delay-server</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-demo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion qmq-deploy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion qmq-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-dist</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-gateway</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-metaserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-metaserver</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2018 Qunar, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package qunar.tc.qmq.meta.management;

import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.meta.BrokerGroup;
import qunar.tc.qmq.meta.model.SubjectRoute;
import qunar.tc.qmq.meta.store.Store;
import qunar.tc.qmq.netty.NettyClientConfig;
import qunar.tc.qmq.netty.client.NettyClient;
import qunar.tc.qmq.protocol.CommandCode;
import qunar.tc.qmq.protocol.Datagram;
import qunar.tc.qmq.protocol.RemotingHeader;
import qunar.tc.qmq.utils.PayloadHolderUtils;
import qunar.tc.qmq.utils.RetrySubjectUtils;

import javax.servlet.http.HttpServletRequest;

public class ResetOffsetAction implements MetaManagementAction {
private static final Logger LOGGER = LoggerFactory.getLogger(ResetOffsetAction.class);

private final Store store;
private final NettyClient client;

public ResetOffsetAction(Store store) {
this.client = NettyClient.getClient();
this.client.start(new NettyClientConfig());
this.store = store;
}

@Override
public ActionResult<String> handleAction(HttpServletRequest req) {
String subject = req.getParameter("subject");
String consumerGroup = req.getParameter("group");
int action = Integer.valueOf(req.getParameter("code"));

if (Strings.isNullOrEmpty(subject) || Strings.isNullOrEmpty(consumerGroup)) {
return ActionResult.error("subject and consumerGroup required");
}

if (action != 1 && action != 2) {
return ActionResult.error("action must 1 or 2, LATEST=1, EARLIEST=2");
}

final SubjectRoute subjectRoute = store.selectSubjectRoute(RetrySubjectUtils.getRealSubject(subject));
if (subjectRoute == null) {
return ActionResult.error("find no route");
}

Datagram datagram = buildResetOffsetDatagram(subject, consumerGroup, action);
for (final String brokerGroupName : subjectRoute.getBrokerGroups()) {
try {
final BrokerGroup brokerGroup = store.getBrokerGroup(brokerGroupName);
client.sendSync(brokerGroup.getMaster(), datagram, 2000);
} catch (Throwable e) {
LOGGER.error("send consume manage request error, brokerGroupName={}", brokerGroupName, e);
return ActionResult.error("reset failed: brokerGroupName=" + brokerGroupName);
}
}

return ActionResult.ok("success");
}

private Datagram buildResetOffsetDatagram(final String subject, final String consumerGroup, int code) {
final Datagram datagram = new Datagram();
final RemotingHeader header = new RemotingHeader();
header.setCode(CommandCode.CONSUME_MANAGE);
datagram.setHeader(header);
datagram.setPayloadHolder(out -> {
PayloadHolderUtils.writeString(subject, out);
PayloadHolderUtils.writeString(consumerGroup, out);
out.writeInt(code);
});
return datagram;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public ReadonlyBrokerGroupManager(final CachedMetaInfoManager cachedMetaInfoMana

public List<BrokerGroup> disableReadonlyBrokerGroup(String realSubject, int clientTypeCode, List<BrokerGroup> brokerGroups) {
if (clientTypeCode != ClientType.PRODUCER.getCode()
|| clientTypeCode != ClientType.DELAY_PRODUCER.getCode()) {
&& clientTypeCode != ClientType.DELAY_PRODUCER.getCode()) {
return brokerGroups;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void start(ServletContext context) {
actions.register("AddDb", new TokenVerificationAction(new RegisterClientDbAction(clientDbConfigurationStore)));
actions.register("MarkReadonlyBrokerGroup", new TokenVerificationAction(new MarkReadonlyBrokerGroupAction(readonlyBrokerGroupSettingService)));
actions.register("UnMarkReadonlyBrokerGroup", new TokenVerificationAction(new UnMarkReadonlyBrokerGroupAction(readonlyBrokerGroupSettingService)));
actions.register("ResetOffset", new TokenVerificationAction(new ResetOffsetAction(store)));


resources.add(cachedMetaInfoManager);
Expand Down
2 changes: 1 addition & 1 deletion qmq-metrics-prometheus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-metrics-prometheus</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-remoting/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-remoting</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-server-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-server-common</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2018 Qunar, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package qunar.tc.qmq.processor;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.base.ConsumeManageRequest;
import qunar.tc.qmq.protocol.CommandCode;
import qunar.tc.qmq.protocol.Datagram;
import qunar.tc.qmq.protocol.RemotingCommand;
import qunar.tc.qmq.store.Storage;
import qunar.tc.qmq.util.RemotingBuilder;
import qunar.tc.qmq.utils.PayloadHolderUtils;

import java.util.concurrent.CompletableFuture;

/**
* @author yunfeng.yang
* @since 2017/11/22
*/
public class ConsumerManageProcessor extends AbstractRequestProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerManageProcessor.class);

private final Storage store;

public ConsumerManageProcessor(Storage store) {
this.store = store;
}

@Override
public CompletableFuture<Datagram> processRequest(ChannelHandlerContext ctx, RemotingCommand command) {
final ConsumeManageRequest request = deserialize(command.getBody());
LOG.info("receive consumer manager request:{}", request);

if (!checkRequest(request)) {
final Datagram datagram = RemotingBuilder.buildEmptyResponseDatagram(CommandCode.PARAM_ERROR, command.getHeader());
ctx.writeAndFlush(datagram);
return null;
}
store.updateConsumeQueue(request.getSubject(), request.getGroup(), request.getConsumerFromWhere());

final Datagram datagram = RemotingBuilder.buildEmptyResponseDatagram(CommandCode.SUCCESS, command.getHeader());
datagram.getHeader().setVersion(command.getHeader().getVersion());
ctx.writeAndFlush(datagram);
return null;
}

private boolean checkRequest(ConsumeManageRequest request) {
return request != null && request.getSubject() != null && request.getGroup() != null;
}

private ConsumeManageRequest deserialize(ByteBuf buf) {
String subject = PayloadHolderUtils.readString(buf);
String consumerGroup = PayloadHolderUtils.readString(buf);
int code = buf.readInt();
ConsumeManageRequest request = new ConsumeManageRequest();
request.setSubject(subject);
request.setGroup(consumerGroup);
request.setConsumerFromWhere(code);
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,13 @@ private void startServerHandlers() {
this.storage.registerEventListener(ConsumerLogWroteEvent.class, pullMessageProcessor);
final SendMessageProcessor sendMessageProcessor = new SendMessageProcessor(sendMessageWorker);
final AckMessageProcessor ackMessageProcessor = new AckMessageProcessor(actorSystem, consumerSequenceManager, subscriberStatusChecker);
final ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(storage);

this.nettyServer = new NettyServer("broker", Runtime.getRuntime().availableProcessors(), listenPort, new BrokerConnectionEventHandler());
this.nettyServer.registerProcessor(CommandCode.SEND_MESSAGE, sendMessageProcessor, sendMessageExecutorService);
this.nettyServer.registerProcessor(CommandCode.PULL_MESSAGE, pullMessageProcessor);
this.nettyServer.registerProcessor(CommandCode.ACK_REQUEST, ackMessageProcessor);
this.nettyServer.registerProcessor(CommandCode.CONSUME_MANAGE, consumerManageProcessor);
this.nettyServer.start();
}

Expand Down
2 changes: 1 addition & 1 deletion qmq-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.3.3</version>
<version>1.1.3.5</version>
</parent>

<artifactId>qmq-store</artifactId>
Expand Down
Loading

0 comments on commit 82048b8

Please sign in to comment.