Skip to content

Commit

Permalink
Update Broker service configuration dynamically (apache#186)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Mar 5, 2017
1 parent 8544200 commit 92b79d0
Show file tree
Hide file tree
Showing 11 changed files with 623 additions and 6 deletions.
154 changes: 154 additions & 0 deletions docs/AdminTools.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
- [Brokers](#brokers)
- [list of active brokers](#list-of-active-brokers)
- [list of namespaces owned by a given broker](#list-of-namespaces-owned-by-a-given-broker)
- [update dynamic configuration](#update-dynamic-configuration)
- [get list of dynamic configuration name](#get-list-of-dynamic-configuration-name)
- [get value of dynamic configurations](#get-value-of-dynamic-configurations)
- [Properties](#properties)
- [list existing properties](#list-existing-properties)
- [create property](#create-property)
Expand Down Expand Up @@ -233,6 +236,157 @@ GET /admin/brokers/{cluster}/{broker}/ownedNamespaces
admin.brokers().getOwnedNamespaces(cluster,brokerUrl)
```

#### update dynamic configuration
Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally.

###### CLI

```
$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100
```

```
N/A
```

###### REST

```
GET /admin/brokers/configuration/{configName}/{configValue}
```

###### Java

```java
admin.brokers().updateDynamicConfiguration(configName, configValue)
```

#### get list of dynamic configuration name
It gives list of updatable dynamic service-configuration name.

###### CLI

```
$ pulsar-admin brokers list-dynamic-config
```

```
brokerShutdownTimeoutMs
```

###### REST

```
GET /admin/brokers/configuration
```

###### Java

```java
admin.brokers().getDynamicConfigurationNames()
```

#### get value of dynamic configurations
It gives value of all dynamic configurations stored in zookeeper

###### CLI

```
$ pulsar-admin brokers get-all-dynamic-config
```

```
brokerShutdownTimeoutMs:100
```

###### REST

```
GET /admin/brokers/configuration/values
```

###### Java

```java
admin.brokers().getAllDynamicConfigurations()
```

#### Update dynamic configuration
Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally.

###### CLI

```
$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100
```

```
N/A
```

###### REST

```
GET /admin/brokers/configuration/{configName}/{configValue}
```

###### Java

```java
admin.brokers().updateDynamicConfiguration(configName, configValue)
```

#### Get list of dynamic configuration name
It gives list of updatable dynamic service-configuration name.

###### CLI

```
$ pulsar-admin brokers list-dynamic-config
```

```
brokerShutdownTimeoutMs
```

###### REST

```
GET /admin/brokers/configuration
```

###### Java

```java
admin.brokers().getDynamicConfigurationNames()
```

#### Get value of dynamic configurations
It gives value of all dynamic configurations stored in zookeeper

###### CLI

```
$ pulsar-admin brokers get-all-dynamic-config
```

```
brokerShutdownTimeoutMs:100
```

###### REST

```
GET /admin/brokers/configuration/values
```

###### Java

```java
admin.brokers().getAllDynamicConfigurations()
```



### Properties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ServiceConfiguration implements PulsarConfiguration{
private long zooKeeperSessionTimeoutMillis = 30000;
// Time to wait for broker graceful shutdown. After this time elapses, the
// process will be killed
@FieldContext(dynamic = true)
private long brokerShutdownTimeoutMs = 3000;
// Enable backlog quota check. Enforces action on topic when the quota is
// reached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,11 @@
* @return character length of field
*/
public int maxCharLength() default Integer.MAX_VALUE;

/**
* allow field to be updated dynamically
*
* @return
*/
public boolean dynamic() default false;
}
111 changes: 107 additions & 4 deletions pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,48 @@
*/
package com.yahoo.pulsar.broker.admin;

import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;

import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;

import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;


@Path("/brokers")
@Api(value = "/brokers", description = "Brokers admin apis", tags = "brokers")
@Produces(MediaType.APPLICATION_JSON)
public class Brokers extends AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(Brokers.class);

private int serviceConfigZkVersion = -1;

@GET
@Path("/{cluster}")
@ApiOperation(value = "Get the list of active brokers (web service addresses) in the cluster.", response = String.class, responseContainer = "Set")
Expand Down Expand Up @@ -79,4 +95,91 @@ public Map<String, NamespaceOwnershipStatus> getOwnedNamespaes(@PathParam("clust
throw new RestException(e);
}
}

@POST
@Path("/configuration/{configName}/{configValue}")
@ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
@ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
@ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
@ApiResponse(code = 404, message = "Configuration not found"),
@ApiResponse(code = 412, message = "Configuration can't be updated dynamically") })
public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
validateSuperUserAccess();
updateDynamicConfigurationOnZk(configName, configValue);
}

@GET
@Path("/configuration/values")
@ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config")
@ApiResponses(value = { @ApiResponse(code = 404, message = "Configuration not found") })
public Map<String, String> getAllDynamicConfigurations() throws Exception {
ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
.getDynamicConfigurationCache();
Map<String, String> configurationMap = null;
try {
configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
} catch (RestException e) {
LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
throw e;
} catch (Exception e) {
LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
return configurationMap;
}

@GET
@Path("/configuration")
@ApiOperation(value = "Get all updatable dynamic configurations's name")
public List<String> getDynamicConfigurationName() {
return BrokerService.getDynamicConfigurationMap().keys();
}

/**
* if {@link ServiceConfiguration}-field is allowed to be modified dynamically, update configuration-map into zk, so
* all other brokers get the watch and can see the change and take appropriate action on the change.
*
* @param configName
* : configuration key
* @param configValue
* : configuration value
*/
private synchronized void updateDynamicConfigurationOnZk(String configName, String configValue) {
try {
if (BrokerService.getDynamicConfigurationMap().containsKey(configName)) {
ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
.getDynamicConfigurationCache();
Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
.orElse(null);
if (configurationMap != null) {
configurationMap.put(configName, configValue);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
serviceConfigZkVersion = localZk()
.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
} else {
configurationMap = Maps.newHashMap();
configurationMap.put(configName, configValue);
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
ZkUtils.createFullPathOptimistic(localZk(), BROKER_SERVICE_CONFIGURATION_PATH, content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName,
configValue);
}
throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
}
} catch (RestException re) {
throw re;
} catch (Exception ie) {
LOG.error("[{}] Failed to update configuration {}/{}, {}", clientAppId(), configName, configValue,
ie.getMessage(), ie);
throw new RestException(ie);
}
}

}
Loading

0 comments on commit 92b79d0

Please sign in to comment.