From bfba8c8597cfcf0013a2ae1a6490dfd48f15fa76 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Thu, 22 Apr 2021 01:04:52 +0800 Subject: [PATCH] Add underReplicate state in the topic internal stats (#10013) * Add underReplicate state in the topic internal stats * Apply comments. --- .../mledger/impl/ManagedLedgerImpl.java | 15 ++++ .../apache/pulsar/broker/PulsarService.java | 6 ++ .../cache/LocalZooKeeperCacheService.java | 8 ++ .../service/persistent/PersistentTopic.java | 43 +++++++---- .../client/PulsarBrokerStatsClientTest.java | 5 ++ .../data/PersistentTopicInternalStats.java | 1 + .../zookeeper/ZooKeeperChildrenCache.java | 4 + .../tests/integration/admin/AdminTest.java | 75 +++++++++++++++++++ .../src/test/resources/pulsar-messaging.xml | 1 + 9 files changed, 145 insertions(+), 13 deletions(-) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3e0352782a0e9..d43b05fd90b48 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -39,6 +39,7 @@ import java.time.Clock; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -47,6 +48,7 @@ import java.util.Optional; import java.util.Queue; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -122,6 +124,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext; import org.apache.bookkeeper.mledger.util.CallbackMutex; import org.apache.bookkeeper.mledger.util.Futures; +import 
org.apache.bookkeeper.net.BookieId; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; @@ -3735,4 +3738,16 @@ public void setEntriesAddedCounter(long count) { private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class); + public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) { + LedgerInfo ledgerInfo = ledgers.get(ledgerId); + if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) { + return CompletableFuture.completedFuture(Collections.emptySet()); + } + + return getLedgerHandle(ledgerId).thenCompose(lh -> { + Set<BookieId> ensembles = new HashSet<>(); + lh.getLedgerMetadata().getAllEnsembles().values().forEach(ensembles::addAll); + return CompletableFuture.completedFuture(ensembles); + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 86630bb6bcb81..b3c0aec426eee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -1503,6 +1504,10 @@ public CoordinationService getCoordinationService() { return coordinationService; } + public CompletableFuture<Set<String>> getAvailableBookiesAsync() { + return this.localZkCacheService.availableBookiesCache().getAsync(); + } + public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfiguration brokerConfig, String workerConfigFile) throws IOException { WorkerConfig workerConfig = WorkerConfig.load(workerConfigFile); @@ -1550,4 +1555,5 @@ public static WorkerConfig
initializeWorkerConfigFromBrokerConfig(ServiceConfigu ? workerConfig.getWorkerPortTls() : workerConfig.getWorkerPort())); return workerConfig; } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java index 07cc764616e16..e1970de709639 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java @@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.zookeeper.ZooKeeperCache; +import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache; import org.apache.zookeeper.CreateMode; @@ -48,6 +49,7 @@ public class LocalZooKeeperCacheService { private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers"; public static final String OWNER_INFO_ROOT = "/namespace"; public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies"; + public static final String AVAILABLE_BOOKIES_ROOT = "/ledgers/available"; private final ZooKeeperCache cache; @@ -55,6 +57,7 @@ public class LocalZooKeeperCacheService { private ZooKeeperManagedLedgerCache managedLedgerListCache; private ResourceQuotaCache resourceQuotaCache; private ZooKeeperDataCache policiesCache; + private ZooKeeperChildrenCache availableBookiesCache; private ConfigurationCacheService configurationCacheService; @@ -119,6 +122,7 @@ public CompletableFuture>> getWithStatAsync( this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT); this.resourceQuotaCache = new ResourceQuotaCache(cache); this.resourceQuotaCache.initZK(); + this.availableBookiesCache = new ZooKeeperChildrenCache(cache, 
AVAILABLE_BOOKIES_ROOT); } private void initZK() throws PulsarServerException { @@ -245,6 +249,10 @@ public ZooKeeperManagedLedgerCache managedLedgerListCache() { return this.managedLedgerListCache; } + public ZooKeeperChildrenCache availableBookiesCache() { + return this.availableBookiesCache; + } + public CompletableFuture managedLedgerExists(String persistentPath) { return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index deccca48b8ede..ab4483ac53953 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; +import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -65,6 +66,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.net.BookieId; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; @@ -1775,20 +1777,35 @@ public CompletableFuture getInternalStats(boolean stats.ledgers = Lists.newArrayList(); List> futures = includeLedgerMetadata ? 
Lists.newArrayList() : null; - ml.getLedgersInfo().forEach((id, li) -> { - LedgerInfo info = new LedgerInfo(); - info.ledgerId = li.getLedgerId(); - info.entries = li.getEntries(); - info.size = li.getSize(); - info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); - stats.ledgers.add(info); - if (futures != null) { - futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { - if (ex == null) { - info.metadata = lMetadata; + CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync(); + availableBookiesFuture.whenComplete((bookies, e) -> { + if (e != null) { + log.error("[{}] Failed to fetch available bookies.", topic, e); + statFuture.completeExceptionally(e); + } else { + ml.getLedgersInfo().forEach((id, li) -> { + LedgerInfo info = new LedgerInfo(); + info.ledgerId = li.getLedgerId(); + info.entries = li.getEntries(); + info.size = li.getSize(); + info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); + stats.ledgers.add(info); + if (futures != null) { + futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { + if (ex == null) { + info.metadata = lMetadata; + } + return null; + })); + futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> { + if (ex == null) { + info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString) + .collect(Collectors.toList())); + } + return null; + })); } - return null; - })); + }); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java index 938f72108ee9b..576cbdfdce146 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java @@ -27,6 +27,7 @@ import
java.util.concurrent.TimeUnit; import javax.ws.rs.ClientErrorException; import javax.ws.rs.ServerErrorException; + import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -124,6 +125,10 @@ public void testTopicInternalStats() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); PersistentTopicInternalStats internalStats = topic.getInternalStats(true).get(); assertNotNull(internalStats.ledgers.get(0).metadata); + // For the mock test, the default ensemble is ["192.0.2.1:1234","192.0.2.2:1234","192.0.2.3:1234"] + // The registered bookie ID is 192.168.1.1:5000 + assertTrue(internalStats.ledgers.get(0).underReplicated); + CursorStats cursor = internalStats.cursors.get(subscriptionName); assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs); assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0 diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java index c9b5ae5ff15cb..47b777c86e0d2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java @@ -57,6 +57,7 @@ public static class LedgerInfo { public long size; public boolean offloaded; public String metadata; + public boolean underReplicated; } /** diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java index 3cc381fdda9a5..6b05f415ddc4b 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java @@ -66,6 +66,10 @@ public Set<String> get(String path) throws KeeperException, InterruptedException return children; } + public CompletableFuture<Set<String>> getAsync() { + return getAsync(this.path); + } + public CompletableFuture<Set<String>> getAsync(String path) { return cache.getChildrenAsync(path, this); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java new file mode 100644 index 0000000000000..3400b6962f982 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/admin/AdminTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.tests.integration.admin; + +import static org.testng.Assert.assertNotNull; + +import java.util.function.Supplier; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.tests.integration.messaging.MessagingBase; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Integration tests for Pulsar Admin. + */ +@Slf4j +public class AdminTest extends MessagingBase { + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testUnderReplicatedState(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception { + + String topicName = getNonPartitionedTopic("replicated-state", true); + + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl.get()) + .build(); + + @Cleanup + final PulsarClient client = PulsarClient.builder() + .serviceUrl(serviceUrl.get()) + .build(); + + @Cleanup + final Producer<String> producer = client.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + + for (int i = 0; i < 10; i++) { + MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send(); + assertNotNull(messageId); + } + + log.info("Successfully to publish 10 messages to {}", topicName); + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName); + Assert.assertTrue(stats.ledgers.size() > 0); + for (PersistentTopicInternalStats.LedgerInfo ledger : stats.ledgers) { + Assert.assertFalse(ledger.underReplicated); + } + } +} diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml index 766d790bec971..e0ac367e6e19b 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml +++ b/tests/integration/src/test/resources/pulsar-messaging.xml @@ -26,6 +26,7 @@ + \ No newline at end of file