Skip to content

Commit

Permalink
Add underReplicate state in the topic internal stats (apache#10013)
Browse files Browse the repository at this point in the history
* Add underReplicate state in the topic internal stats

* Apply comments.
  • Loading branch information
codelipenghui authored Apr 21, 2021
1 parent 350fdab commit bfba8c8
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -47,6 +48,7 @@
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -122,6 +124,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
Expand Down Expand Up @@ -3735,4 +3738,16 @@ public void setEntriesAddedCounter(long count) {

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
LedgerInfo ledgerInfo = ledgers.get(ledgerId);
if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) {
return CompletableFuture.completedFuture(Collections.emptySet());
}

return getLedgerHandle(ledgerId).thenCompose(lh -> {
Set<BookieId> ensembles = new HashSet<>();
lh.getLedgerMetadata().getAllEnsembles().values().forEach(ensembles::addAll);
return CompletableFuture.completedFuture(ensembles);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -1503,6 +1504,10 @@ public CoordinationService getCoordinationService() {
return coordinationService;
}

public CompletableFuture<Set<String>> getAvailableBookiesAsync() {
return this.localZkCacheService.availableBookiesCache().getAsync();
}

public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfiguration brokerConfig,
String workerConfigFile) throws IOException {
WorkerConfig workerConfig = WorkerConfig.load(workerConfigFile);
Expand Down Expand Up @@ -1550,4 +1555,5 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu
? workerConfig.getWorkerPortTls() : workerConfig.getWorkerPort()));
return workerConfig;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.CreateMode;
Expand All @@ -48,13 +49,15 @@ public class LocalZooKeeperCacheService {
private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers";
public static final String OWNER_INFO_ROOT = "/namespace";
public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
public static final String AVAILABLE_BOOKIES_ROOT = "/ledgers/available";

private final ZooKeeperCache cache;

private ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache;
private ZooKeeperManagedLedgerCache managedLedgerListCache;
private ResourceQuotaCache resourceQuotaCache;
private ZooKeeperDataCache<LocalPolicies> policiesCache;
private ZooKeeperChildrenCache availableBookiesCache;

private ConfigurationCacheService configurationCacheService;

Expand Down Expand Up @@ -119,6 +122,7 @@ public CompletableFuture<Optional<Entry<LocalPolicies, Stat>>> getWithStatAsync(
this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
this.resourceQuotaCache = new ResourceQuotaCache(cache);
this.resourceQuotaCache.initZK();
this.availableBookiesCache = new ZooKeeperChildrenCache(cache, AVAILABLE_BOOKIES_ROOT);
}

private void initZK() throws PulsarServerException {
Expand Down Expand Up @@ -245,6 +249,10 @@ public ZooKeeperManagedLedgerCache managedLedgerListCache() {
return this.managedLedgerListCache;
}

public ZooKeeperChildrenCache availableBookiesCache() {
return this.availableBookiesCache;
}

public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) {
return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand All @@ -65,6 +66,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -1775,20 +1777,35 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean

stats.ledgers = Lists.newArrayList();
List<CompletableFuture<String>> futures = includeLedgerMetadata ? Lists.newArrayList() : null;
ml.getLedgersInfo().forEach((id, li) -> {
LedgerInfo info = new LedgerInfo();
info.ledgerId = li.getLedgerId();
info.entries = li.getEntries();
info.size = li.getSize();
info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
stats.ledgers.add(info);
if (futures != null) {
futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
if (ex == null) {
info.metadata = lMetadata;
CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync();
availableBookiesFuture.whenComplete((bookies, e) -> {
if (e != null) {
log.error("[{}] Failed to fetch available bookies.", topic, e);
statFuture.completeExceptionally(e);
} else {
ml.getLedgersInfo().forEach((id, li) -> {
LedgerInfo info = new LedgerInfo();
info.ledgerId = li.getLedgerId();
info.entries = li.getEntries();
info.size = li.getSize();
info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
stats.ledgers.add(info);
if (futures != null) {
futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
if (ex == null) {
info.metadata = lMetadata;
}
return null;
}));
futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
if (ex == null) {
info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
.collect(Collectors.toList()));
}
return null;
}));
}
return null;
}));
});
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.ServerErrorException;

import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -124,6 +125,10 @@ public void testTopicInternalStats() throws Exception {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
PersistentTopicInternalStats internalStats = topic.getInternalStats(true).get();
assertNotNull(internalStats.ledgers.get(0).metadata);
// For the mock test, the default ensembles is ["192.0.2.1:1234","192.0.2.2:1234","192.0.2.3:1234"]
// The registed bookie ID is 192.168.1.1:5000
assertTrue(internalStats.ledgers.get(0).underReplicated);

CursorStats cursor = internalStats.cursors.get(subscriptionName);
assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);
assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static class LedgerInfo {
public long size;
public boolean offloaded;
public String metadata;
public boolean underReplicated;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public Set<String> get(String path) throws KeeperException, InterruptedException
return children;
}

public CompletableFuture<Set<String>> getAsync() {
return getAsync(this.path);
}

public CompletableFuture<Set<String>> getAsync(String path) {
return cache.getChildrenAsync(path, this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.admin;

import static org.testng.Assert.assertNotNull;

import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.tests.integration.messaging.MessagingBase;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
* Integration tests for Pulsar Admin.
*/
@Slf4j
public class AdminTest extends MessagingBase {

@Test(dataProvider = "ServiceAndAdminUrls")
public void testUnderReplicatedState(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception {

String topicName = getNonPartitionedTopic("replicated-state", true);

@Cleanup
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl.get())
.build();

@Cleanup
final PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl.get())
.build();

@Cleanup
final Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.create();

for (int i = 0; i < 10; i++) {
MessageId messageId = producer.newMessage().value(producer.getProducerName() + "-" + i).send();
assertNotNull(messageId);
}

log.info("Successfully to publish 10 messages to {}", topicName);
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertTrue(stats.ledgers.size() > 0);
for (PersistentTopicInternalStats.LedgerInfo ledger : stats.ledgers) {
Assert.assertFalse(ledger.underReplicated);
}
}
}
1 change: 1 addition & 0 deletions tests/integration/src/test/resources/pulsar-messaging.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
<class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
<class name="org.apache.pulsar.tests.integration.io.AvroKafkaSourceTest" />
<class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
</classes>
</test>
</suite>

0 comments on commit bfba8c8

Please sign in to comment.