Skip to content

Commit

Permalink
MINOR: Fixed clusterId reference in Metadata.update (apache#4360)
Browse files Browse the repository at this point in the history
Also fixed log message with respective error in KerberosLogin.login.

Reviewers: Jason Gustafson <[email protected]>
  • Loading branch information
satishd authored and hachikuji committed Dec 28, 2017
1 parent 89b245b commit a3aea3c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
22 changes: 11 additions & 11 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,13 @@ public synchronized boolean containsTopic(String topic) {
* Updates the cluster metadata. If topic expiry is enabled, expiry time
* is set for topics if required and expired topics are removed from the metadata.
*
* @param cluster the cluster containing metadata for topics with valid metadata
* @param newCluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose
* leader is not known
* @param now current time in milliseconds
*/
public synchronized void update(Cluster cluster, Set<String> unavailableTopics, long now) {
Objects.requireNonNull(cluster, "cluster should not be null");
public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {
Objects.requireNonNull(newCluster, "cluster should not be null");

this.needUpdate = false;
this.lastRefreshMs = now;
Expand All @@ -245,25 +245,25 @@ else if (expireMs <= now) {
}

for (Listener listener: listeners)
listener.onMetadataUpdate(cluster, unavailableTopics);
listener.onMetadataUpdate(newCluster, unavailableTopics);

String previousClusterId = cluster.clusterResource().clusterId();

if (this.needMetadataForAllTopics) {
// the listener may change the interested topics, which could cause another metadata refresh.
// If we have already fetched all topics, however, another fetch should be unnecessary.
this.needUpdate = false;
this.cluster = getClusterForCurrentTopics(cluster);
this.cluster = getClusterForCurrentTopics(newCluster);
} else {
this.cluster = cluster;
this.cluster = newCluster;
}

// The bootstrap cluster is guaranteed not to have any useful information
if (!cluster.isBootstrapConfigured()) {
String clusterId = cluster.clusterResource().clusterId();
if (clusterId == null ? previousClusterId != null : !clusterId.equals(previousClusterId))
log.info("Cluster ID: {}", cluster.clusterResource().clusterId());
clusterResourceListeners.onUpdate(cluster.clusterResource());
if (!newCluster.isBootstrapConfigured()) {
String newClusterId = newCluster.clusterResource().clusterId();
if (newClusterId == null ? previousClusterId != null : !newClusterId.equals(previousClusterId))
log.info("Cluster ID: {}", newClusterId);
clusterResourceListeners.onUpdate(newCluster.clusterResource());
}

notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,8 @@ public void run() {
return;
}
} else {
log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'; " +
"exception was: %s. Exiting refresh thread.",
principal, kinitCmd, kinitArgs, e, e);
log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'. " +
"Exiting refresh thread.", principal, kinitCmd, kinitArgs, e);
return;
}
}
Expand Down

0 comments on commit a3aea3c

Please sign in to comment.