From 844819843835ccc0a6fb1c150656d2ddb21fc1df Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 26 Oct 2017 14:02:37 -0700 Subject: [PATCH] Allow to configure the default number of bundles for new namespaces (#854) --- conf/broker.conf | 6 +- conf/standalone.conf | 5 +- .../pulsar/broker/ServiceConfiguration.java | 14 +- .../pulsar/broker/admin/Namespaces.java | 5 +- .../pulsar/broker/SLAMonitoringTest.java | 1 + .../auth/MockedPulsarServiceBaseTest.java | 1 + .../namespace/NamespaceCreateBundlesTest.java | 55 ++++++++ .../namespace/NamespaceServiceTest.java | 65 ++++++++++ .../client/api/BrokerServiceLookupTest.java | 121 +++++++++--------- site/docs/latest/deployment/Kubernetes.md | 4 +- site/ja/deployment/Kubernetes.md | 4 +- 11 files changed, 213 insertions(+), 68 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java diff --git a/conf/broker.conf b/conf/broker.conf index b672bf0ebc7d6..b842e6e299553 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -92,6 +92,10 @@ brokerDeduplicationEntriesInterval=1000 # relative to a disconnected producer. Default is 6 hours. brokerDeduplicationProducerInactivityTimeoutMinutes=360 +# When a namespace is created without specifying the number of bundle, this +# value will be used as the default +defaultNumberOfNamespaceBundles=4 + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false @@ -287,7 +291,7 @@ managedLedgerMaxUnackedRangesToPersist=10000 # Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into -# zookeeper. +# zookeeper. managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 diff --git a/conf/standalone.conf b/conf/standalone.conf index ccbb5c0921711..54e951dada96f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -85,6 +85,9 @@ brokerDeduplicationEntriesInterval=1000 # relative to a disconnected producer. Default is 6 hours. brokerDeduplicationProducerInactivityTimeoutMinutes=360 +# When a namespace is created without specifying the number of bundle, this +# value will be used as the default +defaultNumberOfNamespaceBundles=4 # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false @@ -260,7 +263,7 @@ managedLedgerMaxUnackedRangesToPersist=10000 # Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into -# zookeeper. +# zookeeper. managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 587f6a1cc11e8..eb4437270fb7c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -100,6 +100,10 @@ public class ServiceConfiguration implements PulsarConfiguration { // relative to a disconnected producer. Default is 6 hours. private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360; + // When a namespace is created without specifying the number of bundle, this + // value will be used as the default + private int defaultNumberOfNamespaceBundles = 4; + // Enable check for minimum allowed client library version private boolean clientLibraryVersionCheckEnabled = false; // Allow client libraries with no version information @@ -269,7 +273,7 @@ public class ServiceConfiguration implements PulsarConfiguration { private int managedLedgerMaxUnackedRangesToPersist = 10000; // Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher // than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into - // zookeeper. + // zookeeper. private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000; /*** --- Load balancer --- ****/ @@ -502,6 +506,14 @@ public void setBrokerDeduplicationProducerInactivityTimeoutMinutes( this.brokerDeduplicationProducerInactivityTimeoutMinutes = brokerDeduplicationProducerInactivityTimeoutMinutes; } + public int getDefaultNumberOfNamespaceBundles() { + return defaultNumberOfNamespaceBundles; + } + + public void setDefaultNumberOfNamespaceBundles(int defaultNumberOfNamespaceBundles) { + this.defaultNumberOfNamespaceBundles = defaultNumberOfNamespaceBundles; + } + public long getBrokerDeleteInactiveTopicsFrequencySeconds() { return brokerDeleteInactiveTopicsFrequencySeconds; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java index f18ff2d39c35c..fdb727990e0f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java @@ -217,6 +217,9 @@ public void createNamespace(@PathParam("property") String property, @PathParam(" } else { policies.bundles = validateBundlesData(initialBundles); } + } else { + int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles(); + policies.bundles = getBundles(defaultNumberOfBundles); } zkCreateOptimistic(path(POLICIES, property, cluster, namespace), @@ -1174,7 +1177,7 @@ private void validatePersistencePolicies(PersistencePolicies persistence) { (persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum()) && (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()), "Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)", persistence.getBookkeeperEnsemble(), - persistence.getBookkeeperWriteQuorum(), persistence.getBookkeeperAckQuorum()); + persistence.getBookkeeperWriteQuorum(), persistence.getBookkeeperAckQuorum()); }catch(NullPointerException | IllegalArgumentException e) { throw new RestException(Status.PRECONDITION_FAILED, e.getMessage()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 6029ed5af2b02..e0d55a6612ebc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -91,6 +91,7 @@ void setup() throws Exception { config.setWebServicePort(brokerWebServicePorts[i]); config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); config.setBrokerServicePort(brokerNativeBrokerPorts[i]); + config.setDefaultNumberOfNamespaceBundles(1); configurations[i] = config; pulsarServices[i] = new PulsarService(config); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 849f5f34383c3..29f2ef65feeb9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -93,6 +93,7 @@ protected void resetConfig() { this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate this.conf.setManagedLedgerCacheSizeMB(8); this.conf.setActiveConsumerFailoverDelayTimeMillis(0); + this.conf.setDefaultNumberOfNamespaceBundles(1); } protected final void internalSetup() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java new file mode 100644 index 0000000000000..7bb6a8fa55cc3 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.namespace; + +import static org.testng.Assert.assertEquals; + +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.common.policies.data.Policies; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class NamespaceCreateBundlesTest extends BrokerTestBase { + + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setDefaultNumberOfNamespaceBundles(16); + super.baseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testCreateNamespaceWithDefaultBundles() throws Exception { + String namespaceName = "prop/use/default-bundles"; + + admin.namespaces().createNamespace(namespaceName); + + Policies policies = admin.namespaces().getPolicies(namespaceName); + assertEquals(policies.bundles.numBundles, 16); + assertEquals(policies.bundles.boundaries.size(), 17); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 523c8c6e745a7..857104ab6b69b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -333,6 +333,71 @@ public void testLoadReportDeserialize() throws Exception { System.out.println(result2); } + @Test + public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception { + OwnershipCache MockOwnershipCache = spy(pulsar.getNamespaceService().getOwnershipCache()); + doNothing().when(MockOwnershipCache).disableOwnership(any(NamespaceBundle.class)); + Field ownership = NamespaceService.class.getDeclaredField("ownershipCache"); + ownership.setAccessible(true); + ownership.set(pulsar.getNamespaceService(), MockOwnershipCache); + NamespaceService namespaceService = pulsar.getNamespaceService(); + NamespaceName nsname = new NamespaceName("pulsar/global/ns1"); + DestinationName dn = DestinationName.get("persistent://pulsar/global/ns1/topic-1"); + NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); + NamespaceBundle originalBundle = bundles.findBundle(dn); + + // Split bundle and take ownership of split bundles + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle); + + try { + result.get(); + } catch (Exception e) { + // make sure: no failure + fail("split bundle faild", e); + } + NamespaceBundleFactory bundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory(); + NamespaceBundles updatedNsBundles = bundleFactory.getBundles(nsname); + + // new updated bundles shouldn't be null + assertNotNull(updatedNsBundles); + List bundleList = updatedNsBundles.getBundles(); + assertNotNull(bundles); + + NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + + // (1) validate bundleFactory-cache has newly split bundles and removed old parent bundle + Pair> splitBundles = splitBundles(utilityFactory, nsname, bundles, + originalBundle); + assertNotNull(splitBundles); + Set splitBundleSet = new HashSet<>(splitBundles.getRight()); + splitBundleSet.removeAll(bundleList); + assertTrue(splitBundleSet.isEmpty()); + + // (2) validate LocalZookeeper policies updated with newly created split + // bundles + String path = joinPath(LOCAL_POLICIES_ROOT, nsname.toString()); + byte[] content = this.pulsar.getLocalZkCache().getZooKeeper().getData(path, null, new Stat()); + Policies policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class); + NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, policies.bundles); + assertTrue(updatedNsBundles.equals(localZkBundles)); + log.info("Policies: {}", policies); + + // (3) validate ownership of new split bundles by local owner + bundleList.stream().forEach(b -> { + try { + byte[] data = this.pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(b), null, + new Stat()); + NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data, + NamespaceEphemeralData.class); + Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl()); + } catch (Exception e) { + fail("failed to setup ownership", e); + } + }); + + } + + @SuppressWarnings("unchecked") private Pair> splitBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index ccd1ba4c46ef7..f57bcea65af20 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -94,6 +94,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase { @BeforeMethod @Override protected void setup() throws Exception { + conf.setDefaultNumberOfNamespaceBundles(1); super.init(); org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration(); clientConf.setStatsInterval(0, TimeUnit.SECONDS); @@ -108,15 +109,15 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - + /** * UsecaseL Multiple Broker => Lookup Redirection test - * + * * 1. Broker1 is a leader * 2. Lookup request reaches to Broker2 which redirects to leader (Broker1) with authoritative = false * 3. Leader (Broker1) finds out least loaded broker as Broker2 and redirects request to Broker2 with authoritative = true * 4. Broker2 receives final request to own a bundle with authoritative = true and client connects to Broker2 - * + * * @throws Exception */ @Test @@ -134,8 +135,8 @@ public void testMultipleBrokerLookup() throws Exception { PulsarService pulsar2 = startBroker(conf2); pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); pulsar2.getLoadManager().get().writeLoadReportOnZookeeper(); - - + + LoadManager loadManager1 = spy(pulsar.getLoadManager().get()); LoadManager loadManager2 = spy(pulsar2.getLoadManager().get()); Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); @@ -144,23 +145,23 @@ public void testMultipleBrokerLookup() throws Exception { // mock: redirect request to leader [2] doReturn(true).when(loadManager2).isCentralized(); loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2)); - - // mock: return Broker2 as a Least-loaded broker when leader receies request [3] + + // mock: return Broker2 as a Least-loaded broker when leader receies request [3] doReturn(true).when(loadManager1).isCentralized(); SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null); doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class)); loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1)); - + /**** started broker-2 ****/ URI brokerServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort()); PulsarClient pulsarClient2 = PulsarClient.create(brokerServiceUrl.toString(), new ClientConfiguration()); - + // load namespace-bundle by calling Broker2 Consumer consumer = pulsarClient2.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name", new ConsumerConfiguration()); Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", new ProducerConfiguration()); - + for (int i = 0; i < 10; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -184,16 +185,16 @@ public void testMultipleBrokerLookup() throws Exception { pulsar2.close(); loadManager1 = null; loadManager2 = null; - + } - + /** - * Usecase: Redirection due to different cluster - * 1. Broker1 runs on cluster: "use" and Broker2 runs on cluster: "use2" + * Usecase: Redirection due to different cluster + * 1. Broker1 runs on cluster: "use" and Broker2 runs on cluster: "use2" * 2. Broker1 receives "use2" cluster request => Broker1 reads "/clusters" from global-zookkeeper and * redirects request to Broker2 whch serves "use2" * 3. Broker2 receives redirect request and own namespace bundle - * + * * @throws Exception */ @Test @@ -211,42 +212,42 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception { conf2.setAdvertisedAddress("localhost"); conf2.setClusterName(newCluster); // Broker2 serves newCluster String broker2ServiceUrl = "pulsar://localhost:" + conf2.getBrokerServicePort(); - + admin.clusters().createCluster(newCluster, new ClusterData("http://127.0.0.1:" + BROKER_WEBSERVICE_PORT, null, broker2ServiceUrl, null)); admin.properties().createProperty(property, new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(newCluster))); admin.namespaces().createNamespace(property + "/" + newCluster + "/my-ns"); - - + + PulsarService pulsar2 = startBroker(conf2); pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); pulsar2.getLoadManager().get().writeLoadReportOnZookeeper(); - + URI brokerServiceUrl = new URI(broker2ServiceUrl); PulsarClient pulsarClient2 = PulsarClient.create(brokerServiceUrl.toString(), new ClientConfiguration()); - + // enable authorization: so, broker can validate cluster and redirect if finds different cluster pulsar.getConfiguration().setAuthorizationEnabled(true); // restart broker with authorization enabled: it initialize AuthorizationManager stopBroker(); startBroker(); - + LoadManager loadManager2 = spy(pulsar2.getLoadManager().get()); Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); loadManagerField.setAccessible(true); - + // mock: return Broker2 as a Least-loaded broker when leader receies request doReturn(true).when(loadManager2).isCentralized(); SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getWebServiceAddress(), null); doReturn(resourceUnit).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class)); loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager2)); /**** started broker-2 ****/ - + // load namespace-bundle by calling Broker2 Consumer consumer = pulsarClient.subscribe("persistent://my-property2/use2/my-ns/my-topic1", "my-subscriber-name", new ConsumerConfiguration()); Producer producer = pulsarClient2.createProducer("persistent://my-property2/use2/my-ns/my-topic1", new ProducerConfiguration()); - + for (int i = 0; i < 10; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -265,21 +266,21 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception { consumer.acknowledgeCumulative(msg); consumer.close(); producer.close(); - - // disable authorization + + // disable authorization pulsar.getConfiguration().setAuthorizationEnabled(false); pulsarClient2.close(); pulsar2.close(); loadManager2 = null; - + } - + /** - * Create #PartitionedTopic and let it served by multiple brokers which requries + * Create #PartitionedTopic and let it served by multiple brokers which requries * a. tcp partitioned-metadata-lookup - * b. multiple topic-lookup + * b. multiple topic-lookup * c. partitioned producer-consumer - * + * * @throws Exception */ @Test @@ -305,17 +306,17 @@ public void testPartitionTopicLookup() throws Exception { PulsarService pulsar2 = startBroker(conf2); pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); pulsar2.getLoadManager().get().writeLoadReportOnZookeeper(); - - + + LoadManager loadManager1 = spy(pulsar.getLoadManager().get()); LoadManager loadManager2 = spy(pulsar2.getLoadManager().get()); Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); loadManagerField.setAccessible(true); - + // mock: return Broker2 as a Least-loaded broker when leader receies request doReturn(true).when(loadManager1).isCentralized(); loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1)); - + // mock: redirect request to leader doReturn(true).when(loadManager2).isCentralized(); loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2)); @@ -347,7 +348,7 @@ public void testPartitionTopicLookup() throws Exception { consumer.unsubscribe(); consumer.close(); admin.persistentTopics().deletePartitionedTopic(dn.toString()); - + pulsar2.close(); loadManager2 = null; @@ -356,8 +357,8 @@ public void testPartitionTopicLookup() throws Exception { /** * 1. Start broker1 and broker2 with tls enable - * 2. Hit HTTPS lookup url at broker2 which redirects to HTTPS broker1 - * + * 2. Hit HTTPS lookup url at broker2 which redirects to HTTPS broker1 + * * @throws Exception */ @Test @@ -415,7 +416,7 @@ public void testWebserviceServiceTls() throws Exception { final String lookupResourceUrl = "/lookup/v2/destination/persistent/my-property/use/my-ns/my-topic1"; - // set client cert_key file + // set client cert_key file KeyManager[] keyManagers = null; Certificate[] tlsCert = SecurityUtility.loadCertificatesFromPemFile(TLS_CLIENT_CERT_FILE_PATH); PrivateKey tlsKey = SecurityUtility.loadPrivateKeyFromPemFile(TLS_CLIENT_KEY_FILE_PATH); @@ -450,13 +451,13 @@ public void testWebserviceServiceTls() throws Exception { loadManager2 = null; } - + /** * Discovery-Service lookup over binary-protocol * 1. Start discovery service * 2. start broker * 3. Create Producer/Consumer: by calling Discovery service for partitionedMetadata and topic lookup - * + * * @throws Exception */ @Test @@ -469,7 +470,7 @@ public void testDiscoveryLookup() throws Exception { DiscoveryService discoveryService = spy(new DiscoveryService(config)); doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); discoveryService.start(); - + // (2) lookup using discovery service final String discoverySvcUrl = discoveryService.getServiceUrl(); ClientConfiguration clientConfig = new ClientConfiguration(); @@ -477,7 +478,7 @@ public void testDiscoveryLookup() throws Exception { Consumer consumer = pulsarClient2.subscribe("persistent://my-property2/use2/my-ns/my-topic1", "my-subscriber-name", new ConsumerConfiguration()); Producer producer = pulsarClient2.createProducer("persistent://my-property2/use2/my-ns/my-topic1", new ProducerConfiguration()); - + for (int i = 0; i < 10; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -496,13 +497,13 @@ public void testDiscoveryLookup() throws Exception { consumer.acknowledgeCumulative(msg); consumer.close(); producer.close(); - + } - - + + /** * Verify discovery-service binary-proto lookup using tls - * + * * @throws Exception */ @Test @@ -512,7 +513,7 @@ public void testDiscoveryLookupTls() throws Exception { final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key"; final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt"; final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; - + // (1) restart broker1 with tls enabled conf.setTlsAllowInsecureConnection(true); conf.setTlsEnabled(true); @@ -520,7 +521,7 @@ public void testDiscoveryLookupTls() throws Exception { conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); stopBroker(); startBroker(); - + // (2) start discovery service ServiceConfig config = new ServiceConfig(); config.setServicePort(nextFreePort()); @@ -532,11 +533,11 @@ public void testDiscoveryLookupTls() throws Exception { DiscoveryService discoveryService = spy(new DiscoveryService(config)); doReturn(mockZooKeeperClientFactory).when(discoveryService).getZooKeeperClientFactory(); discoveryService.start(); - + // (3) lookup using discovery service final String discoverySvcUrl = discoveryService.getServiceUrlTls(); ClientConfiguration clientConfig = new ClientConfiguration(); - + Map authParams = new HashMap<>(); authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); @@ -545,13 +546,13 @@ public void testDiscoveryLookupTls() throws Exception { clientConfig.setAuthentication(auth); clientConfig.setUseTls(true); clientConfig.setTlsAllowInsecureConnection(true); - - + + PulsarClient pulsarClient2 = PulsarClient.create(discoverySvcUrl, clientConfig); Consumer consumer = pulsarClient2.subscribe("persistent://my-property2/use2/my-ns/my-topic1", "my-subscriber-name", new ConsumerConfiguration()); Producer producer = pulsarClient2.createProducer("persistent://my-property2/use2/my-ns/my-topic1", new ProducerConfiguration()); - + for (int i = 0; i < 10; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -570,7 +571,7 @@ public void testDiscoveryLookupTls() throws Exception { consumer.acknowledgeCumulative(msg); consumer.close(); producer.close(); - + } @Test @@ -696,7 +697,7 @@ public void testDiscoveryLookupAuthorizationFailure() throws Exception { ServiceConfig config = new ServiceConfig(); config.setServicePort(nextFreePort()); config.setBindOnLocalhost(true); - // set Authentication provider which returns "invalid" appid so, authorization fails + // set Authentication provider which returns "invalid" appid so, authorization fails Set providersClassNames = Sets.newHashSet(MockAuthorizationProviderFail.class.getName()); config.setAuthenticationProviders(providersClassNames); // enable authentication @@ -741,11 +742,11 @@ public void start() throws PulsarClientException { } /** - * + * *
      * When broker-1's load-manager splits the bundle and update local-policies, broker-2 should get watch of
      * local-policies and update bundleCache so, new lookup can be redirected properly.
-     * 
+     *
      * (1) Start broker-1 and broker-2
      * (2) Make sure broker-2 always assign bundle to broker1
      * (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch
@@ -753,9 +754,9 @@ public void start() throws PulsarClientException {
      * (5) Split the bundle for topic-1
      * (6) Broker-2 should get the watch and update bundle cache
      * (7) Make lookup request again to Broker-2 which should succeed.
-     * 
+     *
      * 
- * + * * @throws Exception */ @Test(timeOut = 5000) diff --git a/site/docs/latest/deployment/Kubernetes.md b/site/docs/latest/deployment/Kubernetes.md index d880412e85f54..9ea4a8e713aec 100644 --- a/site/docs/latest/deployment/Kubernetes.md +++ b/site/docs/latest/deployment/Kubernetes.md @@ -237,8 +237,8 @@ $ bin/pulsar-admin properties create $MY_PROPERTY \ --admin-roles admin \ --allowed-clusters us-central -# Create a namespace that can be spread across up to 16 brokers -$ bin/pulsar-admin namespaces create $MY_NAMESPACE --bundles 16 +# Create a namespace +$ bin/pulsar-admin namespaces create $MY_NAMESPACE ``` #### Experimenting with your cluster diff --git a/site/ja/deployment/Kubernetes.md b/site/ja/deployment/Kubernetes.md index 2b0638a1830fb..873bf73c71027 100644 --- a/site/ja/deployment/Kubernetes.md +++ b/site/ja/deployment/Kubernetes.md @@ -237,8 +237,8 @@ $ bin/pulsar-admin properties create $MY_PROPERTY \ --admin-roles admin \ --allowed-clusters us-central -# 16のBrokerを横断しうるネームスペースの作成 -$ bin/pulsar-admin namespaces create $MY_NAMESPACE --bundles 16 +# 4のBrokerを横断しうるネームスペースの作成 +$ bin/pulsar-admin namespaces create $MY_NAMESPACE ``` #### 作成したクラスタでの実験