Skip to content

Commit

Permalink
KAFKA-1398 Dynamic config follow-on-comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkreps committed Apr 18, 2014
1 parent 037c054 commit 89f040c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/TopicConfigManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
* To update a topic config we first update the topic config properties. Then we create a new sequential
* znode under the change path which contains the name of the topic that was updated, say
* /brokers/config_changes/config_change_13321
* This is just a notification--the actual config change is stored only once under the /brokers/topics/<topic_name>/config path.
*
* This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
* It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds
Expand Down Expand Up @@ -94,7 +95,7 @@ class TopicConfigManager(private val zkClient: ZkClient,
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification)
val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
if(jsonOpt.isDefined) {
val json = jsonOpt.get
val topic = json.substring(1, json.length - 1) // hacky way to dequote
Expand All @@ -116,8 +117,7 @@ class TopicConfigManager(private val zkClient: ZkClient,
}

private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
val eligible = notifications.sorted.dropRight(1) // never purge newest notification--we need it for the seq number
for(notification <- eligible) {
for(notification <- notifications.sorted) {
val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + notification)
if(jsonOpt.isDefined) {
val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

import junit.framework.Assert._
Expand Down

0 comments on commit 89f040c

Please sign in to comment.