Skip to content

Commit

Permalink
Allow to configure the default number of bundles for new namespaces (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Oct 26, 2017
1 parent 16819f0 commit 8448198
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 68 deletions.
6 changes: 5 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ brokerDeduplicationEntriesInterval=1000
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360

# When a namespace is created without specifying the number of bundle, this
# value will be used as the default
defaultNumberOfNamespaceBundles=4

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down Expand Up @@ -287,7 +291,7 @@ managedLedgerMaxUnackedRangesToPersist=10000

# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# zookeeper.
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000


Expand Down
5 changes: 4 additions & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ brokerDeduplicationEntriesInterval=1000
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360

# When a namespace is created without specifying the number of bundle, this
# value will be used as the default
defaultNumberOfNamespaceBundles=4

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false
Expand Down Expand Up @@ -260,7 +263,7 @@ managedLedgerMaxUnackedRangesToPersist=10000

# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# zookeeper.
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
// relative to a disconnected producer. Default is 6 hours.
private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360;

// When a namespace is created without specifying the number of bundle, this
// value will be used as the default
private int defaultNumberOfNamespaceBundles = 4;

// Enable check for minimum allowed client library version
private boolean clientLibraryVersionCheckEnabled = false;
// Allow client libraries with no version information
Expand Down Expand Up @@ -269,7 +273,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerMaxUnackedRangesToPersist = 10000;
// Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
// than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
// zookeeper.
// zookeeper.
private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000;

/*** --- Load balancer --- ****/
Expand Down Expand Up @@ -502,6 +506,14 @@ public void setBrokerDeduplicationProducerInactivityTimeoutMinutes(
this.brokerDeduplicationProducerInactivityTimeoutMinutes = brokerDeduplicationProducerInactivityTimeoutMinutes;
}

public int getDefaultNumberOfNamespaceBundles() {
return defaultNumberOfNamespaceBundles;
}

public void setDefaultNumberOfNamespaceBundles(int defaultNumberOfNamespaceBundles) {
this.defaultNumberOfNamespaceBundles = defaultNumberOfNamespaceBundles;
}

public long getBrokerDeleteInactiveTopicsFrequencySeconds() {
return brokerDeleteInactiveTopicsFrequencySeconds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ public void createNamespace(@PathParam("property") String property, @PathParam("
} else {
policies.bundles = validateBundlesData(initialBundles);
}
} else {
int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles();
policies.bundles = getBundles(defaultNumberOfBundles);
}

zkCreateOptimistic(path(POLICIES, property, cluster, namespace),
Expand Down Expand Up @@ -1174,7 +1177,7 @@ private void validatePersistencePolicies(PersistencePolicies persistence) {
(persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
"Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)", persistence.getBookkeeperEnsemble(),
persistence.getBookkeeperWriteQuorum(), persistence.getBookkeeperAckQuorum());
persistence.getBookkeeperWriteQuorum(), persistence.getBookkeeperAckQuorum());
}catch(NullPointerException | IllegalArgumentException e) {
throw new RestException(Status.PRECONDITION_FAILED, e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void setup() throws Exception {
config.setWebServicePort(brokerWebServicePorts[i]);
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config.setBrokerServicePort(brokerNativeBrokerPorts[i]);
config.setDefaultNumberOfNamespaceBundles(1);
configurations[i] = config;

pulsarServices[i] = new PulsarService(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ protected void resetConfig() {
this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
this.conf.setManagedLedgerCacheSizeMB(8);
this.conf.setActiveConsumerFailoverDelayTimeMillis(0);
this.conf.setDefaultNumberOfNamespaceBundles(1);
}

protected final void internalSetup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.namespace;

import static org.testng.Assert.assertEquals;

import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.common.policies.data.Policies;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class NamespaceCreateBundlesTest extends BrokerTestBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setDefaultNumberOfNamespaceBundles(16);
super.baseSetup();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testCreateNamespaceWithDefaultBundles() throws Exception {
String namespaceName = "prop/use/default-bundles";

admin.namespaces().createNamespace(namespaceName);

Policies policies = admin.namespaces().getPolicies(namespaceName);
assertEquals(policies.bundles.numBundles, 16);
assertEquals(policies.bundles.boundaries.size(), 17);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,71 @@ public void testLoadReportDeserialize() throws Exception {
System.out.println(result2);
}

@Test
public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception {
OwnershipCache MockOwnershipCache = spy(pulsar.getNamespaceService().getOwnershipCache());
doNothing().when(MockOwnershipCache).disableOwnership(any(NamespaceBundle.class));
Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
ownership.setAccessible(true);
ownership.set(pulsar.getNamespaceService(), MockOwnershipCache);
NamespaceService namespaceService = pulsar.getNamespaceService();
NamespaceName nsname = new NamespaceName("pulsar/global/ns1");
DestinationName dn = DestinationName.get("persistent://pulsar/global/ns1/topic-1");
NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
NamespaceBundle originalBundle = bundles.findBundle(dn);

// Split bundle and take ownership of split bundles
CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(originalBundle);

try {
result.get();
} catch (Exception e) {
// make sure: no failure
fail("split bundle faild", e);
}
NamespaceBundleFactory bundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory();
NamespaceBundles updatedNsBundles = bundleFactory.getBundles(nsname);

// new updated bundles shouldn't be null
assertNotNull(updatedNsBundles);
List<NamespaceBundle> bundleList = updatedNsBundles.getBundles();
assertNotNull(bundles);

NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());

// (1) validate bundleFactory-cache has newly split bundles and removed old parent bundle
Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles = splitBundles(utilityFactory, nsname, bundles,
originalBundle);
assertNotNull(splitBundles);
Set<NamespaceBundle> splitBundleSet = new HashSet<>(splitBundles.getRight());
splitBundleSet.removeAll(bundleList);
assertTrue(splitBundleSet.isEmpty());

// (2) validate LocalZookeeper policies updated with newly created split
// bundles
String path = joinPath(LOCAL_POLICIES_ROOT, nsname.toString());
byte[] content = this.pulsar.getLocalZkCache().getZooKeeper().getData(path, null, new Stat());
Policies policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class);
NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, policies.bundles);
assertTrue(updatedNsBundles.equals(localZkBundles));
log.info("Policies: {}", policies);

// (3) validate ownership of new split bundles by local owner
bundleList.stream().forEach(b -> {
try {
byte[] data = this.pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(b), null,
new Stat());
NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
NamespaceEphemeralData.class);
Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
} catch (Exception e) {
fail("failed to setup ownership", e);
}
});

}


@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
Expand Down
Loading

0 comments on commit 8448198

Please sign in to comment.