1
1
``` text
2
- Kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,
3
- 并使您能够将消息从一个端点传递到另一个端点;
4
- 1.Kafka的几个基本术语:
5
- Topics(主题): 属于特定类别的消息流称为主题;数据存储在主题中;
6
- Partition(分区): 每个主题可能有多个分区;
7
- Partition offset(分区偏移): 分区上每条记录的唯一序列标识;
8
- Replicas of partition(分区备份): 分区的备份,从不读取或写入数据;
9
- Brokers(经纪人): kafka集群中每个服务称为broker;
10
- Producers(生产者): 发送给一个或多个Kafka主题的消息的发布者;
11
- Consumers(消费者): 订阅一个或多个主题,并从broker提取已发布的消息来使用;
12
- Leader(领导者): 负责给定分区的所有读取和写入的节点;每个分区都有一个服务器充当Leader;
13
- Follower(追随者): 同步Leader的partition消息;
14
- Consumer Group(消费者组): Topic消息分配到消费者组,再由消费者组分配到具体消费实例;
15
- (每个分区最多只能绑定一个消费者,每个消费者可以消费多个分区)
16
- 2.Kafka安装使用:
17
- Kafka下载地址:http://kafka.apache.org/downloads,选择Binary downloads下载,然后解压即可;
18
- Kafka的配置文件位于config目录下(包含kafka和Zookeeper的配置文件),打开server.properties,
19
- 将broker.id的值修改为1,每个broker的id都必须设置为Integer类型,且不能重复;
20
- [1]启动Zookeeper:
21
- (1)Windows下,在cmd中切换到Kafka根目录,执行启动脚本:
2
+ Kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,
3
+ 并使您能够将消息从一个端点传递到另一个端点;
4
+ 1.Kafka的几个基本术语:
5
+ Topics(主题): 属于特定类别的消息流称为主题;数据存储在主题中;
6
+ Partition(分区): 每个主题可能有多个分区;
7
+ Partition offset(分区偏移): 分区上每条记录的唯一序列标识;
8
+ Replicas of partition(分区备份): 分区的备份,从不读取或写入数据;
9
+ Brokers(经纪人): kafka集群中每个服务称为broker;
10
+ Producers(生产者): 发送给一个或多个Kafka主题的消息的发布者;
11
+ Consumers(消费者): 订阅一个或多个主题,并从broker提取已发布的消息来使用;
12
+ Leader(领导者): 负责给定分区的所有读取和写入的节点;每个分区都有一个服务器充当Leader;
13
+ Follower(追随者): 同步Leader的partition消息;
14
+ Consumer Group(消费者组): Topic消息分配到消费者组,再由消费者组分配到具体消费实例;
15
+ (每个分区最多只能绑定一个消费者,每个消费者可以消费多个分区)
16
+ 2.Kafka安装使用:
17
+ Kafka下载地址:http://kafka.apache.org/downloads,选择Binary downloads下载,然后解压即可;
18
+ Kafka的配置文件位于config目录下(包含kafka和Zookeeper的配置文件),打开server.properties,
19
+ 将broker.id的值修改为1,每个broker的id都必须设置为Integer类型,且不能重复;
20
+ [1]启动Zookeeper:
21
+ (1)Windows下,在cmd中切换到Kafka根目录,执行启动脚本:
22
22
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
23
- (2)Linux下,在终端命令行切换到Kafka根目录,以后台进程的方式执行启动脚本:
23
+ (2)Linux下,在终端命令行切换到Kafka根目录,以后台进程的方式执行启动脚本:
24
24
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
25
- [2]启动Kafka:
26
- (1)Windows下,在cmd中切换到Kafka根目录,执行启动脚本:
25
+ [2]启动Kafka:
26
+ (1)Windows下,在cmd中切换到Kafka根目录,执行启动脚本:
27
27
bin\windows\kafka-server-start.bat config\server.properties
28
- (2)Linux下,在终端命令行切换到Kafka根目录,执行启动脚本:
28
+ (2)Linux下,在终端命令行切换到Kafka根目录,执行启动脚本:
29
29
bin/kafka-server-start.sh config/server.properties
30
- 当看到命令行打印started等信息,说明启动完毕;
31
- [3]创建Topic:
32
- (1)Windows下,在cmd中切换到Kafka根目录,执行创建Topic脚本:
30
+ 当看到命令行打印started等信息,说明启动完毕;
31
+ [3]创建Topic:
32
+ (1)Windows下,在cmd中切换到Kafka根目录,执行创建Topic脚本:
33
33
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
34
34
--replication-factor 1 --partitions 1 --topic test
35
- (创建一个Topic到ZK(指定ZK的地址),副本个数为1,分区数为1,Topic的名称为test)
36
- (2)Linux下,在终端命令行切换到Kafka根目录,执行创建Topic脚本:
35
+ (创建一个Topic到ZK(指定ZK的地址),副本个数为1,分区数为1,Topic的名称为test)
36
+ (2)Linux下,在终端命令行切换到Kafka根目录,执行创建Topic脚本:
37
37
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
38
38
--partitions 1 --topic test
39
- [4]查看Kafka里的Topic列表:
40
- (1)Windows下,在cmd中切换到Kafka根目录,执行Topic脚本:
39
+ [4]查看Kafka里的Topic列表:
40
+ (1)Windows下,在cmd中切换到Kafka根目录,执行Topic脚本:
41
41
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
42
- (2)Linux下,在终端命令行切换到Kafka根目录,执行Topic脚本:
42
+ (2)Linux下,在终端命令行切换到Kafka根目录,执行Topic脚本:
43
43
bin/kafka-topics.sh --list --zookeeper localhost:2181
44
- [5]查看某个Topic的具体信息: (如:test)
45
- (1)Windows下,在cmd中切换到Kafka根目录,执行Topic脚本:
44
+ [5]查看某个Topic的具体信息: (如:test)
45
+ (1)Windows下,在cmd中切换到Kafka根目录,执行Topic脚本:
46
46
bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
47
- (2)Linux下,在终端命令行切换到Kafka根目录,执行Topic脚本:
47
+ (2)Linux下,在终端命令行切换到Kafka根目录,执行Topic脚本:
48
48
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
49
- [6]启动Producers:
50
- (1)Windows下,在cmd中切换到Kafka根目录,执行producer脚本:
49
+ [6]启动Producers:
50
+ (1)Windows下,在cmd中切换到Kafka根目录,执行producer脚本:
51
51
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
52
- (9092为生产者的默认端口号,启动生产者后,可往test Topic里发送数据)
53
- (2)Linux下,在终端命令行切换到Kafka根目录,执行producer脚本:
52
+ (9092为生产者的默认端口号,启动生产者后,可往test Topic里发送数据)
53
+ (2)Linux下,在终端命令行切换到Kafka根目录,执行producer脚本:
54
54
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
55
- [7]启动Consumers:
56
- (1)Windows下,在cmd中切换到Kafka根目录,执行consumer脚本:
55
+ [7]启动Consumers:
56
+ (1)Windows下,在cmd中切换到Kafka根目录,执行consumer脚本:
57
57
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
58
58
--topic test --from-beginning
59
- (from-beginning表示从头开始读取数据)
60
- (2)Linux下,在终端命令行切换到Kafka根目录,执行consumer脚本:
59
+ (from-beginning表示从头开始读取数据)
60
+ (2)Linux下,在终端命令行切换到Kafka根目录,执行consumer脚本:
61
61
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
62
62
--topic test --from-beginning
63
- 3.Spring Boot整合Kafaka:
64
- [1]引入web依赖和kafka依赖:
63
+ 3.Spring Boot整合Kafaka:
64
+ [1]引入web依赖和kafka依赖:
65
65
<dependency>
66
66
<groupId>org.springframework.boot</groupId>
67
67
<artifactId>spring-boot-starter-web</artifactId>
70
70
<groupId>org.springframework.kafka</groupId>
71
71
<artifactId>spring-kafka</artifactId>
72
72
</dependency>
73
- [2]配置生产者:
74
- (1)通过配置类,配置生产者工厂及kafka模板:
73
+ [2]配置生产者:
74
+ (1)通过配置类,配置生产者工厂及kafka模板:
75
75
@Configuration
76
76
public class KafkaProducerConfig {
77
77
@Value("${spring.kafka.bootstrap-servers}")
@@ -83,37 +83,37 @@ Kafka
83
83
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
84
84
bootstrapServers);
85
85
configProps.put(
86
- //key的序列化策略,String类型
86
+ //key的序列化策略,String类型
87
87
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
88
88
StringSerializer.class);
89
89
configProps.put(
90
- //value的序列化策略,String类型
90
+ //value的序列化策略,String类型
91
91
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
92
92
StringSerializer.class);
93
93
return new DefaultKafkaProducerFactory<>(configProps);
94
94
}
95
- //其包含了发送消息的便捷方法
95
+ //其包含了发送消息的便捷方法
96
96
@Bean
97
97
public KafkaTemplate<String, String> kafkaTemplate() {
98
98
return new KafkaTemplate<>(producerFactory());
99
99
}
100
100
}
101
- (2)配置文件application.yml中配置生产者的地址:
101
+ (2)配置文件application.yml中配置生产者的地址:
102
102
spring:
103
103
kafka:
104
104
bootstrap-servers: localhost:9092
105
- [3]编写发送消息的controller:
105
+ [3]编写发送消息的controller:
106
106
@RestController
107
107
public class SendMessageController {
108
108
@Autowired
109
109
private KafkaTemplate<String, String> kafkaTemplate;
110
110
@GetMapping("send/{message}")
111
111
public void send(@PathVariable String message) {
112
- // test为Topic的名称,message为要发送的消息
112
+ // test为Topic的名称,message为要发送的消息
113
113
this.kafkaTemplate.send("test", message);
114
114
}
115
115
}
116
- send方法是异步方法,可通过回调的方式来确定消息是否发送成功,改造controller:
116
+ send方法是异步方法,可通过回调的方式来确定消息是否发送成功,改造controller:
117
117
@RestController
118
118
public class SendMessageController {
119
119
private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -126,19 +126,19 @@ Kafka
126
126
future.addCallback(new ListenableFutureCallback<SendResult<String,String>>(){
127
127
@Override
128
128
public void onSuccess(SendResult<String, String> result) {
129
- logger.info("成功发送消息:{},offset=[{}]", message,
129
+ logger.info("成功发送消息:{},offset=[{}]", message,
130
130
result.getRecordMetadata().offset());
131
131
}
132
132
@Override
133
133
public void onFailure(Throwable ex) {
134
- logger.error("消息:{} 发送失败,原因:{}", message, ex.getMessage());
134
+ logger.error("消息:{} 发送失败,原因:{}", message, ex.getMessage());
135
135
}
136
136
});
137
137
}
138
138
}
139
- [4]配置消费者:
140
- (1)通过配置类,配置消费者工厂和监听容器工厂:
141
- //配置类上需要@EnableKafka注释才能在Spring托管Bean上检测@KafkaListener注解
139
+ [4]配置消费者:
140
+ (1)通过配置类,配置消费者工厂和监听容器工厂:
141
+ //配置类上需要@EnableKafka注释才能在Spring托管Bean上检测@KafkaListener注解
142
142
@EnableKafka
143
143
@Configuration
144
144
public class KafkaConsumerConfig {
@@ -177,40 +177,40 @@ Kafka
177
177
return factory;
178
178
}
179
179
}
180
- (2)在application.yml里配置消费者组ID和消息读取策略:
180
+ (2)在application.yml里配置消费者组ID和消息读取策略:
181
181
spring:
182
182
kafka:
183
183
consumer:
184
184
group-id: test-consumer
185
185
auto-offset-reset: latest
186
186
187
- 消息读取策略,包含四个可选值:
188
- earliest:当各分区下有已提交的offset时,从提交的offset开始消费;
189
- 无提交的offset时,从头开始消费;
190
- latest:当各分区下有已提交的offset时,从提交的offset开始消费;
191
- 无提交的offset时,消费新产生的该分区下的数据;
192
- none:topic各分区都存在已提交的offset时,从offset后开始消费;
193
- 只要有一个分区不存在已提交的offset,则抛出异常;
194
- exception:直接抛出异常;
195
- [5]编写消息监听器类:
187
+ 消息读取策略,包含四个可选值:
188
+ earliest:当各分区下有已提交的offset时,从提交的offset开始消费;
189
+ 无提交的offset时,从头开始消费;
190
+ latest:当各分区下有已提交的offset时,从提交的offset开始消费;
191
+ 无提交的offset时,消费新产生的该分区下的数据;
192
+ none:topic各分区都存在已提交的offset时,从offset后开始消费;
193
+ 只要有一个分区不存在已提交的offset,则抛出异常;
194
+ exception:直接抛出异常;
195
+ [5]编写消息监听器类:
196
196
@Component
197
197
public class KafkaMessageListener {
198
198
private Logger logger = LoggerFactory.getLogger(this.getClass());
199
- // 指定监听的主题和消费者组
199
+ // 指定监听的主题和消费者组
200
200
@KafkaListener(topics = "test", groupId = "test-consumer")
201
201
public void listen(String message) {
202
- logger.info("接收消息: {}", message);
202
+ logger.info("接收消息: {}", message);
203
203
}
204
204
}
205
- 4.@KafkaListener详解:
206
- [1]同时监听来自多个Topic的消息: @KafkaListener(topics = "topic1, topic2")
207
- [2]@Header注解获取当前消息来自哪个分区:
205
+ 4.@KafkaListener详解:
206
+ [1]同时监听来自多个Topic的消息: @KafkaListener(topics = "topic1, topic2")
207
+ [2]@Header注解获取当前消息来自哪个分区:
208
208
@KafkaListener(topics = "test", groupId = "test-consumer")
209
209
public void listen(@Payload String message,
210
210
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
211
- logger.info("接收消息: {},partition:{}", message, partition);
211
+ logger.info("接收消息: {},partition:{}", message, partition);
212
212
}
213
- [3]指定只接收来自特定分区的消息:
213
+ [3]指定只接收来自特定分区的消息:
214
214
@KafkaListener(
215
215
groupId = "test-consumer",
216
216
topicPartitions = @TopicPartition(
@@ -222,28 +222,28 @@ Kafka
222
222
)
223
223
public void listen(@Payload String message,
224
224
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
225
- logger.info("接收消息: {},partition:{}", message, partition);
225
+ logger.info("接收消息: {},partition:{}", message, partition);
226
226
}
227
- 如果不需要指定initialOffset,上面代码可以简化为:
227
+ 如果不需要指定initialOffset,上面代码可以简化为:
228
228
@KafkaListener(groupId = "test-consumer",
229
229
topicPartitions = @TopicPartition(topic = "test", partitions = { "0", "1" }))
230
- 5.为消息监听添加消息过滤器: setRecordFilterStrategy(RecordFilterStrategy<K, V> strategy)
230
+ 5.为消息监听添加消息过滤器: setRecordFilterStrategy(RecordFilterStrategy<K, V> strategy)
231
231
@Bean
232
232
public ConcurrentKafkaListenerContainerFactory<String, String>
233
233
kafkaListenerContainerFactory() {
234
234
ConcurrentKafkaListenerContainerFactory<String, String> factory
235
235
= new ConcurrentKafkaListenerContainerFactory<>();
236
236
factory.setConsumerFactory(consumerFactory());
237
- // 添加过滤配置
237
+ // 添加过滤配置
238
238
factory.setRecordFilterStrategy( r -> r.value().contains("fuck"));
239
239
return factory;
240
240
}
241
- // RecordFilterStrategy接口: (是函数式接口)
241
+ // RecordFilterStrategy接口: (是函数式接口)
242
242
public interface RecordFilterStrategy<K, V> {
243
243
boolean filter(ConsumerRecord<K, V> var1);
244
244
}
245
- 6.发送复杂的消息: (通过自定义消息转换器来发送复杂的消息)
246
- [1]定义消息实体:
245
+ 6.发送复杂的消息: (通过自定义消息转换器来发送复杂的消息)
246
+ [1]定义消息实体:
247
247
public class Message implements Serializable {
248
248
private static final long serialVersionUID = 6678420965611108427L;
249
249
private String from;
@@ -260,14 +260,14 @@ Kafka
260
260
", message='" + message + '\'' +
261
261
'}';
262
262
}
263
- // get set 略
263
+ // get set 略
264
264
}
265
- [2]改造消息生产者配置:
265
+ [2]改造消息生产者配置:
266
266
@Configuration
267
267
public class KafkaProducerConfig {
268
268
@Value("${spring.kafka.bootstrap-servers}")
269
269
private String bootstrapServers;
270
- // 返回类型为ProducerFactory<String,Message>
270
+ // 返回类型为ProducerFactory<String,Message>
271
271
@Bean
272
272
public ProducerFactory<String, Message> producerFactory() {
273
273
Map<String, Object> configProps = new HashMap<>();
@@ -278,18 +278,18 @@ Kafka
278
278
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
279
279
StringSerializer.class);
280
280
configProps.put(
281
- //将value序列化策略指定为了Kafka提供的JsonSerializer
281
+ //将value序列化策略指定为了Kafka提供的JsonSerializer
282
282
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
283
283
JsonSerializer.class);
284
284
return new DefaultKafkaProducerFactory<>(configProps);
285
285
}
286
- // 返回类型为KafkaTemplate<String, Message>
286
+ // 返回类型为KafkaTemplate<String, Message>
287
287
@Bean
288
288
public KafkaTemplate<String, Message> kafkaTemplate() {
289
289
return new KafkaTemplate<>(producerFactory());
290
290
}
291
291
}
292
- [3]在controller中发送复杂消息:
292
+ [3]在controller中发送复杂消息:
293
293
@RestController
294
294
public class SendMessageController {
295
295
private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -300,7 +300,7 @@ Kafka
300
300
this.kafkaTemplate.send("test", new Message("kimi", message));
301
301
}
302
302
}
303
- [4]修改消费者配置:
303
+ [4]修改消费者配置:
304
304
@EnableKafka
305
305
@Configuration
306
306
public class KafkaConsumerConfig {
@@ -336,17 +336,17 @@ Kafka
336
336
return factory;
337
337
}
338
338
}
339
- [5]修改消息监听:
339
+ [5]修改消息监听:
340
340
@Component
341
341
public class KafkaMessageListener {
342
342
private Logger logger = LoggerFactory.getLogger(this.getClass());
343
- // 指定监听的主题和消费者组
343
+ // 指定监听的主题和消费者组
344
344
@KafkaListener(topics = "test", groupId = "test-consumer")
345
345
public void listen(Message message) {
346
- logger.info("接收消息: {}", message);
346
+ logger.info("接收消息: {}", message);
347
347
}
348
348
}
349
- 7.更多配置:
349
+ 7.更多配置:
350
350
(https://docs.spring.io/spring-boot/docs/2.1.1.RELEASE/reference/htmlsingle/#common-application-properties)
351
351
# APACHE KAFKA (KafkaProperties)
352
352
spring.kafka.admin.client-id=
@@ -443,4 +443,4 @@ Kafka
443
443
spring.kafka.streams.ssl.trust-store-type=
444
444
spring.kafka.streams.state-dir=
445
445
spring.kafka.template.default-topic=
446
- ```
446
+ ```
0 commit comments