diff --git a/docs/AdminTools.md b/docs/AdminTools.md
index 0c175887d7e47..66690fe6dd366 100644
--- a/docs/AdminTools.md
+++ b/docs/AdminTools.md
@@ -14,6 +14,9 @@
 		- [Brokers](#brokers)
 			- [list of active brokers](#list-of-active-brokers)
 			- [list of namespaces owned by a given broker](#list-of-namespaces-owned-by-a-given-broker)
+			- [update dynamic configuration](#update-dynamic-configuration)
+			- [get list of dynamic configuration name](#get-list-of-dynamic-configuration-name)
+			- [get value of dynamic configurations](#get-value-of-dynamic-configurations)
 		- [Properties](#properties)
 			- [list existing properties](#list-existing-properties)
 			- [create property](#create-property)
@@ -233,6 +236,157 @@ GET /admin/brokers/{cluster}/{broker}/ownedNamespaces
 admin.brokers().getOwnedNamespaces(cluster,brokerUrl)
 ```
 
+#### update dynamic configuration
+Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally.
+
+###### CLI
+
+```
+$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100
+```
+
+```
+N/A
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration/{configName}/{configValue}
+```
+
+###### Java
+
+```java
+admin.brokers().updateDynamicConfiguration(configName, configValue)
+```
+
+#### get list of dynamic configuration name
+It gives list of updatable dynamic service-configuration name.
+
+###### CLI
+
+```
+$ pulsar-admin brokers list-dynamic-config
+```
+
+```
+brokerShutdownTimeoutMs
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration
+```
+
+###### Java
+
+```java
+admin.brokers().getDynamicConfigurationNames()
+```
+
+#### get value of dynamic configurations
+It gives value of all dynamic configurations stored in zookeeper
+
+###### CLI
+
+```
+$ pulsar-admin brokers get-all-dynamic-config
+```
+
+```
+brokerShutdownTimeoutMs:100
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration/values
+```
+
+###### Java
+
+```java
+admin.brokers().getAllDynamicConfigurations()
+```
+
+#### Update dynamic configuration
+Broker can locally override value of updatable dynamic service-configurations that are stored into zookeeper. This interface allows to change the value of broker's dynamic-configuration into the zookeeper. Broker receives zookeeper-watch with new changed value and broker updates new value locally.
+
+###### CLI
+
+```
+$ pulsar-admin brokers update-dynamic-config brokerShutdownTimeoutMs 100
+```
+
+```
+N/A
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration/{configName}/{configValue}
+```
+
+###### Java
+
+```java
+admin.brokers().updateDynamicConfiguration(configName, configValue)
+```
+
+#### Get list of dynamic configuration name
+It gives list of updatable dynamic service-configuration name.
+
+###### CLI
+
+```
+$ pulsar-admin brokers list-dynamic-config
+```
+
+```
+brokerShutdownTimeoutMs
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration
+```
+
+###### Java
+
+```java
+admin.brokers().getDynamicConfigurationNames()
+```
+
+#### Get value of dynamic configurations
+It gives value of all dynamic configurations stored in zookeeper
+
+###### CLI
+
+```
+$ pulsar-admin brokers get-all-dynamic-config
+```
+
+```
+brokerShutdownTimeoutMs:100
+```
+
+###### REST
+
+```
+GET /admin/brokers/configuration/values
+```
+
+###### Java
+
+```java
+admin.brokers().getAllDynamicConfigurations()
+```
+
+
 
 ### Properties
 
diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java
index ab3099362e5d5..87553b3fe13a8 100644
--- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java
@@ -62,6 +62,7 @@ public class ServiceConfiguration implements PulsarConfiguration{
     private long zooKeeperSessionTimeoutMillis = 30000;
     // Time to wait for broker graceful shutdown. After this time elapses, the
     // process will be killed
+    @FieldContext(dynamic = true)
     private long brokerShutdownTimeoutMs = 3000;
     // Enable backlog quota check. Enforces action on topic when the quota is
     // reached
diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java
index cf693239b5484..465d35cfa2784 100644
--- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java
+++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/common/configuration/FieldContext.java
@@ -56,4 +56,11 @@
      * @return character length of field
      */
     public int maxCharLength() default Integer.MAX_VALUE;
+    
+    /**
+     * allow field to be updated dynamically
+     * 
+     * @return
+     */
+    public boolean dynamic() default false;
 }
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
index bce449bddafba..3fea37d54bc4b 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/admin/Brokers.java
@@ -15,32 +15,48 @@
  */
 package com.yahoo.pulsar.broker.admin;
 
+import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
+
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response.Status;
 
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+import com.yahoo.pulsar.broker.ServiceConfiguration;
+import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import com.yahoo.pulsar.broker.service.BrokerService;
+import com.yahoo.pulsar.broker.web.RestException;
+import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import com.yahoo.pulsar.common.util.ObjectMapperFactory;
+import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
+
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
-import com.yahoo.pulsar.broker.web.RestException;
-import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
+
 
 @Path("/brokers")
 @Api(value = "/brokers", description = "Brokers admin apis", tags = "brokers")
 @Produces(MediaType.APPLICATION_JSON)
 public class Brokers extends AdminResource {
     private static final Logger LOG = LoggerFactory.getLogger(Brokers.class);
-
+    private int serviceConfigZkVersion = -1;
+    
     @GET
     @Path("/{cluster}")
     @ApiOperation(value = "Get the list of active brokers (web service addresses) in the cluster.", response = String.class, responseContainer = "Set")
@@ -79,4 +95,91 @@ public Map<String, NamespaceOwnershipStatus> getOwnedNamespaes(@PathParam("clust
             throw new RestException(e);
         }
     }
+    
+    @POST
+    @Path("/configuration/{configName}/{configValue}")
+    @ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
+    @ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
+            @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
+            @ApiResponse(code = 404, message = "Configuration not found"),
+            @ApiResponse(code = 412, message = "Configuration can't be updated dynamically") })
+    public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception{
+        validateSuperUserAccess();
+        updateDynamicConfigurationOnZk(configName, configValue);
+    }
+
+    @GET
+    @Path("/configuration/values")
+    @ApiOperation(value = "Get value of all dynamic configurations' value overridden on local config")
+    @ApiResponses(value = { @ApiResponse(code = 404, message = "Configuration not found") })
+    public Map<String, String> getAllDynamicConfigurations() throws Exception {
+        ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
+                .getDynamicConfigurationCache();
+        Map<String, String> configurationMap = null;
+        try {
+            configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find configuration in zk"));
+        } catch (RestException e) {
+            LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), e.getMessage(), e);
+            throw e;
+        } catch (Exception e) {
+            LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), e.getMessage(), e);
+            throw new RestException(e);
+        }
+        return configurationMap;
+    }
+
+    @GET
+    @Path("/configuration")
+    @ApiOperation(value = "Get all updatable dynamic configurations's name")
+    public List<String> getDynamicConfigurationName() {
+        return BrokerService.getDynamicConfigurationMap().keys();
+    }
+    
+    /**
+     * if {@link ServiceConfiguration}-field is allowed to be modified dynamically, update configuration-map into zk, so
+     * all other brokers get the watch and can see the change and take appropriate action on the change.
+     * 
+     * @param configName
+     *            : configuration key
+     * @param configValue
+     *            : configuration value
+     */
+    private synchronized void updateDynamicConfigurationOnZk(String configName, String configValue) {
+        try {
+            if (BrokerService.getDynamicConfigurationMap().containsKey(configName)) {
+                ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService()
+                        .getDynamicConfigurationCache();
+                Map<String, String> configurationMap = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH)
+                        .orElse(null);
+                if (configurationMap != null) {
+                    configurationMap.put(configName, configValue);
+                    byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
+                    dynamicConfigurationCache.invalidate(BROKER_SERVICE_CONFIGURATION_PATH);
+                    serviceConfigZkVersion = localZk()
+                            .setData(BROKER_SERVICE_CONFIGURATION_PATH, content, serviceConfigZkVersion).getVersion();
+                } else {
+                    configurationMap = Maps.newHashMap();
+                    configurationMap.put(configName, configValue);
+                    byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configurationMap);
+                    ZkUtils.createFullPathOptimistic(localZk(), BROKER_SERVICE_CONFIGURATION_PATH, content,
+                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                }
+                LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), configName,
+                            configValue);
+                }
+                throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
+            }
+        } catch (RestException re) {
+            throw re;
+        } catch (Exception ie) {
+            LOG.error("[{}] Failed to update configuration {}/{}, {}", clientAppId(), configName, configValue,
+                    ie.getMessage(), ie);
+            throw new RestException(ie);
+        }
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
index c999790df3958..aa10c9331aead 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
@@ -19,14 +19,17 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -41,7 +44,6 @@
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.commons.lang.SystemUtils;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -66,6 +68,7 @@
 import com.yahoo.pulsar.client.api.PulsarClientException;
 import com.yahoo.pulsar.client.impl.PulsarClientImpl;
 import com.yahoo.pulsar.client.util.FutureUtil;
+import com.yahoo.pulsar.common.configuration.FieldContext;
 import com.yahoo.pulsar.common.naming.DestinationName;
 import com.yahoo.pulsar.common.naming.NamespaceBundle;
 import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
@@ -77,8 +80,12 @@
 import com.yahoo.pulsar.common.policies.data.Policies;
 import com.yahoo.pulsar.common.policies.data.RetentionPolicies;
 import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
+import com.yahoo.pulsar.common.util.FieldParser;
+import com.yahoo.pulsar.common.util.ObjectMapperFactory;
 import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
+import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
@@ -115,6 +122,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private final EventLoopGroup workerGroup;
     // offline topic backlog cache
     private final ConcurrentOpenHashMap<DestinationName, PersistentOfflineTopicStats> offlineTopicStatCache;
+    private static final ConcurrentOpenHashMap<String, Field> dynamicConfigurationMap = prepareDynamicConfigurationMap();
+    private final ConcurrentOpenHashMap<String, Consumer> configRegisteredListeners;
 
     private AuthorizationManager authorizationManager = null;
     private final ScheduledExecutorService statsUpdater;
@@ -132,6 +141,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private final int keepAliveIntervalSeconds;
     private final PulsarStats pulsarStats;
     private final AuthenticationService authenticationService;
+    
+    public static final String BROKER_SERVICE_CONFIGURATION_PATH = "/admin/configuration";
+    private final ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache;
 
     public BrokerService(PulsarService pulsar) throws Exception {
         this.pulsar = pulsar;
@@ -141,6 +153,7 @@ public BrokerService(PulsarService pulsar) throws Exception {
         this.topics = new ConcurrentOpenHashMap<>();
         this.replicationClients = new ConcurrentOpenHashMap<>();
         this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();
+        this.configRegisteredListeners = new ConcurrentOpenHashMap<>();
 
         this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
         this.pulsarStats = new PulsarStats(pulsar);
@@ -186,6 +199,14 @@ public BrokerService(PulsarService pulsar) throws Exception {
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
         this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
+        
+        this.dynamicConfigurationCache = new ZooKeeperDataCache<Map<String, String>>(pulsar().getLocalZkCache()) {
+            @Override
+            public Map<String, String> deserialize(String key, byte[] content) throws Exception {
+                return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
+            }
+        };
+        updateConfigurationAndRegisterListeners();
 
         PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
     }
@@ -824,4 +845,117 @@ public AuthenticationService getAuthenticationService() {
     public List<PersistentTopic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
         return multiLayerTopicsMap.get(namespace).get(bundle).values();
     }
+    
+    public ZooKeeperDataCache<Map<String, String>> getDynamicConfigurationCache() {
+        return dynamicConfigurationCache;
+    }
+
+    /**
+     * Update dynamic-ServiceConfiguration with value present into zk-configuration-map and register listeners on
+     * dynamic-ServiceConfiguration field to take appropriate action on change of zk-configuration-map.
+     */
+    private void updateConfigurationAndRegisterListeners() {
+        // update ServiceConfiguration value by reading zk-configuration-map
+        updateDynamicServiceConfiguration();
+        //add more listeners here
+    }
+
+    /**
+     * Allows a listener to listen on update of {@link ServiceConfiguration} change, so listener can take appropriate
+     * action if any specific config-field value has been changed.
+     * </p>
+     * On notification, listener should first check if config value has been changed and after taking appropriate
+     * action, listener should update config value with new value if it has been changed (so, next time listener can
+     * compare values on configMap change).
+     * @param <T>
+     * 
+     * @param configKey
+     *            : configuration field name
+     * @param listener
+     *            : listener which takes appropriate action on config-value change
+     */
+    public <T> void registerConfigurationListener(String configKey, Consumer<T> listener) {
+        configRegisteredListeners.put(configKey, listener);
+        dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public void onUpdate(String path, Map<String, String> data, Stat stat) {
+                if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null
+                        && data.containsKey(configKey)) {
+                    log.info("Updating configuration {}/{}", configKey, data.get(configKey));
+                    listener.accept((T) FieldParser.value(data.get(configKey), dynamicConfigurationMap.get(configKey)));
+                }
+            }
+        });
+    }
+
+    private void updateDynamicServiceConfiguration() {
+        try {
+            Optional<Map<String, String>> data = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH);
+            if (data.isPresent() && data.get() != null) {
+                data.get().forEach((key,value)-> {
+                    try {
+                        Field field = ServiceConfiguration.class.getDeclaredField(key);
+                        if (field != null && field.isAnnotationPresent(FieldContext.class)) {
+                            field.setAccessible(true);
+                            field.set(pulsar().getConfiguration(), FieldParser.value(value,field));
+                            log.info("Successfully updated {}/{}", key, value);
+                        }
+                    } catch (Exception e) {
+                        log.warn("Failed to update service configuration {}/{}, {}",key,value,e.getMessage());
+                    }                    
+                });
+            }
+            // register a listener: it updates field value and triggers appropriate registered field-listener only if
+            // field's value has been changed so, registered doesn't have to update field value in ServiceConfiguration
+            dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {
+                @SuppressWarnings("unchecked")
+                @Override
+                public void onUpdate(String path, Map<String, String> data, Stat stat) {
+                    if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) {
+                        data.forEach((configKey, value) -> {
+                            Field configField = dynamicConfigurationMap.get(configKey);
+                            Object newValue = FieldParser.value(data.get(configKey), configField);
+                            if (configField != null) {
+                                Consumer listener = configRegisteredListeners.get(configKey);
+                                try {
+                                    Object existingValue = configField.get(pulsar.getConfiguration());
+                                    configField.set(pulsar.getConfiguration(), newValue);
+                                    log.info("Successfully updated configuration {}/{}", configKey,
+                                            data.get(configKey));
+                                    if (listener != null && !existingValue.equals(newValue)) {
+                                        listener.accept(newValue);
+                                    }
+                                } catch (Exception e) {
+                                    log.error("Failed to update config {}/{}", configKey, newValue);
+                                }
+                            } else {
+                                log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue);
+                            }
+                        });
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e);
+        }
+    }
+    
+    public static ConcurrentOpenHashMap<String, Field> getDynamicConfigurationMap() {
+        return dynamicConfigurationMap;
+    }
+
+    private static ConcurrentOpenHashMap<String, Field> prepareDynamicConfigurationMap() {
+        ConcurrentOpenHashMap<String, Field> dynamicConfigurationMap = new ConcurrentOpenHashMap<>();
+        for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
+            if (field != null && field.isAnnotationPresent(FieldContext.class)) {
+                field.setAccessible(true);
+                if (((FieldContext) field.getAnnotation(FieldContext.class)).dynamic()) {
+                    dynamicConfigurationMap.put(field.getName(), field);
+                }
+            }
+        }
+        return dynamicConfigurationMap;
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
index f909dcc21d6f3..cadb2fa111449 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
@@ -15,12 +15,15 @@
  */
 package com.yahoo.pulsar.broker.admin;
 
+import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -36,6 +39,9 @@
 import javax.ws.rs.client.WebTarget;
 
 import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -56,6 +62,7 @@
 import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import com.yahoo.pulsar.broker.namespace.NamespaceEphemeralData;
 import com.yahoo.pulsar.broker.namespace.NamespaceService;
+import com.yahoo.pulsar.broker.service.BrokerService;
 import com.yahoo.pulsar.client.admin.PulsarAdmin;
 import com.yahoo.pulsar.client.admin.PulsarAdminException;
 import com.yahoo.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -377,8 +384,116 @@ public void brokers() throws Exception {
         admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
         admin.clusters().deleteCluster("use");
         assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
+        
     }
+    
+    /**
+     * <pre>
+     * Verifies: zk-update configuration updates service-config
+     * 1. create znode for dynamic-config
+     * 2. start pulsar service so, pulsar can set the watch on that znode
+     * 3. update the configuration with new value
+     * 4. wait and verify that new value has been updated
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testUpdateDynamicConfigurationWithZkWatch() throws Exception {
+        // create configuration znode
+        ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        // Now, znode is created: set the watch and listener on the znode
+        Method updateConfigListenerMethod = BrokerService.class
+                .getDeclaredMethod("updateConfigurationAndRegisterListeners");
+        updateConfigListenerMethod.setAccessible(true);
+        updateConfigListenerMethod.invoke(pulsar.getBrokerService());
+        pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
+        // (1) try to update dynamic field
+        final long shutdownTime = 10;
+        // update configuration
+        admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
+        // wait config to be updated
+        for (int i = 0; i < 5; i++) {
+            if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != shutdownTime) {
+                Thread.sleep(100 + (i * 10));
+            } else {
+                break;
+            }
+        }
+        // verify value is updated
+        assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
 
+        // (2) try to update non-dynamic field
+        try {
+            admin.brokers().updateDynamicConfiguration("zookeeperServers", "test-zk:1234");
+        } catch (Exception e) {
+            assertTrue(e instanceof PreconditionFailedException);
+        }
+
+        // (3) try to update non-existent field
+        try {
+            admin.brokers().updateDynamicConfiguration("test", Long.toString(shutdownTime));
+        } catch (Exception e) {
+            assertTrue(e instanceof PreconditionFailedException);
+        }
+
+    }
+
+    /**
+     * <pre>
+     * verifies: that registerListener updates pulsar.config value with newly updated zk-dynamic config
+     * NOTE: pulsar can't set the watch on non-existing znode
+     * So, when pulsar starts it is not able to set the watch on non-existing znode of dynamicConfiguration
+     * So, here, after creating znode we will trigger register explicitly
+     * 1.start pulsar
+     * 2.update zk-config with admin api
+     * 3. trigger watch and listener
+     * 4. verify that config is updated
+     * </pre>
+     * @throws Exception
+     */
+    @Test
+    public void testUpdateDynamicLocalConfiguration() throws Exception {
+        // (1) try to update dynamic field
+        final long shutdownTime = 10;
+        pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
+        // update configuration
+        admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
+        // Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated
+        Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("updateConfigurationAndRegisterListeners");
+        getPermitZkNodeMethod.setAccessible(true);
+        getPermitZkNodeMethod.invoke(pulsar.getBrokerService());
+        // verify value is updated
+        assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
+    }
+
+    @Test
+    public void testUpdatableConfigurationName() throws Exception {
+        // (1) try to update dynamic field
+        final String configName = "brokerShutdownTimeoutMs";
+        assertTrue(admin.brokers().getDynamicConfigurationNames().contains(configName));
+    }
+
+    @Test
+    public void testGetDynamicLocalConfiguration() throws Exception {
+        // (1) try to update dynamic field
+        final String configName = "brokerShutdownTimeoutMs";
+        final long shutdownTime = 10;
+        pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
+        try {
+            admin.brokers().getAllDynamicConfigurations();
+            fail("should have fail as configuration is not exist");
+        } catch (PulsarAdminException.NotFoundException ne) {
+            // ok : expected
+        }
+        assertNotEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
+        // update configuration
+        admin.brokers().updateDynamicConfiguration(configName, Long.toString(shutdownTime));
+        // Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated
+        assertEquals(Long.parseLong(admin.brokers().getAllDynamicConfigurations().get(configName)), shutdownTime);
+    }
+    
     @Test(enabled = true)
     public void properties() throws PulsarAdminException {
         Set<String> allowedClusters = Sets.newHashSet("use");
diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java
index 4e07b61ff97b4..008f7e35334e0 100644
--- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java
+++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/Brokers.java
@@ -67,4 +67,32 @@ public interface Brokers {
      * @throws PulsarAdminException
      */
     Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster, String brokerUrl) throws PulsarAdminException;
+    
+    /**
+	 * It updates dynamic configuration value in to Zk that triggers watch on
+	 * brokers and all brokers can update {@link ServiceConfiguration} value
+	 * locally
+	 * 
+	 * @param key
+	 * @param value
+	 * @throws PulsarAdminException
+	 */
+    void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException;
+
+    /**
+     * Get list of updatable configuration name
+     * 
+     * @return
+     * @throws PulsarAdminException
+     */
+    List<String> getDynamicConfigurationNames() throws PulsarAdminException;
+
+    /**
+     * Get values of all overridden dynamic-configs
+     * 
+     * @return
+     * @throws PulsarAdminException
+     */
+    Map<String, String> getAllDynamicConfigurations() throws PulsarAdminException;
+
 }
diff --git a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java
index 3051a89c27e0c..2829b8a1a2e8d 100644
--- a/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java
+++ b/pulsar-client-admin/src/main/java/com/yahoo/pulsar/client/admin/internal/BrokersImpl.java
@@ -18,12 +18,14 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 
 import com.yahoo.pulsar.client.admin.Brokers;
 import com.yahoo.pulsar.client.admin.PulsarAdminException;
 import com.yahoo.pulsar.client.api.Authentication;
+import com.yahoo.pulsar.common.policies.data.ErrorData;
 import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
 
 public class BrokersImpl extends BaseResource implements Brokers {
@@ -56,4 +58,34 @@ public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(String cluster,
         }
     }
 
+    @Override
+    public void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException {
+        try {
+            request(brokers.path("/configuration/").path(configName).path(configValue)).post(Entity.json(""),
+                    ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public Map<String, String> getAllDynamicConfigurations() throws PulsarAdminException {
+        try {
+            return request(brokers.path("/configuration/").path("values")).get(new GenericType<Map<String, String>>() {
+            });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public List<String> getDynamicConfigurationNames() throws PulsarAdminException {
+        try {
+            return request(brokers.path("/configuration")).get(new GenericType<List<String>>() {
+            });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
 }
diff --git a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
index 82890d7ce3389..4eaa1d0688a05 100644
--- a/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
+++ b/pulsar-client-tools/src/main/java/com/yahoo/pulsar/admin/cli/CmdBrokers.java
@@ -48,9 +48,43 @@ void run() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Update dynamic-serviceConfiguration of broker")
+    private class UpdateConfigurationCmd extends CliCommand {
+        @Parameter(names = "--config", description = "service-configuration name", required = true)
+        private String configName;
+        @Parameter(names = "--value", description = "service-configuration value", required = true)
+        private String configValue;
+
+        @Override
+        void run() throws Exception {
+            admin.brokers().updateDynamicConfiguration(configName, configValue);
+        }
+    }
+
+    @Parameters(commandDescription = "Get all overridden dynamic-configuration values")
+    private class GetAllConfigurationsCmd extends CliCommand {
+
+        @Override
+        void run() throws Exception {
+            print(admin.brokers().getAllDynamicConfigurations());
+        }
+    }
+    
+    @Parameters(commandDescription = "Get list of updatable configuration name")
+    private class GetUpdatableConfigCmd extends CliCommand {
+
+        @Override
+        void run() throws Exception {
+            print(admin.brokers().getDynamicConfigurationNames());
+        }
+    }
+    
     CmdBrokers(PulsarAdmin admin) {
         super("brokers", admin);
         jcommander.addCommand("list", new List());
         jcommander.addCommand("namespaces", new Namespaces());
+        jcommander.addCommand("update-dynamic-config", new UpdateConfigurationCmd());
+        jcommander.addCommand("list-dynamic-config", new GetUpdatableConfigCmd());
+        jcommander.addCommand("get-all-dynamic-config", new GetAllConfigurationsCmd());
     }
 }
diff --git a/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java
index 328da033d18ff..3dbf536737509 100644
--- a/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools/src/test/java/com/yahoo/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -59,6 +59,15 @@ void brokers() throws Exception {
 
         brokers.run(split("list use"));
         verify(mockBrokers).getActiveBrokers("use");
+        
+        brokers.run(split("get-all-dynamic-config"));
+        verify(mockBrokers).getAllDynamicConfigurations();
+        
+        brokers.run(split("list-dynamic-config"));
+        verify(mockBrokers).getDynamicConfigurationNames();
+        
+        brokers.run(split("update-dynamic-config --config brokerShutdownTimeoutMs --value 100"));
+        verify(mockBrokers).updateDynamicConfiguration("brokerShutdownTimeoutMs", "100");
     }
 
     @Test
diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java
index a0bc1d1f8a066..e1ddf8e02a3e4 100644
--- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/FieldParser.java
@@ -135,7 +135,7 @@ public static <T> void update(Map<String, String> properties, T obj) throws Ille
      *            : field of the attribute
      * @return
      */
-    private static Object value(String strValue, Field field) {
+    public static Object value(String strValue, Field field) {
         checkNotNull(field);
         // if field is not primitive type
         if (field.getGenericType() instanceof ParameterizedType) {