Skip to content

Commit

Permalink
register listener on dynamic-config zpath even if deserialization fai…
Browse files Browse the repository at this point in the history
…l on service startup (apache#351)
  • Loading branch information
rdhabalia authored and merlimat committed Apr 13, 2017
1 parent fc10931 commit bd5a622
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,7 @@ public void onUpdate(String path, Map<String, String> data, Stat stat) {
}

private void updateDynamicServiceConfiguration() {

try {
// create dynamic-config znode if not present
if (pulsar.getZkClient().exists(BROKER_SERVICE_CONFIGURATION_PATH, false) == null) {
Expand All @@ -973,52 +974,53 @@ private void updateDynamicServiceConfiguration() {
}
Optional<Map<String, String>> data = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH);
if (data.isPresent() && data.get() != null) {
data.get().forEach((key,value)-> {
data.get().forEach((key, value) -> {
try {
Field field = ServiceConfiguration.class.getDeclaredField(key);
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
field.set(pulsar().getConfiguration(), FieldParser.value(value,field));
field.set(pulsar().getConfiguration(), FieldParser.value(value, field));
log.info("Successfully updated {}/{}", key, value);
}
} catch (Exception e) {
log.warn("Failed to update service configuration {}/{}, {}",key,value,e.getMessage());
}
log.warn("Failed to update service configuration {}/{}, {}", key, value, e.getMessage());
}
});
}
// register a listener: it updates field value and triggers appropriate registered field-listener only if
// field's value has been changed so, registered doesn't have to update field value in ServiceConfiguration
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
@SuppressWarnings("unchecked")
@Override
public void onUpdate(String path, Map<String, String> data, Stat stat) {
if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) {
data.forEach((configKey, value) -> {
Field configField = dynamicConfigurationMap.get(configKey);
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
}
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
}
});
}
}
});
} catch (Exception e) {
log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e);
}
// register a listener: it updates field value and triggers appropriate registered field-listener only if
// field's value has been changed so, registered doesn't have to update field value in ServiceConfiguration
dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
@SuppressWarnings("unchecked")
@Override
public void onUpdate(String path, Map<String, String> data, Stat stat) {
if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) {
data.forEach((configKey, value) -> {
Field configField = dynamicConfigurationMap.get(configKey);
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
}
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
}
});
}
}
});

}

public static ConcurrentOpenHashMap<String, Field> getDynamicConfigurationMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,42 @@ public void testUpdateDynamicConfigurationWithZkWatch() throws Exception {

}

/**
* Verifies broker sets watch on dynamic-configuration map even with invalid init json data
* <pre>
* 1. Set invalid json at dynamic-config znode
* 2. Broker fails to deserialize znode content but sets the watch on znode
* 3. Update znode with valid json map
* 4. Broker should get watch and update the dynamic-config map
* </pre>
* @throws Exception
*/
@Test
public void testInvalidDynamicConfigContentInZK() throws Exception {
final int newValue = 10;
stopBroker();
// set invalid data into dynamic-config znode so, broker startup fail to deserialize data
mockZookKeeper.setData(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH, "$".getBytes(), -1);
// start broker: it should have set watch even if with failure of deserialization
startBroker();
Assert.assertNotEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), newValue);
// update zk with config-value which should fire watch and broker should update the config value
Map<String, String> configMap = Maps.newHashMap();
configMap.put("brokerShutdownTimeoutMs", Integer.toString(newValue));
mockZookKeeper.setData(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMap), -1);
// wait config to be updated
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != newValue) {
Thread.sleep(100 + (i * 10));
} else {
break;
}
}
// verify value is updated
assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), newValue);
}

/**
* <pre>
* verifies: that registerListener updates pulsar.config value with newly updated zk-dynamic config
Expand Down

0 comments on commit bd5a622

Please sign in to comment.