From 55af39759d78db90eaa26008ce21f14063b51130 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 12 May 2021 14:16:37 -0700 Subject: [PATCH] Migrated BookkeeperSchemaStorage to use MetadataStore (#10545) * Migrated BookkeeperSchemaStorage to use MetadataStore * Fixed test * Fixed checkstyle * Fixed tests --- .../schema/BookkeeperSchemaStorage.java | 117 ++++++------------ .../BookkeeperSchemaStorageFactory.java | 6 +- .../schema/BookkeeperSchemaStorageTest.java | 4 +- .../service/schema/SchemaServiceTest.java | 1 - 4 files changed, 43 insertions(+), 85 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 3009236e9b871..f47d0f61f0ffb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -25,6 +25,8 @@ import static java.util.Objects.nonNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry; +import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; +import static org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.nio.ByteBuffer; @@ -46,7 +48,6 @@ import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; -import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; @@ -55,14 +56,9 @@ import org.apache.pulsar.common.protocol.schema.StoredSchema; import org.apache.pulsar.common.schema.LongSchemaVersion; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.zookeeper.ZooKeeperCache; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.ACL; +import org.apache.pulsar.metadata.api.MetadataCache; +import org.apache.pulsar.metadata.api.MetadataSerde; +import org.apache.pulsar.metadata.api.MetadataStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,12 +66,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage { private static final Logger log = LoggerFactory.getLogger(BookkeeperSchemaStorage.class); private static final String SchemaPath = "/schemas"; - private static final List Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; private static final byte[] LedgerPassword = "".getBytes(); + private final MetadataStore store; private final PulsarService pulsar; - private final ZooKeeper zooKeeper; - private final ZooKeeperCache localZkCache; + private final MetadataCache locatorEntryCache; + private final ServiceConfiguration config; private BookKeeper bookKeeper; @@ -85,20 +81,19 @@ public class BookkeeperSchemaStorage implements SchemaStorage { @VisibleForTesting BookkeeperSchemaStorage(PulsarService pulsar) { this.pulsar = pulsar; - this.localZkCache = pulsar.getLocalZkCache(); - this.zooKeeper = localZkCache.getZooKeeper(); + this.store = pulsar.getLocalMetadataStore(); this.config = pulsar.getConfiguration(); - } + this.locatorEntryCache = store.getMetadataCache(new MetadataSerde() { + @Override + public byte[] serialize(SchemaStorageFormat.SchemaLocator value) { + return value.toByteArray(); + } - @VisibleForTesting - public void init() throws KeeperException, InterruptedException { - try { - if (zooKeeper.exists(SchemaPath, false) == null) { - zooKeeper.create(SchemaPath, new byte[]{}, Acl, CreateMode.PERSISTENT); + @Override + public SchemaStorageFormat.SchemaLocator deserialize(byte[] content) throws IOException { + return SchemaStorageFormat.SchemaLocator.parseFrom(content); } - } catch (KeeperException.NodeExistsException error) { - // race on startup, ignore. - } + }); } @Override @@ -158,10 +153,6 @@ CompletableFuture> getLocator(String key) { return getSchemaLocator(getSchemaPath(key)); } - public void clearLocatorCache(String key) { - localZkCache.invalidate(getSchemaPath(key)); - } - public List getSchemaLedgerList(String key) throws IOException { Optional locatorEntry = null; try { @@ -302,7 +293,7 @@ private CompletableFuture putSchema(String schemaId, byte[] data, byte[] h updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash) .thenAccept(future::complete) .exceptionally(ex -> { - if (ex.getCause() instanceof KeeperException.BadVersionException) { + if (ex.getCause() instanceof BadVersionException) { // There was a race condition on the schema creation. // Since it has now been created, // retry the whole operation so that we have a chance to @@ -328,8 +319,8 @@ private CompletableFuture putSchema(String schemaId, byte[] data, byte[] h createNewSchema(schemaId, data, hash) .thenAccept(future::complete) .exceptionally(ex -> { - if (ex.getCause() instanceof NodeExistsException - || ex.getCause() instanceof KeeperException.BadVersionException) { + if (ex.getCause() instanceof AlreadyExistsException + || ex.getCause() instanceof BadVersionException) { // There was a race condition on the schema creation. Since it has now been created, // retry the whole operation so that we have a chance to recover without bubbling error // back to producer/consumer @@ -411,15 +402,13 @@ private CompletableFuture deleteSchema(String schemaId, boolean forceFully }); FutureUtil.waitForAll(deleteFutures).whenComplete((v, e) -> { final String path = getSchemaPath(schemaId); - ZkUtils.asyncDeleteFullPathOptimistic(zooKeeper, path, -1, (rc, path1, ctx) -> { - if (rc != Code.OK.intValue()) { - future.completeExceptionally(KeeperException.create(Code.get(rc))); - } else { - clearLocatorCache(getSchemaPath(schemaId)); - future.complete(version); - } - }, path); - + store.delete(path, Optional.empty()) + .thenRun(() -> { + future.complete(version); + }).exceptionally(ex1 -> { + future.completeExceptionally(ex1); + return null; + }); }); } }); @@ -468,7 +457,7 @@ private CompletableFuture updateSchemaLocator( .setInfo(info) .addAllIndex( concat(locator.getIndexList(), newArrayList(info)) - ).build(), locatorEntry.zkZnodeVersion + ).build(), locatorEntry.version ).thenApply(ignore -> nextVersion); } @@ -518,42 +507,21 @@ private CompletableFuture readSchemaEntry( @NotNull private CompletableFuture updateSchemaLocator(String id, - SchemaStorageFormat.SchemaLocator schema, int version) { - CompletableFuture future = new CompletableFuture<>(); - zooKeeper.setData(id, schema.toByteArray(), version, (rc, path, ctx, stat) -> { - Code code = Code.get(rc); - if (code != Code.OK) { - future.completeExceptionally(KeeperException.create(code)); - } else { - future.complete(null); - } - }, null); - return future; + SchemaStorageFormat.SchemaLocator schema, long version) { + return store.put(id, schema.toByteArray(), Optional.of(version)).thenApply(__ -> null); } @NotNull private CompletableFuture createSchemaLocator(String id, SchemaStorageFormat.SchemaLocator locator) { - CompletableFuture future = new CompletableFuture<>(); - - ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, id, locator.toByteArray(), Acl, - CreateMode.PERSISTENT, (rc, path, ctx, name) -> { - Code code = Code.get(rc); - if (code != Code.OK) { - future.completeExceptionally(KeeperException.create(code)); - } else { - // Newly created z-node will have version 0 - future.complete(new LocatorEntry(locator, 0)); - } - }, null); - - return future; + return store.put(id, locator.toByteArray(), Optional.of(-1L)) + .thenApply(stat -> new LocatorEntry(locator, stat.getVersion())); } @NotNull private CompletableFuture> getSchemaLocator(String schema) { - return localZkCache.getEntryAsync(schema, new SchemaLocatorDeserializer()).thenApply(optional -> - optional.map(entry -> new LocatorEntry(entry.getKey(), entry.getValue().getVersion())) - ); + return locatorEntryCache.getWithStats(schema) + .thenApply(o -> + o.map(r -> new LocatorEntry(r.getValue(), r.getStat().getVersion()))); } @NotNull @@ -692,20 +660,13 @@ static SchemaStorageFormat.PositionInfo newPositionInfo(long ledgerId, long entr } } - static class SchemaLocatorDeserializer implements ZooKeeperCache.Deserializer { - @Override - public SchemaStorageFormat.SchemaLocator deserialize(String key, byte[] content) throws Exception { - return SchemaStorageFormat.SchemaLocator.parseFrom(content); - } - } - static class LocatorEntry { final SchemaStorageFormat.SchemaLocator locator; - final Integer zkZnodeVersion; + final long version; - LocatorEntry(SchemaStorageFormat.SchemaLocator locator, Integer zkZnodeVersion) { + LocatorEntry(SchemaStorageFormat.SchemaLocator locator, long version) { this.locator = locator; - this.zkZnodeVersion = zkZnodeVersion; + this.version = version; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java index 8304ed192e407..f3bf01000f0ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java @@ -26,9 +26,7 @@ public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory { @Override @NotNull - public SchemaStorage create(PulsarService pulsar) throws Exception { - BookkeeperSchemaStorage service = new BookkeeperSchemaStorage(pulsar); - service.init(); - return service; + public SchemaStorage create(PulsarService pulsar) { + return new BookkeeperSchemaStorage(pulsar); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java index be16ec9f30c51..6d89fef19a5d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java @@ -22,7 +22,7 @@ import org.apache.bookkeeper.client.api.BKException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.schema.LongSchemaVersion; -import org.apache.pulsar.zookeeper.LocalZooKeeperCache; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.testng.annotations.Test; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.bkException; @@ -61,7 +61,7 @@ public void testVersionFromBytes() { byte[] versionBytesPost240 = bbPost240.array(); PulsarService mockPulsarService = mock(PulsarService.class); - when(mockPulsarService.getLocalZkCache()).thenReturn(mock(LocalZooKeeperCache.class)); + when(mockPulsarService.getLocalMetadataStore()).thenReturn(mock(MetadataStoreExtended.class)); BookkeeperSchemaStorage schemaStorage = new BookkeeperSchemaStorage(mockPulsarService); assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPre240)); assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPost240)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java index 328db87098422..931a7f23b4d08 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java @@ -82,7 +82,6 @@ protected void setup() throws Exception { conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"); super.internalSetup(); BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar); - storage.init(); storage.start(); Map checkMap = new HashMap<>(); checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());