Feature: integrate protostuff serialization
nicklyc committed Jul 9, 2019
1 parent b1956f9 commit 11c29bb
Showing 19 changed files with 253 additions and 72 deletions.
35 changes: 24 additions & 11 deletions learn2/pom.xml
@@ -19,28 +19,41 @@
</properties>

<dependencies>
<!-- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>-->

<!-- Kryo serialization -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.0.0-RC1</version>
</dependency>

<!--protostuff-->
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.5.9</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.5.9</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.carrotsearch/java-sizeof -->
<dependency>
<groupId>com.carrotsearch</groupId>
@@ -52,12 +65,12 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<!--<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</exclusions>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
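Both protostuff artifacts pulled in above are needed: protostuff-core supplies the wire format and the I/O entry points (ProtostuffIOUtil, LinkedBuffer), while protostuff-runtime derives a Schema from a plain Java class by reflection, so no .proto files or generated code are required. The heart of the API, as used by the util class added later in this commit (a sketch; message is an illustrative variable):

Schema<KafkaMessage> schema = RuntimeSchema.getSchema(KafkaMessage.class);
byte[] bytes = ProtostuffIOUtil.toByteArray(message, schema, LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));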
learn2/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -48,6 +48,11 @@ public interface MetadataUpdater extends Closeable {
boolean isUpdateDue(long now);

/**
* Whether the metadata needs updating: this method returns a countdown to the next
* metadata update. A return value of 0 means an update is due (or has just been
* started); any value greater than 0 is the time remaining until the next update.
*
* Starts a cluster metadata update if needed and possible. Returns the time until the metadata update (which would
* be 0 if an update has been started as a result of this call).
* <p>
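The countdown contract lets the caller fold metadata refreshes into its normal I/O poll instead of scheduling them on a separate thread. A minimal caller loop might look like this (a sketch with illustrative names — updater, selector, requestTimeoutMs — not the actual NetworkClient code):

long now = System.currentTimeMillis();
// 0 means an update was just started; otherwise it is the time left until one is due
long metadataTimeout = updater.maybeUpdate(now);
// cap the select timeout so we wake up in time to refresh metadata
selector.poll(Math.min(metadataTimeout, requestTimeoutMs));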
learn2/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1166,6 +1166,7 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
// blocking wait for the cluster metadata to be updated
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
log.info("更新数据超时");
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
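awaitUpdate blocks until the metadata version advances past the version captured before the update was requested, or until the wait budget runs out. Conceptually it is a guarded wait on the Metadata object (a simplified sketch of the idea, not the exact Kafka code):

public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + maxWaitMs;
    while (this.version <= lastVersion) {
        long remainingMs = deadline - System.currentTimeMillis();
        if (remainingMs <= 0)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        wait(remainingMs); // woken by notifyAll() when update() bumps the version
    }
}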
learn2/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -19,13 +19,14 @@

/**
* Partitioner Interface
* The partitioner.
*/

public interface Partitioner extends Configurable, Closeable {

/**
* Compute the partition for the given record.
*
* Computes the partition.
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
learn2/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -1,18 +1,14 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;

@@ -28,8 +24,8 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/** Default partitioning strategy
* The default partitioning strategy:
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
@@ -39,49 +35,48 @@ public class DefaultPartitioner implements Partitioner {

private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

public void configure(Map<String, ?> configs) {
}
public void configure(Map<String, ?> configs) {}

/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// get all partitions for the topic
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// no key present
if (keyBytes == null) {
// get the per-topic counter
int nextValue = nextValue(topic);
// get the currently available partitions
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
// no key: pick one of the available partitions, round-robin from a random start
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// key present: the same key always hashes to the same partition
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
// per-topic counter
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
// seed the counter with a random value
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
@@ -91,7 +86,6 @@ private int nextValue(String topic) {
return counter.getAndIncrement();
}

public void close() {
}
public void close() {}

}
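A Partitioner implementation like the one above is wired into the producer through the partitioner.class setting. DefaultPartitioner is already the default, so the line below is only needed when swapping in a custom implementation (a sketch):

Properties props = new Properties();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());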
2 changes: 2 additions & 0 deletions learn2/src/main/java/org/apache/kafka/common/Cluster.java
@@ -238,8 +238,10 @@ public PartitionInfo partition(TopicPartition topicPartition) {
*
* @param topic The topic name
* @return A list of partitions
* Returns all partitions of the given topic.
*/
public List<PartitionInfo> partitionsForTopic(String topic) {
List<PartitionInfo> parts = this.partitionsByTopic.get(topic);
return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
}
learn2/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -153,9 +153,15 @@ public ChannelState state() {
return this.state;
}

/**
* Checks whether the connection has finished being established.
* @return
* @throws IOException
*/
public boolean finishConnect() throws IOException {
boolean connected = transportLayer.finishConnect();
if (connected)
// mark the state: connection established; if the channel is not ready yet, it is still authenticating
state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE;
return connected;
}
@@ -259,6 +265,12 @@ public boolean isInMutableState() {
return transportLayer.ready();
}

/**
* Whether the channel is ready. With PlaintextTransportLayer the transport is ready as
* soon as the connection is established, and the PLAINTEXT protocol requires no
* authentication, so for PLAINTEXT channels this returns true right after connecting.
* @return
*/
public boolean ready() {
return transportLayer.ready() && authenticator.complete();
}
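Both conjuncts are trivially true for PLAINTEXT channels, which is what the annotation above describes. Conceptually (a simplified sketch, not the exact Kafka classes):

// plaintext transport layer: no TLS handshake, so ready once the connection is finished
public boolean ready() { return true; }

// plaintext authenticator: nothing to negotiate
public boolean complete() { return true; }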
learn2/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -475,6 +475,7 @@ public void poll(long timeout) throws IOException {
}

// Poll from channels where the underlying socket has more data
// poll those sockets
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are included in the ready count for the next select
readyKeys.clear();
@@ -509,6 +510,7 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
// iterate over all selection keys, fetching the channel for each
KafkaChannel channel = channel(key);
long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;

@@ -522,6 +524,7 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,

/* complete any connections that have finished their handshake (either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {
// check whether the connection has finished being established
if (channel.finishConnect()) {
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
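The structure mirrors a plain java.nio event loop, in which OP_CONNECT readiness is turned into a finishConnect() call before the channel is used. A bare-bones version of the same pattern (a generic NIO sketch, not Kafka's Selector; handleRead is a hypothetical handler):

selector.select(timeoutMs);
for (SelectionKey key : selector.selectedKeys()) {
    SocketChannel ch = (SocketChannel) key.channel();
    if (key.isConnectable() && ch.finishConnect())
        key.interestOps(SelectionKey.OP_READ); // connection established, start reading
    if (key.isReadable())
        handleRead(ch);
}
selector.selectedKeys().clear();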
2 changes: 1 addition & 1 deletion learn2/src/main/java/org/example/LearnApplication.java
@@ -8,7 +8,7 @@
* @author maochao
* @date 2019/6/19
*/
@SpringBootApplication
//@SpringBootApplication
public class LearnApplication {

public static void main(String[] args) {
learn2/src/main/java/org/example/learn2/consumer/… (consumer demo)
@@ -21,7 +21,7 @@ public static void main(String[] args) {
consumer.subscribe(Collections.singletonList(ConsumerConf.topic));
while (true) {
System.out.println("拉取消息一次");
ConsumerRecords<String, Object> records = consumer.poll(1000);
ConsumerRecords<String, Object> records = consumer.poll(5000);
handleMess(records);
}
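poll(long) blocks for up to the given number of milliseconds waiting for records, so raising the timeout from 1000 to 5000 simply reduces idle spinning of this loop. Note that in kafka-clients 2.0+ the poll(long) overload is deprecated in favor of poll(Duration) (a sketch):

ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(5000));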

learn2/src/main/java/org/example/learn2/consumer/ConsumerConf.java
@@ -1,5 +1,6 @@
package org.example.learn2.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;
@@ -17,13 +18,14 @@ public class ConsumerConf {

public static Properties initProperties() {
Properties properties = new Properties();
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
// properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// custom value deserializer
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoDeserializer.class.getName());
// custom value deserializer
// properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtostuffDerializer.class.getName());
return properties;
}
}
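Two things are worth noting about this config. First, GROUP_ID_CONFIG is a fresh random UUID on every start, so each run joins as a brand-new consumer group and, with the default auto.offset.reset of latest, only sees messages produced after it subscribes — which is why the earliest line is kept around commented out. Second, the producer side needs the matching value serializer; that config is in one of the files not expanded here, but the relevant line would look roughly like this (ProtostuffSerializer is an assumed class name mirroring the deserializer below):

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtostuffSerializer.class.getName());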
learn2/src/main/java/org/example/learn2/consumer/ProtostuffDerializer.java
@@ -0,0 +1,28 @@
package org.example.learn2.consumer;

import org.apache.kafka.common.serialization.Deserializer;
import org.example.learn2.entity.KafkaMessage;
import org.example.learn2.util.ProtostuffUtil;

import java.util.Map;

/**
* @author maochao
* @since 2019/7/8 19:29
*/
public class ProtostuffDerializer implements Deserializer {
@Override
public void configure(Map configs, boolean isKey) {

}

@Override
public Object deserialize(String topic, byte[] data) {
return ProtostuffUtil.deserialize(data, KafkaMessage.class);
}

@Override
public void close() {

}
}
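The ProtostuffUtil helper called above is among the changed files not expanded in this view. An implementation consistent with the call site would look roughly like the following (a sketch built on the protostuff-runtime API, not necessarily the committed code):

package org.example.learn2.util;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

public class ProtostuffUtil {

    public static <T> byte[] serialize(T obj, Class<T> clazz) {
        // RuntimeSchema derives (and caches) a schema from the class via reflection
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } finally {
            buffer.clear(); // buffers are reusable; clear after each write
        }
    }

    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        T message = schema.newMessage(); // empty instance to merge the bytes into
        ProtostuffIOUtil.mergeFrom(data, message, schema);
        return message;
    }
}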