Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
dandan.sha committed Dec 7, 2018
1 parent daa93a6 commit f8814a3
Show file tree
Hide file tree
Showing 589 changed files with 47,697 additions and 0 deletions.
37 changes: 37 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# kdiff3 ignore
*.orig

# maven ignore
target/

# eclipse ignore
.settings/
.project
.classpath

# idea ignore
.idea/
*.ipr
*.iml
*.iws

# temp ignore
*.log
*.cache
*.diff
*.patch
*.tmp

# system ignore
.DS_Store
Thumbs.db

# package ignore (optional)
# *.jar
# *.war
# *.zip
# *.tar
# *.tar.gz
catalina.base_IS_UNDEFINED/

qconfig_test
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
package:
mvn -U clean package -Dmaven.test.skip=true -DskipTests -am -pl qmq-dist
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# QMQ

QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景;
也包括报价搜索等高吞吐量场景。目前在公司内部日常消息qps在60W左右,生产上承载将近4W+消息topic,消息的端到端延迟可以控制在10ms以内。

主要提供以下特性:
* 异步实时消息
* 延迟/定时消息
* 基于Tag的服务端过滤
* Consumer端幂等处理支持
* Consumer端filter
* 死信消息
* 结合Spring annotation使用的简单API
* 提供丰富的监控指标
* 接入OpenTracing
* 分布式事务(即将开源)
* 消息投递轨迹(即将开源)
* 历史消息的自动备份(即将开源)

# 快速开始
你可以通过[设计背景](docs/cn/design.md)了解设计QMQ的初衷和她与其他消息队列的不同。
阅读[架构概览](docs/cn/arch.md)了解QMQ的存储模型

## 文档
* [快速入门](docs/cn/quickstart.md)
* [安装](docs/cn/install.md)
* [设计背景](docs/cn/design.md)
* [架构概览](docs/cn/arch.md)
* [代码模块介绍](docs/cn/code.md)
* [高可用](docs/cn/ha.md)
* [监控](docs/cn/monitor.md)
* [Trace](docs/cn/trace.md)
* [发送消息](docs/cn/producer.md)
* [消费消息](docs/cn/consumer.md)
* [延时/定时消息](docs/cn/delay.md)
* [服务端tag过滤](docs/cn/tag.md)
* [开源协议](docs/cn/opensource.md)
* [技术支持](docs/cn/support.md)

# 技术支持

### QQ群
![QQ](docs/images/support1.png)
67 changes: 67 additions & 0 deletions docs/cn/arch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
[上一页](design.md)
[回目录](../../readme.md)
[下一页](code.md)

# 架构概览
下图是QMQ中各组件及其交互图:
* meta server提供集群管理和集群发现的作用
* server 提供实时消息服务
* delay server 提供延时/定时消息服务,延时消息先在delay server排队,时间到之后再发送给server
* producer 消息生产者
* consumer 消息消费者

![架构图](../images/arch1.png)

根据图中的编号描述一下其交互过程
1. delay server 向meta server注册
2. 实时server 向meta server注册
3. producer在发送消息前需要询问meta server获取server list
4. meta server返回server list给producer(根据producer请求的消息类型返回不同的server list)
5. producer发送延时/定时消息
6. 延时时间已到,delay server将消息投递给实时server
7. producer发送实时消息
8. consumer需要拉取消息,在拉取之前向meta server获取server list(只会获取实时server的list)
9. meta server返回server list给consumer
10. consumer向实时server发起pull请求
11. 实时server将消息返回给consumer

下面分别对实时消息Server和延时/定时消息Server的存储模型进行描述

## 实时消息
在设计背景里,已经描述了QMQ没有采用基于partition存储模型,但是在学习Kafka和RocketMQ的存储实现方式后,有很多地方是值得借鉴的:
* 顺序append文件,提供很好的性能
* 顺序消费文件,使用offset表示消费进度,成本极低
* 将所有subject的消息合并在一起,减少parition数量,可以提供更多的subject(RocketMQ)

在演化QMQ的存储模型时,觉得这几点是非常重要的。那如何在不实用基于partition的情况下,又能得到这些特性呢?正所谓有前辈大师说:计算机中所有问题都可以通过添加一个中间层来解决,一个中间层解决不了那就添加两个。

我们通过添加一层拉取的log(pull log)来动态映射consumer与partition的逻辑关系,这样不仅解决了consumer的动态扩容缩容问题,还可以继续使用一个offset表示消费进度。

下图是QMQ的存储模型

![img](../images/arch3.png)

先解释一下上图中的数字的意义。上图中方框上方的数字,表示该方框在自己log中的偏移,而方框内的数字是该项的内容。比如message log方框上方的数字:3,6,9几表示这几条消息在message log中的偏移。而consume log中方框内的数字3,6,9,20正对应着messzge log的偏移,表示这几个位置上的消息都是subject1的消息,consume log方框上方的1,2,3,4表示这几个方框在consume log中的逻辑偏移。下面的pull log方框内的内容对应着consume log的逻辑偏移,而pull log方框外的数字表示pull log的逻辑偏移。

在实时Server存储模型中有三种重要的log:
* message log 所有subject的消息进入该log,消息的主存储
* consume log consume log存储的是message log的索引信息
* pull log 每个consumer拉取消息的时候会产生pull log,pull log记录的是拉取的消息在consume log中的sequence

那么消费者就可以使用pull log上的sequence来表示消费进度

## 延时/定时消息
QMQ提供任意时间的延时/定时消息,你可以指定消息在未来两年内任意时间内投递。比起RocketMQ提供的多个不同延时level的延时消息,QMQ的延时消息更加灵活。比如在OTA场景中,客人经常是预订未来某个时刻的酒店或者机票,这个时间是不固定的,我们无法使用几个固定的延时level来实现这个场景。

QMQ的延时/定时消息使用的是两层HashWheelTimer来实现的。第一层位于磁盘上,每个小时为一个刻度,每个刻度会生成一个日志文件,因为QMQ支持两年内的延迟消息,则最多会生成 2 * 366 * 24 = 17568 个文件。第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引从磁盘文件加载到内存中的HashWheelTimer上。

![img](../images/arch4.png)

在延时/定时消息里也存在三种log:
* message log 和实时消息里的message log类似,收到消息后append到该log
* schedule log 按照投递时间组织,每个小时一个。该log是回放message log后根据延时时间放置对应的log上,这是上面描述的两层HashWheelTimer的第一层,位于磁盘上
* dispatch log 延时/定时消息投递后写入,主要用于在应用重启后能够确定哪些消息已经投递

[上一页](design.md)
[回目录](../../readme.md)
[下一页](code.md)
36 changes: 36 additions & 0 deletions docs/cn/code.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[上一页](arch.md)
[回目录](../../readme.md)
[下一页](ha.md)

# 代码模块介绍

### qmq-api
暴露给用户的一些interface

### qmq-common
一些公用的类,所有其他模块都可能引用

### qmq-server-common
公用的类,只有server side应用引用,不暴露给client side

### qmq-server
实时消息服务

### qmq-delay-server
延时/定时消息服务

### qmq-store
存储

### qmq-remoting
网络相关

### qmq-client
客户端逻辑

### qmq-metrics-prometheus
提供的prometheus监控接入

[上一页](arch.md)
[回目录](../../readme.md)
[下一页](ha.md)
122 changes: 122 additions & 0 deletions docs/cn/consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
[上一页](producer.md)
[回目录](../../readme.md)
[下一页](delay.md)


# 消费消息(consumer)

## 与Spring结合

QMQ除了提供使用API来消费消息的方式外,还提供了跟Spring结合的基于annotation的API,我们更推荐使用这种方式。QMQ已经与SpringBoot进行了集成,如果项目使用SpringBoot则只需要引入qmq-client.jar就可以直接使用annotation的API,如果使用传统Spring的话则需要在Spring的xml里进行如下配置:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:qmq="http://www.qunar.com/schema/qmq"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.qunar.com/schema/qmq http://www.qunar.com/schema/qmq.xsd">

<qmq:consumer />

<context:annotation-config />
<context:component-scan base-package="qunar.tc.qmq.demo.consumer.*" />
</beans>
```

使用下面的代码就可以订阅消息了,是不是非常简单。
```java
@QmqConsumer(subject = "your subject", consumerGroup = "group", executor = "your executor bean name")
public void onMessage(Message message){
//process your message
String value = message.getStringProperty("key");
}
```
使用上面的方式订阅消息时,如果QmqConsumer标记的onMessage方法抛出异常,则该方法被认为是消费失败,消费失败的消息会再次消费,默认再次消费的间隔是5秒钟,这个可以进行配置。这里需要注意的是,如果有些通过重试也无法消除的异常,请将其在onMessage方法里捕获,而通过重试可以恢复的异常才抛出。

### 仅消费一次
有些消息的可靠性可能要求不高,不管是消费成功还是失败,仅仅消费一次即可,不期望重试,那么可以设置仅消费一次
```java
@QmqConsumer(subject = "your subject", consumerGroup="group", consumeMostOnce = true, executor = "your executor bean name")
public void onMessage(Message message){
//process your message
String value = message.getStringProperty("key");
}
```

### 广播消息
有这样的场景,我们每台机器都维护进程内内存,当数据有变更的时候,变更方会发送变更消息触发缓存更新,那么这个时候我们期望消费者每台机器都收到消息,这就是广播消息的场景了。
```java
@QmqConsumer(subject = "your subject", consumerGroup="group", isBroadcast = true, executor = "your executor bean name")
public void onMessage(Message message){
//update local cache
}
```

### 消费端过滤器(filter)
可以将一些公共逻辑放在filter里,这样可以将filter配置在所有消费者上。比如在QMQ里内置了opentracing就是通过filter实现的,不过这个filter是内置的,不需要额外配置。
```java

@Compoent
public class LogFilter implements Filter {

//在处理消息之前执行
public boolean preOnMessage(Message message, Map<String, Object> filterContext){

}

//在处理消息之后执行
public void postOnMessage(Message message, Throwable e, Map<String, Object> filterContext){

}
}

@QmqConsumer(subject = "your subject", consumerGroup="group", filters = {"logFilter"}, executor = "your executor bean name")
public void onMessage(Message message){
//update local cache
}
```

## 非Spring API
如果在非Spring环境中使用QMQ那就需要直接使用API了。QMQ提供了两种API:Listener和纯Pull。

### Listener

Listener的方式与@QmqConsumer提供的功能基本类似

```java
//推荐一个应用里只创建一个实例
MessageConsumerProvider consumer = new MessageConsumerProvider();
consumer.init();

consumer.addListener("your subject", "group", (m) -> {
//process message
}, new ThreadPoolExecutor(2,2,));
```

### Pull API

Pull API是最基础的API,需要考虑更多情况,如无必要,我们推荐使用annotation或者Listener的方式。

```java
//推荐一个应用里只创建一个实例
MessageConsumerProvider consumer = new MessageConsumerProvider();
consumer.init();

PullConsumer pullConsumer = consumer.getOrCreatePullConsumer("your subject", "group", false);
List<Message> messages = pullConsumer.pull(100, 1000);
for(Message message : messages){
//process message

//对于pull的使用方式,pull到的每一条消息都必须ack,如果处理成功ack的第二个参数为null
message.ack(1000, null);

//处理失败,则ack的第二个参数传入Throwable对象
//message.ack(1000, new Exception("消费失败"));
}
```

[上一页](producer.md)
[回目录](../../readme.md)
[下一页](delay.md)
5 changes: 5 additions & 0 deletions docs/cn/contributing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
如果你遇到问题或需要新功能,欢迎[创建issue]

如果你可以解决某个[issue], 欢迎发送PR

如果你觉得该项目对你有帮助,也请不吝Star
46 changes: 46 additions & 0 deletions docs/cn/delay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[上一页](consumer.md)
[回目录](../../readme.md)
[下一页](tag.md)

# 延时/定时消息

延时/定时消息是指生产者(producer)发送消息到server后,server并不将消息立即发送给消费者(consumer),而是在producer指定的时间之后送达。比如在电商交易中,经常有这样的场景:下单后如果半个小时内没有支付,自动将订单取消。那么如果不使用延时/定时消息,则一般的做法是使用定时任务定期扫描订单状态表,如果半个小时后订单状态还未支付,则将订单取消。而使用延时/定时消息实现起来则比较优雅:用户下单后,发送一个延时消息,指定半个小时后消息送达,那么消费者在半个小时后收到消息就查询消息状态,如果这个时候订单是未支付状态,则取消订单。

延时/定时消息的消费与实时消息一致,请参照[消费消息](consumer.md)

注意: 延时/定时消息使用的时间都是指delay server服务器的时间,所以请确保delay server的服务器时间偏差不要太大。另外,延时/定时消息的精度在1秒左右,在业务设计时应该考虑到这一点,比如不要期望能达到延时100ms的效果。

## 发送延时消息

延时消息是指消息在当前时间之后一段时间后发送

```java
Message message = producer.generateMessage("your subject");
message.setProperty("key", "value");

//指定消息延时30分钟
message.setDelayTime(30, TimeUnit.MINUTES);

//发送消息
producer.sendMessage(message);
```

## 发送定时消息

定时消息是指指定消息的发送时间。需要注意的是如果指定的发送时间小于或等于当前时间,消息是会立即发送的

```java
Message message = producer.generateMessage("your subject");
message.setProperty("key", "value");

Date deadline = DateUtils.addMinutes(new Date(), 30);
//指定发送时间
message.setDelayTime(deadline);

//发送消息
producer.sendMessage(message);
```

[上一页](consumer.md)
[回目录](../../readme.md)
[下一页](tag.md)
Loading

0 comments on commit f8814a3

Please sign in to comment.