From 850c9448a5ac32e2f94988b8bf80955c93ef9d6c Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 4 Oct 2022 08:22:03 +0200 Subject: [PATCH] Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914) This reverts commit 8d7ac33751c62383b510a04ec223981bd70cd4db. (cherry picked from commit 9d6c34ea5d77bb96ecc21b1ec3a18fa4b730e7bd) --- .../bookkeeper/BookieServiceInfoSerde.java | 55 +---------------- .../bookkeeper/PulsarRegistrationClient.java | 36 ----------- .../PulsarRegistrationClientTest.java | 59 ------------------- 3 files changed, 2 insertions(+), 148 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index b7e3024b637fb..78a33179e76b6 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -24,7 +24,6 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.discover.BookieServiceInfo; -import org.apache.bookkeeper.discover.BookieServiceInfoUtils; import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -64,57 +63,7 @@ public byte[] serialize(String path, BookieServiceInfo bookieServiceInfo) throws } @Override - public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException { - // see https://github.com/apache/bookkeeper/blob/ - // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ - // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311 - String bookieId = extractBookiedIdFromPath(path); - if (bookieServiceInfo == null || bookieServiceInfo.length == 0) { - return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); - } - - BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); - BookieServiceInfo bsi = new BookieServiceInfo(); - List endpoints = builder.getEndpointsList().stream() - .map(e -> { - BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); - endpoint.setId(e.getId()); - endpoint.setPort(e.getPort()); - endpoint.setHost(e.getHost()); - endpoint.setProtocol(e.getProtocol()); - endpoint.setAuth(e.getAuthList()); - endpoint.setExtensions(e.getExtensionsList()); - return endpoint; - }) - .collect(Collectors.toList()); - - bsi.setEndpoints(endpoints); - bsi.setProperties(builder.getPropertiesMap()); - - return bsi; - - } - - /** - * Extract the BookieId - * The path should look like /ledgers/available/bookieId - * or /ledgers/available/readonly/bookieId. - * But the prefix depends on the configuration. - * @param path - * @return the bookieId - */ - private static String extractBookiedIdFromPath(String path) throws IOException { - // https://github.com/apache/bookkeeper/blob/ - // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ - // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258 - if (path == null) { - path = ""; - } - int last = path.lastIndexOf("/"); - if (last >= 0) { - return path.substring(last + 1); - } else { - throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node"); - } + public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException { + return null; } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 1c6924043182e..52b50e3ea4b08 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -25,21 +25,15 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.discover.BookieServiceInfo; import org.apache.bookkeeper.discover.RegistrationClient; import org.apache.bookkeeper.net.BookieId; -import org.apache.bookkeeper.versioning.LongVersion; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; -import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; @@ -55,14 +49,12 @@ public class PulsarRegistrationClient implements RegistrationClient { private final Map writableBookiesWatchers = new ConcurrentHashMap<>(); private final Map readOnlyBookiesWatchers = new ConcurrentHashMap<>(); - private final MetadataCache bookieServiceInfoMetadataCache; private final ScheduledExecutorService executor; public PulsarRegistrationClient(MetadataStore store, String ledgersRootPath) { this.store = store; this.ledgersRootPath = ledgersRootPath; - this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE); // Following Bookie Network Address Changes is an expensive operation // as it requires additional ZooKeeper watches @@ -161,32 +153,4 @@ private static Set convertToBookieAddresses(List children) { } return newBookieAddrs; } - - @Override - public CompletableFuture> getBookieServiceInfo(BookieId bookieId) { - String asWritable = bookieRegistrationPath + "/" + bookieId; - - return bookieServiceInfoMetadataCache.get(asWritable) - .thenCompose((Optional getResult) -> { - if (getResult.isPresent()) { - return CompletableFuture.completedFuture(new Versioned<>(getResult.get(), - new LongVersion(-1))); - } else { - return readBookieInfoAsReadonlyBookie(bookieId); - } - } - ); - } - - final CompletableFuture> readBookieInfoAsReadonlyBookie(BookieId bookieId) { - String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId; - return bookieServiceInfoMetadataCache.get(asReadonly) - .thenApply((Optional getResultAsReadOnly) -> { - if (getResultAsReadOnly.isPresent()) { - return new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1)); - } else { - throw new CompletionException(new BKException.BKBookieHandleNotAvailableException()); - } - }); - } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java index 914e70e406839..38195b230ce29 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java @@ -23,8 +23,6 @@ import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -116,63 +114,6 @@ public void testGetReadonlyBookies(String provider, Supplier urlSupplier assertEquals(addresses.size(), result.getValue().size()); } - @Test(dataProvider = "impl") - public void testGetBookieServiceInfo(String provider, Supplier urlSupplier) throws Exception { - @Cleanup - MetadataStoreExtended store = - MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); - - String ledgersRoot = "/test/ledgers-" + UUID.randomUUID(); - - @Cleanup - RegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, mock(AbstractConfiguration.class)); - - @Cleanup - RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot); - - List addresses = new ArrayList<>(prepareNBookies(10)); - List bookieServiceInfos = new ArrayList<>(); - int port = 223; - for (BookieId address : addresses) { - BookieServiceInfo info = new BookieServiceInfo(); - BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); - endpoint.setAuth(Collections.emptyList()); - endpoint.setExtensions(Collections.emptyList()); - endpoint.setId("id"); - endpoint.setHost("localhost"); - endpoint.setPort(port++); - endpoint.setProtocol("bookie-rpc"); - info.setEndpoints(Arrays.asList(endpoint)); - bookieServiceInfos.add(info); - // some readonly, some writable - boolean readOnly = port % 2 == 0; - rm.registerBookie(address, readOnly, info); - } - - int i = 0; - for (BookieId address : addresses) { - BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue(); - compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++)); - } - - } - - private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) { - assertEquals(a.getProperties(), b.getProperties()); - assertEquals(a.getEndpoints().size(), b.getEndpoints().size()); - for (int i = 0; i < a.getEndpoints().size(); i++) { - BookieServiceInfo.Endpoint e1 = a.getEndpoints().get(i); - BookieServiceInfo.Endpoint e2 = b.getEndpoints().get(i); - assertEquals(e1.getHost(), e2.getHost()); - assertEquals(e1.getPort(), e2.getPort()); - assertEquals(e1.getId(), e2.getId()); - assertEquals(e1.getProtocol(), e2.getProtocol()); - assertEquals(e1.getExtensions(), e2.getExtensions()); - assertEquals(e1.getAuth(), e2.getAuth()); - } - - } - @Test(dataProvider = "impl") public void testGetAllBookies(String provider, Supplier urlSupplier) throws Exception { @Cleanup