Skip to content

Commit

Permalink
comment :Metadata 注释
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklyc committed Jul 13, 2019
1 parent c89558b commit 824fcbd
Showing 1 changed file with 55 additions and 68 deletions.
123 changes: 55 additions & 68 deletions learn2/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients;

Expand All @@ -39,18 +35,16 @@
import java.util.Set;

/**
* 元数据
* 维护的就是一些topic相关的数据,最主要的是Cluster 对象,其他的属于逻辑变量。
* 所有public方法都是synchronized修饰了,线程安全类
* 元数据 维护的就是一些topic相关的数据,最主要的是Cluster 对象,其他的属于逻辑变量。 所有public方法都是synchronized修饰了,线程安全类
* <p>
* This class is shared by the client thread (for partitioning) and the background sender thread.
* <p>
* Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
* topic we don't have any metadata for it will trigger a metadata update.
* <p>
* If topic expiry is enabled for the metadata, any topic that has not been used within the expiry interval
* is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
* manage topics while producers rely on topic expiry to limit the refresh set.
* If topic expiry is enabled for the metadata, any topic that has not been used within the expiry interval is removed
* from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly manage topics
* while producers rely on topic expiry to limit the refresh set.
*/
public final class Metadata implements Closeable {

Expand All @@ -59,31 +53,28 @@ public final class Metadata implements Closeable {
public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
/**
* 刷新重试时间,KafkaProduer 实例化Metadata时候 采用的是 retry.backoff.ms配置
* 在这里指的时候 metadata的更新周期,
* 刷新重试时间,KafkaProduer 实例化Metadata时候 采用的是 retry.backoff.ms配置 在这里指的时候 metadata的更新周期,
*/
private final long refreshBackoffMs;
/**
* metadata 的过期时间, 默认 60,000ms
*/
private final long metadataExpireMs;
/**
* 版本号
*/
private int version;
/**
* 上一次更新时间
*/
private long lastRefreshMs;
/**
* 版本号
*/
private int version;
/**
* 上一次成功更新时间
*/
private long lastSuccessfulRefreshMs;
private AuthenticationException authenticationException;
/**
* 集群数据,核心数据
* 其他的是逻辑数据,帮助业务处理
* cluster是关键数据
* 集群数据,核心数据 其他的是逻辑数据,帮助业务处理 cluster是关键数据
*/
private Cluster cluster;
/**
Expand All @@ -94,6 +85,9 @@ public final class Metadata implements Closeable {
* topic的名字和过期时间映射关系
*/
private final Map<String, Long> topics;
/**
* 监听器
*/
private final List<Listener> listeners;
private final ClusterResourceListeners clusterResourceListeners;
/**
Expand All @@ -114,16 +108,16 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoT
/**
* Create a new Metadata instance
*
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
* @param allowAutoTopicCreation If this and the broker config 'auto.create.topics.enable' are true, topics that
* don't exist will be created by the broker when a metadata request is sent
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
* @param allowAutoTopicCreation If this and the broker config 'auto.create.topics.enable' are true, topics that
* don't exist will be created by the broker when a metadata request is sent
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.allowAutoTopicCreation = allowAutoTopicCreation;
Expand All @@ -148,9 +142,7 @@ public synchronized Cluster fetch() {
}

/**
* 添加新的topic 的名字到Metadata中进行维护,
* 会重置过期时间
* 刷新上一次更新时间,并且设置更新为true;
* 添加新的topic 的名字到Metadata中进行维护, 会重置过期时间 刷新上一次更新时间,并且设置更新为true;
*/
public synchronized void add(String topic) {
Objects.requireNonNull(topic, "topic cannot be null");
Expand All @@ -162,22 +154,18 @@ public synchronized void add(String topic) {
/**
* The next time to update the cluster info is the maximum of the time the current info will expire and the time the
* current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
* is now
* 计算下次更新metadata时间,这里也就是metadata更新策略:
* is now 计算下次更新metadata时间,这里也就是metadata更新策略:
*
* 1: 当needUpdate=true时,当前时间 - 上一次更新时间 > refreshBackoffMs 进行更新。
* 否则,继续等待到更新周期refreshBackoffMs进行跟新,
* 1: 当needUpdate=true时,当前时间 - 上一次更新时间 > refreshBackoffMs 进行更新。 否则,继续等待到更新周期refreshBackoffMs进行跟新,
*
* 2:如果needUpdate=false时候,检查metadata是否过期,如果过期了。则对更新周期进行判断。
* 2:如果needUpdate=false时候,检查metadata是否过期,如果过期了。则对更新周期进行判断。
*
* 3: refreshBackoffMs使用的是 retry.backoff.ms配置。默认更新周期100毫秒。
* 3: refreshBackoffMs使用的是 retry.backoff.ms配置。默认更新周期100毫秒。
*
* 总结:metadata的更新周期是100ms,过期时间是metadata.max.age.ms,默认60s。
* 如果当needUpdate=true,当更新开关打开,并且与上一次更新时间间隔达到100ms以上进行更新,
* 在更新开关没有打开的情况下,上一次更新是一个成功的操作,该metadata会缓存metadata.max.age.ms,
* 然后数据过期发生更新操作。
* 总结:metadata的更新周期是100ms,过期时间是metadata.max.age.ms,默认60s。 如果当needUpdate=true,当更新开关打开,并且与上一次更新时间间隔达到100ms以上进行更新,
* 在更新开关没有打开的情况下,上一次更新是一个成功的操作,该metadata会缓存metadata.max.age.ms, 然后数据过期发生更新操作。
*
* timeToNextUpdate(nowMs)的返回值就是还剩多久需要进行更新操作。
* timeToNextUpdate(nowMs)的返回值就是还剩多久需要进行更新操作。
*
*/

Expand Down Expand Up @@ -207,8 +195,8 @@ public synchronized boolean updateRequested() {
}

/**
* If any non-retriable authentication exceptions were encountered during
* metadata update, clear and return the exception.
* If any non-retriable authentication exceptions were encountered during metadata update, clear and return the
* exception.
*/
public synchronized AuthenticationException getAndClearAuthenticationException() {
if (authenticationException != null) {
Expand All @@ -221,8 +209,7 @@ public synchronized AuthenticationException getAndClearAuthenticationException()

/**
* Wait for metadata update until the current version is larger than the last version we know of
* 更新metadata,只要版本号没有更新并且metadata没有close,就会一直调用wait阻塞,
* 当阻塞总时间超过max.block.ms,就会抛出超时异常。
* 更新metadata,只要版本号没有更新并且metadata没有close,就会一直调用wait阻塞, 当阻塞总时间超过max.block.ms,就会抛出超时异常。
*
*/
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
Expand All @@ -231,6 +218,7 @@ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs

long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
// 版本号判断
while ((this.version <= lastVersion) && !isClosed()) {
AuthenticationException ex = getAndClearAuthenticationException();
if (ex != null)
Expand All @@ -247,9 +235,8 @@ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs
}

/**
* Replace the current set of topics maintained to the one provided.
* If topic expiry is enabled, expiry time of the topics will be
* reset on the next update.
* Replace the current set of topics maintained to the one provided. If topic expiry is enabled, expiry time of the
* topics will be reset on the next update.
*
* @param topics
*/
Expand Down Expand Up @@ -280,13 +267,12 @@ public synchronized boolean containsTopic(String topic) {
}

/**
* Updates the cluster metadata. If topic expiry is enabled, expiry time
* is set for topics if required and expired topics are removed from the metadata.
* Updates the cluster metadata. If topic expiry is enabled, expiry time is set for topics if required and expired
* topics are removed from the metadata.
*
* @param newCluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose
* leader is not known
* @param now current time in milliseconds
* @param newCluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose leader is not known
* @param now current time in milliseconds
*/
public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {
Objects.requireNonNull(newCluster, "cluster should not be null");
Expand All @@ -300,14 +286,15 @@ public synchronized void update(Cluster newCluster, Set<String> unavailableTopic

if (topicExpiryEnabled) {
// Handle expiry of topics from the metadata refresh set.
for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext();) {
Map.Entry<String, Long> entry = it.next();
long expireMs = entry.getValue();
if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
entry.setValue(now + TOPIC_EXPIRY_MS);
else if (expireMs <= now) {
it.remove();
log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(),
expireMs, now);
}
}
}
Expand Down Expand Up @@ -339,8 +326,8 @@ else if (expireMs <= now) {
}

/**
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
* Record an attempt to update the metadata that failed. We need to keep track of this to avoid retrying
* immediately.
*/
public synchronized void failedUpdate(long now, AuthenticationException authenticationException) {
this.lastRefreshMs = now;
Expand Down Expand Up @@ -427,9 +414,9 @@ public interface Listener {
/**
* Callback invoked on metadata update.
*
* @param cluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose
* leader is not known
* @param cluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose leader is not
* known
*/
void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics);
}
Expand Down

0 comments on commit 824fcbd

Please sign in to comment.