Skip to content

Commit

Permalink
[pulsar-broker] add Delete dynamic config api (apache#4614)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Jun 28, 2019
1 parent ed7cdca commit 20d2499
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -132,6 +133,18 @@ public void updateDynamicConfiguration(@PathParam("configName") String configNam
updateDynamicConfigurationOnZk(configName, configValue);
}

@DELETE
@Path("/configuration/{configName}")
@ApiOperation(value = "Delete dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
@ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
@ApiResponse(code = 412, message = "Invalid dynamic-config value"),
@ApiResponse(code = 500, message = "Internal server error") })
public void deleteDynamicConfiguration(@PathParam("configName") String configName) throws Exception {
validateSuperUserAccess();
deleteDynamicConfigurationOnZk(configName);
}

@GET
@Path("/configuration/values")
@ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config")
Expand Down Expand Up @@ -320,5 +333,34 @@ private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture,
});
});
}

private synchronized void deleteDynamicConfigurationOnZk(String configName) {
try {
if (BrokerService.isDynamicConfiguration(configName)) {
ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
.getDynamicConfigurationCache();
Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElse(null);
if (configurationMap != null && configurationMap.containsKey(configName)) {
configurationMap.remove(configName);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
serviceConfigZkVersion = localZk()
.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
}
LOG.info("[{}] Deleted Service configuration {}", clientAppId(), configName);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName);
}
throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
}
} catch (RestException re) {
throw re;
} catch (Exception ie) {
LOG.error("[{}] Failed to update configuration {}, {}", clientAppId(), configName, ie.getMessage(), ie);
throw new RestException(ie);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -480,6 +481,13 @@ public void testUpdateDynamicConfigurationWithZkWatch() throws Exception {
assertTrue(pulsar.getConfiguration().getSuperUserRoles().contains(user1));
assertTrue(pulsar.getConfiguration().getSuperUserRoles().contains(user2));


admin.brokers().updateDynamicConfiguration("loadManagerClassName", SimpleLoadManagerImpl.class.getName());
retryStrategically((test) -> pulsar.getConfiguration().getLoadManagerClassName()
.equals(SimpleLoadManagerImpl.class.getName()), 150, 5);
assertEquals(pulsar.getConfiguration().getLoadManagerClassName(), SimpleLoadManagerImpl.class.getName());
admin.brokers().deleteDynamicConfiguration("loadManagerClassName");
assertFalse(admin.brokers().getAllDynamicConfigurations().containsKey("loadManagerClassName"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ public interface Brokers {
* @throws PulsarAdminException
*/
void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException;

/**
* It deletes dynamic configuration value in to Zk. It will not impact current value in broker but next time when
* broker restarts, it applies value from configuration file only.
*
* @param key
* @param value
* @throws PulsarAdminException
*/
void deleteDynamicConfiguration(String configName) throws PulsarAdminException;

/**
* Get list of updatable configuration name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ public void updateDynamicConfiguration(String configName, String configValue) th
}
}

@Override
public void deleteDynamicConfiguration(String configName) throws PulsarAdminException {
try {
request(adminBrokers.path("/configuration/").path(configName)).delete(ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public Map<String, String> getAllDynamicConfigurations() throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ void brokers() throws Exception {

brokers.run(split("update-dynamic-config --config brokerShutdownTimeoutMs --value 100"));
verify(mockBrokers).updateDynamicConfiguration("brokerShutdownTimeoutMs", "100");

brokers.run(split("delete-dynamic-config --config brokerShutdownTimeoutMs"));
verify(mockBrokers).deleteDynamicConfiguration("brokerShutdownTimeoutMs");

brokers.run(split("get-internal-config"));
verify(mockBrokers).getInternalConfigurationData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Delete dynamic-serviceConfiguration of broker")
private class DeleteConfigurationCmd extends CliCommand {
@Parameter(names = "--config", description = "service-configuration name", required = true)
private String configName;

@Override
void run() throws Exception {
admin.brokers().deleteDynamicConfiguration(configName);
}
}

@Parameters(commandDescription = "Get all overridden dynamic-configuration values")
private class GetAllConfigurationsCmd extends CliCommand {

Expand Down Expand Up @@ -118,6 +129,7 @@ public CmdBrokers(PulsarAdmin admin) {
jcommander.addCommand("list", new List());
jcommander.addCommand("namespaces", new Namespaces());
jcommander.addCommand("update-dynamic-config", new UpdateConfigurationCmd());
jcommander.addCommand("delete-dynamic-config", new DeleteConfigurationCmd());
jcommander.addCommand("list-dynamic-config", new GetUpdatableConfigCmd());
jcommander.addCommand("get-all-dynamic-config", new GetAllConfigurationsCmd());
jcommander.addCommand("get-internal-config", new GetInternalConfigurationCmd());
Expand Down

0 comments on commit 20d2499

Please sign in to comment.