Skip to content

Commit

Permalink
[fix][broker]After the broker is restarted, the cache dynamic configu…
Browse files Browse the repository at this point in the history
…ration is invalid (apache#17035)
  • Loading branch information
lordcheng10 authored Sep 6, 2022
1 parent b81d0a5 commit a7f1a56
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
Expand Down Expand Up @@ -2134,33 +2135,44 @@ private void handlePoliciesUpdates(NamespaceName namespace) {
}

private void handleDynamicConfigurationUpdates() {
pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfigurationAsync()
.thenAccept(optMap -> {
if (!optMap.isPresent()) {
return;
}
Map<String, String> data = optMap.get();
data.forEach((configKey, value) -> {
Field configField = dynamicConfigurationMap.get(configKey).field;
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
DynamicConfigurationResources dynamicConfigResources = null;
try {
dynamicConfigResources = pulsar()
.getPulsarResources()
.getDynamicConfigResources();
} catch (Exception e) {
log.warn("Failed to read dynamic broker configuration", e);
}

if (dynamicConfigResources != null) {
dynamicConfigResources.getDynamicConfigurationAsync()
.thenAccept(optMap -> {
if (!optMap.isPresent()) {
return;
}
Map<String, String> data = optMap.get();
data.forEach((configKey, value) -> {
Field configField = dynamicConfigurationMap.get(configKey).field;
Object newValue = FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener = configRegisteredListeners.get(configKey);
try {
Object existingValue = configField.get(pulsar.getConfiguration());
configField.set(pulsar.getConfiguration(), newValue);
log.info("Successfully updated configuration {}/{}", configKey,
data.get(configKey));
if (listener != null && !existingValue.equals(newValue)) {
listener.accept(newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
}
} catch (Exception e) {
log.error("Failed to update config {}/{}", configKey, newValue);
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
}
} else {
log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
}
});
});
});
}
}

/**
Expand Down Expand Up @@ -2267,10 +2279,7 @@ private void updateConfigurationAndRegisterListeners() {
return true;
});

// (2) update ServiceConfiguration value by reading zk-configuration-map
updateDynamicServiceConfiguration();

// (3) Listener Registration
// (2) Listener Registration
// add listener on "maxConcurrentLookupRequest" value change
registerConfigurationListener("maxConcurrentLookupRequest",
(maxConcurrentLookupRequest) -> lookupRequestSemaphore.set(
Expand Down Expand Up @@ -2407,6 +2416,12 @@ private void updateConfigurationAndRegisterListeners() {
});

// add more listeners here

// (3) create dynamic-config if not exist.
createDynamicConfigPathIfNotExist();

// (4) update ServiceConfiguration value by reading zk-configuration-map and trigger corresponding listeners.
handleDynamicConfigurationUpdates();
}

private void updateDefaultNumPartitions(int numPartitions) {
Expand Down Expand Up @@ -2593,6 +2608,21 @@ private void validateConfigKey(String key) {
}
}

private void createDynamicConfigPathIfNotExist() {
try {
Optional<Map<String, String>> configCache =
pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfiguration();

// create dynamic-config if not exist.
if (!configCache.isPresent()) {
pulsar().getPulsarResources().getDynamicConfigResources()
.setDynamicConfigurationWithCreate(n -> Maps.newHashMap());
}
} catch (Exception e) {
log.warn("Failed to read dynamic broker configuration", e);
}
}

/**
* Updates pulsar.ServiceConfiguration's dynamic field with value persistent into zk-dynamic path. It also validates
* dynamic-value before updating it and throws {@code IllegalArgumentException} if validation fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,14 @@ public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception {
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(), 0.8);
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), TimeUnit.MILLISECONDS
.toNanos(2000));

restartBroker();

// verify value again
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(), 1 * 1024L * 1024L);
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(), 0.8);
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), TimeUnit.MILLISECONDS
.toNanos(2000));
}

/**
Expand Down

0 comments on commit a7f1a56

Please sign in to comment.