From f04eb517e40ea23779fc7c0264c3537dd7d6671f Mon Sep 17 00:00:00 2001 From: Dave Rusek Date: Thu, 8 Mar 2018 15:35:09 -0700 Subject: [PATCH] Schema registry (2/N) (#1319) * Schema Registry proto changes * Infrastructure to store schemas * Renumber schema fields * Update Pulsar API with schema changes * Revert field number change * Fix merge conflict * Fix broken merge * Address issues in review * Add schema type back to proto definition * Address comments regarding lombok usage * Remove reserved future enum fields * regenerate code from protobuf * Remove unused code * Add schema version to producer success message * plumb schema through to producer * Revert "Add schema version to producer success message" This reverts commit e7e72f468cf46f1605524a7399520c22763583c9. * Revert "Revert "Add schema version to producer success message"" This reverts commit 7b902f6bdb1cb054e26577747ff4dd8c326a6248. * Persist schema on producer connect * Add principal to schema on publish * Reformat function for readability * Remove unused protoc profile * Rename put on schema registry to putIfAbsent * wip: address review comments * switch underscore to slash in schema name * blah * Fix protobuf version incompatibility * Add appropriate license headers --- pom.xml | 14 + .../pulsar/broker/ServiceConfiguration.java | 12 +- pulsar-broker/pom.xml | 6 + .../apache/pulsar/broker/PulsarService.java | 21 +- .../pulsar/broker/service/Producer.java | 23 +- .../pulsar/broker/service/ServerCnx.java | 122 +- .../apache/pulsar/broker/service/Topic.java | 8 +- .../nonpersistent/NonPersistentTopic.java | 30 +- .../service/persistent/PersistentTopic.java | 11 + .../schema/DefaultSchemaRegistryService.java | 57 + .../broker/service/schema/SchemaRegistry.java | 79 + .../service/schema/SchemaRegistryService.java | 46 + .../schema/SchemaRegistryServiceImpl.java | 190 +++ .../broker/service/schema/SchemaStorage.java | 36 + .../service/schema/SchemaStorageFactory.java | 27 + .../broker/service/schema/StoredSchema.java | 68 + .../schema/proto/SchemaRegistryFormat.java | 1373 +++++++++++++++++ .../src/main/proto/SchemaRegistryFormat.proto | 45 + .../broker/service/PersistentTopicTest.java | 15 +- .../pulsar/broker/service/ServerCnxTest.java | 19 +- .../pulsar/client/api/ClientErrorsTest.java | 16 +- .../pulsar/client/api/MockBrokerService.java | 3 +- .../apache/pulsar/common/api/Commands.java | 14 +- .../pulsar/common/naming/TopicName.java | 6 + .../pulsar/common/schema/EmptyVersion.java | 28 + .../pulsar/common/schema/LatestVersion.java | 28 + .../pulsar/common/schema/SchemaData.java | 34 + .../pulsar/common/schema/SchemaType.java | 23 + .../pulsar/common/schema/SchemaVersion.java | 26 + 29 files changed, 2281 insertions(+), 99 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java create mode 100644 pulsar-broker/src/main/proto/SchemaRegistryFormat.proto create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java diff --git a/pom.xml b/pom.xml index 081fcdb327366..71371957e0b86 100644 --- a/pom.xml +++ b/pom.xml @@ -402,6 +402,20 @@ flexible messaging model and an intuitive client API. ${log4j2.version} + + org.apache.logging.log4j + log4j-api + test-jar + ${log4j2.version} + + + + org.apache.logging.log4j + log4j-core + test-jar + ${log4j2.version} + + org.apache.logging.log4j log4j-api diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index bd4797023417d..cc2a409274a53 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -415,6 +415,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(dynamic = true) private boolean preferLaterVersions = false; + private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"; + /**** --- WebSocket --- ****/ // Number of IO threads in Pulsar Client used in WebSocket proxy private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors(); @@ -1449,7 +1451,15 @@ public boolean exposeTopicLevelMetricsInPrometheus() { public void setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) { this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus; } - + + public String getSchemaRegistryStorageClassName() { + return schemaRegistryStorageClassName; + } + + public void setSchemaRegistryStorageClassName(String className) { + schemaRegistryStorageClassName = className; + } + public boolean authenticateOriginalAuthData() { return authenticateOriginalAuthData; } diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index abead555d6037..8681cad717030 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -54,6 +54,12 @@ netty-all + + com.google.protobuf + protobuf-java + ${protobuf2.version} + + ${project.groupId} pulsar-common diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6bbc1e62c8be3..51014a7e144dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -20,6 +20,9 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.net.URL; import java.util.List; @@ -34,7 +37,6 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; - import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; @@ -52,6 +54,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.MetricsGenerator; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.web.WebService; @@ -80,11 +83,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Main class for Pulsar broker service */ @@ -123,6 +121,7 @@ public class PulsarService implements AutoCloseable { private final String brokerServiceUrl; private final String brokerServiceUrlTls; private final String brokerVersion; + private SchemaRegistryService schemaRegistryService = null; private final Optional functionWorkerService; private final MessagingServiceShutdownHook shutdownService; @@ -233,6 +232,10 @@ public void close() throws PulsarServerException { loadManager.stop(); } + if (schemaRegistryService != null) { + schemaRegistryService.close(); + } + state = State.Closed; } catch (Exception e) { @@ -359,6 +362,8 @@ public synchronized void brokerIsAFollowerNow() { this.metricsGenerator = new MetricsGenerator(this); + schemaRegistryService = SchemaRegistryService.create(this); + state = State.Started; acquireSLANamespace(); @@ -701,4 +706,8 @@ public AtomicReference getLoadManager() { public String getBrokerVersion() { return brokerVersion; } + + public SchemaRegistryService getSchemaRegistryService() { + return schemaRegistryService; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 9268ac7af2967..35bc3151bf47f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -23,12 +23,16 @@ import static org.apache.pulsar.common.api.Commands.hasChecksum; import static org.apache.pulsar.common.api.Commands.readChecksum; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import io.netty.buffer.ByteBuf; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLongFieldUpdater; - import org.apache.bookkeeper.mledger.util.Rate; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; @@ -42,17 +46,11 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats; import org.apache.pulsar.common.policies.data.PublisherStats; +import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.DateFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; - -import io.netty.buffer.ByteBuf; -import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; - /** * Represents a currently connected producer */ @@ -82,8 +80,10 @@ public class Producer { private final Map metadata; + private final SchemaVersion schemaVersion; + public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId, - boolean isEncrypted, Map metadata) { + boolean isEncrypted, Map metadata, SchemaVersion schemaVersion) { this.topic = topic; this.cnx = cnx; this.producerId = producerId; @@ -110,6 +110,7 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName this.remoteCluster = isRemote ? producerName.split("\\.")[2] : null; this.isEncrypted = isEncrypted; + this.schemaVersion = schemaVersion; } @Override @@ -492,6 +493,10 @@ public void checkEncryption() { } } + public SchemaVersion getSchemaVersion() { + return schemaVersion; + } + private static final Logger log = LoggerFactory.getLogger(Producer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index a386ece9a156d..e0ff275f6451c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -25,6 +25,12 @@ import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse; import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5; +import com.google.protobuf.GeneratedMessageLite; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOption; +import io.netty.handler.ssl.SslHandler; import java.net.SocketAddress; import java.util.List; import java.util.Map; @@ -32,6 +38,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; import org.apache.bookkeeper.mledger.Position; @@ -52,6 +59,7 @@ import org.apache.pulsar.common.api.CommandUtils; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.PulsarHandler; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; @@ -74,24 +82,18 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Sets; -import com.google.protobuf.GeneratedMessageLite; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOption; -import io.netty.handler.ssl.SslHandler; - public class ServerCnx extends PulsarHandler { private final BrokerService service; private final ConcurrentLongHashMap> producers; @@ -697,6 +699,36 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { }); } + private static SchemaType getType(PulsarApi.Schema.Type protocolType) { + switch (protocolType) { + case Json: + return SchemaType.JSON; + case Avro: + return SchemaType.AVRO; + case Thrift: + return SchemaType.THRIFT; + case Protobuf: + return SchemaType.PROTOBUF; + default: + return SchemaType.NONE; + } + } + + private SchemaData getSchema(PulsarApi.Schema protocolSchema) { + return SchemaData.builder() + .data(protocolSchema.getSchemaData().toByteArray()) + .isDeleted(false) + .timestamp(System.currentTimeMillis()) + .user(originalPrincipal) + .type(getType(protocolSchema.getType())) + .props(protocolSchema.getPropertiesList().stream().collect( + Collectors.toMap( + PulsarApi.KeyValue::getKey, + PulsarApi.KeyValue::getValue + ) + )).build(); + } + @Override protected void handleProducer(final CommandProducer cmdProducer) { checkArgument(state == State.Connected); @@ -752,7 +784,8 @@ protected void handleProducer(final CommandProducer cmdProducer) { Producer producer = existingProducerFuture.getNow(null); log.info("[{}] Producer with the same id is already created: {}", remoteAddress, producer); - ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName())); + ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName(), + producer.getSchemaVersion())); return null; } else { // There was an early request to create a producer with @@ -805,41 +838,56 @@ protected void handleProducer(final CommandProducer cmdProducer) { disableTcpNoDelayIfNeeded(topicName.toString(), producerName); - Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole, - isEncrypted, metadata); + CompletableFuture schemaVersionFuture; + if (cmdProducer.hasSchema()) { + schemaVersionFuture = topic.addSchema(getSchema(cmdProducer.getSchema())); + } else { + schemaVersionFuture = CompletableFuture.completedFuture(SchemaVersion.Empty); + } + + schemaVersionFuture.exceptionally(exception -> { + ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, exception.getMessage())); + producers.remove(producerId, producerFuture); + return null; + }); + + schemaVersionFuture.thenAccept(schemaVersion -> { + Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole, + isEncrypted, metadata, schemaVersion); - try { - topic.addProducer(producer); + try { + topic.addProducer(producer); - if (isActive()) { - if (producerFuture.complete(producer)) { - log.info("[{}] Created new producer: {}", remoteAddress, producer); - ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName, - producer.getLastSequenceId())); - return; + if (isActive()) { + if (producerFuture.complete(producer)) { + log.info("[{}] Created new producer: {}", remoteAddress, producer); + ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName, + producer.getLastSequenceId(), producer.getSchemaVersion())); + return; + } else { + // The producer's future was completed before by + // a close command + producer.closeNow(); + log.info("[{}] Cleared producer created after timeout on client side {}", + remoteAddress, producer); + } } else { - // The producer's future was completed before by - // a close command producer.closeNow(); - log.info("[{}] Cleared producer created after timeout on client side {}", - remoteAddress, producer); - } - } else { - producer.closeNow(); - log.info("[{}] Cleared producer created after connection was closed: {}", + log.info("[{}] Cleared producer created after connection was closed: {}", remoteAddress, producer); - producerFuture.completeExceptionally( + producerFuture.completeExceptionally( new IllegalStateException("Producer created after connection was closed")); - } - } catch (BrokerServiceException ise) { - log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName, + } + } catch (BrokerServiceException ise) { + log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName, ise.getMessage()); - ctx.writeAndFlush(Commands.newError(requestId, + ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(ise), ise.getMessage())); - producerFuture.completeExceptionally(ise); - } + producerFuture.completeExceptionally(ise); + } - producers.remove(producerId, producerFuture); + producers.remove(producerId, producerFuture); + }); }).exceptionally(exception -> { Throwable cause = exception.getCause(); if (!(cause instanceof ServiceUnitNotReadyException)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 80aed778b5559..be60116a4ccda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -18,9 +18,9 @@ */ package org.apache.pulsar.broker.service; +import io.netty.buffer.ByteBuf; import java.util.Map; import java.util.concurrent.CompletableFuture; - import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; @@ -30,13 +30,13 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PersistentTopicStats; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; -import io.netty.buffer.ByteBuf; - public interface Topic { public interface PublishContext { @@ -125,4 +125,6 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats PersistentTopicInternalStats getInternalStats(); Position getLastMessageId(); + + CompletableFuture addSchema(SchemaData schema); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index ef355f0eaf7ed..4a6de05e015e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -22,6 +22,13 @@ import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import com.carrotsearch.hppc.ObjectObjectHashMap; +import com.google.common.base.MoreObjects; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.FastThreadLocal; import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; @@ -70,6 +76,8 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublisherStats; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; @@ -79,15 +87,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.carrotsearch.hppc.ObjectObjectHashMap; -import com.google.common.base.MoreObjects; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import io.netty.buffer.ByteBuf; -import io.netty.util.concurrent.FastThreadLocal; - public class NonPersistentTopic implements Topic { private final String topic; @@ -973,7 +972,14 @@ public void markBatchMessagePublished() { this.hasBatchMessagePublished = true; } - - private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class); + + @Override + public CompletableFuture addSchema(SchemaData schema) { + String base = TopicName.get(getName()).getPartitionedTopicName(); + String id = TopicName.get(base).getSchemaName(); + return brokerService.pulsar() + .getSchemaRegistryService() + .putSchemaIfAbsent(id, schema); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 62017f0f6ab6e..cdca1cd8f98b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -92,6 +92,8 @@ import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -1601,4 +1603,13 @@ public Position getLastMessageId() { } private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class); + + @Override + public CompletableFuture addSchema(SchemaData schema) { + String base = TopicName.get(getName()).getPartitionedTopicName(); + String id = TopicName.get(base).getSchemaName(); + return brokerService.pulsar() + .getSchemaRegistryService() + .putSchemaIfAbsent(id, schema); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java new file mode 100644 index 0000000000000..db3b9f7a9351b --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaVersion; + +public class DefaultSchemaRegistryService implements SchemaRegistryService { + @Override + public CompletableFuture getSchema(String schemaId) { + return completedFuture(null); + } + + @Override + public CompletableFuture getSchema(String schemaId, SchemaVersion version) { + return completedFuture(null); + } + + @Override + public CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema) { + return completedFuture(null); + } + + @Override + public CompletableFuture deleteSchema(String schemaId, String user) { + return completedFuture(null); + } + + @Override + public SchemaVersion versionFromBytes(byte[] version) { + return null; + } + + @Override + public void close() throws Exception { + + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java new file mode 100644 index 0000000000000..4dfbd6d9909b4 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import com.google.common.base.MoreObjects; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaVersion; + +public interface SchemaRegistry extends AutoCloseable { + + CompletableFuture getSchema(String schemaId); + + CompletableFuture getSchema(String schemaId, SchemaVersion version); + + CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema); + + CompletableFuture deleteSchema(String schemaId, String user); + + SchemaVersion versionFromBytes(byte[] version); + + class SchemaAndMetadata { + public final String id; + public final SchemaData schema; + public final SchemaVersion version; + + SchemaAndMetadata(String id, SchemaData schema, SchemaVersion version) { + this.id = id; + this.schema = schema; + this.version = version; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SchemaAndMetadata that = (SchemaAndMetadata) o; + return version == that.version && + Objects.equals(id, that.id) && + Objects.equals(schema, that.schema); + } + + @Override + public int hashCode() { + return Objects.hash(id, schema, version); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", id) + .add("schema", schema) + .add("version", version) + .toString(); + } + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java new file mode 100644 index 0000000000000..69e736481616e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import java.lang.reflect.Method; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface SchemaRegistryService extends SchemaRegistry { + String CreateMethodName = "create"; + Logger log = LoggerFactory.getLogger(SchemaRegistryService.class); + + static SchemaRegistryService create(PulsarService pulsar) { + try { + ServiceConfiguration config = pulsar.getConfiguration(); + final Class storageClass = Class.forName(config.getSchemaRegistryStorageClassName()); + Object factoryInstance = storageClass.newInstance(); + Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class); + SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar); + return new SchemaRegistryServiceImpl(schemaStorage); + } catch (Exception e) { + log.warn("Error when trying to create scehema registry storage: {}", e); + } + return new DefaultSchemaRegistryService(); + } + + void close() throws Exception; +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java new file mode 100644 index 0000000000000..1aec0395029c7 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import static java.util.Objects.isNull; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import javax.validation.constraints.NotNull; +import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.schema.SchemaVersion; + +public class SchemaRegistryServiceImpl implements SchemaRegistryService { + private final SchemaStorage schemaStorage; + private final Clock clock; + + @VisibleForTesting + SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Clock clock) { + this.schemaStorage = schemaStorage; + this.clock = clock; + } + + @VisibleForTesting + SchemaRegistryServiceImpl(SchemaStorage schemaStorage) { + this(schemaStorage, Clock.systemUTC()); + } + + @Override + @NotNull + public CompletableFuture getSchema(String schemaId) { + return getSchema(schemaId, SchemaVersion.Latest); + } + + @Override + @NotNull + public CompletableFuture getSchema(String schemaId, SchemaVersion version) { + return schemaStorage.get(schemaId, version).thenCompose(stored -> { + if (isNull(stored)) { + return completedFuture(null); + } else { + return Functions.bytesToSchemaInfo(stored.data) + .thenApply(Functions::schemaInfoToSchema) + .thenApply(schema -> new SchemaAndMetadata(schemaId, schema, stored.version)); + } + } + ); + } + + @Override + @NotNull + public CompletableFuture putSchemaIfAbsent(String schemaId, SchemaData schema) { + SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() + .setType(Functions.convertFromDomainType(schema.getType())) + .setSchema(ByteString.copyFrom(schema.getData())) + .setSchemaId(schemaId) + .setUser(schema.getUser()) + .setDeleted(false) + .setTimestamp(clock.millis()) + .addAllProps(toPairs(schema.getProps())) + .build(); + return schemaStorage.put(schemaId, info.toByteArray()); + } + + @Override + @NotNull + public CompletableFuture deleteSchema(String schemaId, String user) { + byte[] deletedEntry = deleted(schemaId, user).toByteArray(); + return schemaStorage.put(schemaId, deletedEntry); + } + + @Override + public SchemaVersion versionFromBytes(byte[] version) { + return schemaStorage.versionFromBytes(version); + } + + @Override + public void close() throws Exception { + schemaStorage.close(); + } + + private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String user) { + return SchemaRegistryFormat.SchemaInfo.newBuilder() + .setSchemaId(schemaId) + .setType(SchemaRegistryFormat.SchemaInfo.SchemaType.NONE) + .setSchema(ByteString.EMPTY) + .setUser(user) + .setDeleted(true) + .setTimestamp(clock.millis()) + .build(); + } + + interface Functions { + static SchemaType convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) { + switch (type) { + case AVRO: + return SchemaType.AVRO; + case JSON: + return SchemaType.JSON; + case PROTO: + return SchemaType.PROTOBUF; + case THRIFT: + return SchemaType.THRIFT; + default: + return SchemaType.NONE; + } + } + + static SchemaRegistryFormat.SchemaInfo.SchemaType convertFromDomainType(SchemaType type) { + switch (type) { + case AVRO: + return SchemaRegistryFormat.SchemaInfo.SchemaType.AVRO; + case JSON: + return SchemaRegistryFormat.SchemaInfo.SchemaType.JSON; + case THRIFT: + return SchemaRegistryFormat.SchemaInfo.SchemaType.THRIFT; + case PROTOBUF: + return SchemaRegistryFormat.SchemaInfo.SchemaType.PROTO; + default: + return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE; + } + } + + static Map toMap(List pairs) { + Map map = new HashMap<>(); + for (SchemaRegistryFormat.SchemaInfo.KeyValuePair pair : pairs) { + map.put(pair.getKey(), pair.getValue()); + } + return map; + } + + static List toPairs(Map map) { + List pairs = new ArrayList<>(map.size()); + for (Map.Entry entry : map.entrySet()) { + SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder = + SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder(); + pairs.add(builder.setKey(entry.getKey()).setValue(entry.getValue()).build()); + } + return pairs; + } + + static SchemaData schemaInfoToSchema(SchemaRegistryFormat.SchemaInfo info) { + return SchemaData.builder() + .user(info.getUser()) + .type(convertToDomainType(info.getType())) + .data(info.getSchema().toByteArray()) + .isDeleted(info.getDeleted()) + .props(toMap(info.getPropsList())) + .build(); + } + + static CompletableFuture bytesToSchemaInfo(byte[] bytes) { + CompletableFuture future; + try { + future = completedFuture(SchemaRegistryFormat.SchemaInfo.parseFrom(bytes)); + } catch (InvalidProtocolBufferException e) { + future = new CompletableFuture<>(); + future.completeExceptionally(e); + } + return future; + } + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java new file mode 100644 index 0000000000000..4d0d8af1871a8 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.schema.SchemaVersion; + +public interface SchemaStorage { + + CompletableFuture put(String key, byte[] value); + + CompletableFuture get(String key, SchemaVersion version); + + CompletableFuture delete(String key); + + SchemaVersion versionFromBytes(byte[] version); + + void close() throws Exception; + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java new file mode 100644 index 0000000000000..c4cff34b3f964 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import javax.validation.constraints.NotNull; +import org.apache.pulsar.broker.PulsarService; + +public interface SchemaStorageFactory { + @NotNull + SchemaStorage create(PulsarService pulsar) throws Exception; +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java new file mode 100644 index 0000000000000..f28a70797ccbe --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import com.google.common.base.MoreObjects; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import org.apache.pulsar.common.schema.SchemaVersion; + +public class StoredSchema { + public final byte[] data; + public final SchemaVersion version; + public final Map metadata; + + public StoredSchema(byte[] data, SchemaVersion version, Map metadata) { + this.data = data; + this.version = version; + this.metadata = metadata; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StoredSchema that = (StoredSchema) o; + return Arrays.equals(data, that.data) && + Objects.equals(version, that.version) && + Objects.equals(metadata, that.metadata); + } + + @Override + public int hashCode() { + + int result = Objects.hash(version, metadata); + result = 31 * result + Arrays.hashCode(data); + return result; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("data", data) + .add("version", version) + .add("metadata", metadata) + .toString(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java new file mode 100644 index 0000000000000..39227313c1b38 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java @@ -0,0 +1,1373 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: src/main/proto/SchemaRegistryFormat.proto + +package org.apache.pulsar.broker.service.schema.proto; + +public final class SchemaRegistryFormat { + private SchemaRegistryFormat() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } + public interface SchemaInfoOrBuilder + extends com.google.protobuf.MessageLiteOrBuilder { + + // required string schema_id = 1; + boolean hasSchemaId(); + String getSchemaId(); + + // required string user = 2; + boolean hasUser(); + String getUser(); + + // required .pulsar.schema.SchemaInfo.SchemaType type = 3; + boolean hasType(); + SchemaRegistryFormat.SchemaInfo.SchemaType getType(); + + // required bytes schema = 4; + boolean hasSchema(); + com.google.protobuf.ByteString getSchema(); + + // required int64 timestamp = 5; + boolean hasTimestamp(); + long getTimestamp(); + + // required bool deleted = 6; + boolean hasDeleted(); + boolean getDeleted(); + + // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7; + java.util.List + getPropsList(); + SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index); + int getPropsCount(); + } + public static final class SchemaInfo extends + com.google.protobuf.GeneratedMessageLite + implements SchemaInfoOrBuilder { + // Use SchemaInfo.newBuilder() to construct. + private SchemaInfo(Builder builder) { + super(builder); + } + private SchemaInfo(boolean noInit) {} + + private static final SchemaInfo defaultInstance; + public static SchemaInfo getDefaultInstance() { + return defaultInstance; + } + + public SchemaInfo getDefaultInstanceForType() { + return defaultInstance; + } + + public enum SchemaType + implements com.google.protobuf.Internal.EnumLite { + NONE(0, 1), + THRIFT(1, 2), + AVRO(2, 3), + JSON(3, 4), + PROTO(4, 5), + ; + + public static final int NONE_VALUE = 1; + public static final int THRIFT_VALUE = 2; + public static final int AVRO_VALUE = 3; + public static final int JSON_VALUE = 4; + public static final int PROTO_VALUE = 5; + + + public final int getNumber() { return value; } + + public static SchemaType valueOf(int value) { + switch (value) { + case 1: return NONE; + case 2: return THRIFT; + case 3: return AVRO; + case 4: return JSON; + case 5: return PROTO; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public SchemaType findValueByNumber(int number) { + return SchemaType.valueOf(number); + } + }; + + private final int value; + + private SchemaType(int index, int value) { + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:pulsar.schema.SchemaInfo.SchemaType) + } + + public interface KeyValuePairOrBuilder + extends com.google.protobuf.MessageLiteOrBuilder { + + // required string key = 1; + boolean hasKey(); + String getKey(); + + // required string value = 2; + boolean hasValue(); + String getValue(); + } + public static final class KeyValuePair extends + com.google.protobuf.GeneratedMessageLite + implements KeyValuePairOrBuilder { + // Use KeyValuePair.newBuilder() to construct. + private KeyValuePair(Builder builder) { + super(builder); + } + private KeyValuePair(boolean noInit) {} + + private static final KeyValuePair defaultInstance; + public static KeyValuePair getDefaultInstance() { + return defaultInstance; + } + + public KeyValuePair getDefaultInstanceForType() { + return defaultInstance; + } + + private int bitField0_; + // required string key = 1; + public static final int KEY_FIELD_NUMBER = 1; + private java.lang.Object key_; + public boolean hasKey() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getKey() { + java.lang.Object ref = key_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + key_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getKeyBytes() { + java.lang.Object ref = key_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + key_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // required string value = 2; + public static final int VALUE_FIELD_NUMBER = 2; + private java.lang.Object value_; + public boolean hasValue() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public String getValue() { + java.lang.Object ref = value_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + value_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getValueBytes() { + java.lang.Object ref = value_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + value_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private void initFields() { + key_ = ""; + value_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasKey()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasValue()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getKeyBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getValueBytes()); + } + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getKeyBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getValueBytes()); + } + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(SchemaRegistryFormat.SchemaInfo.KeyValuePair prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessageLite.Builder< + SchemaRegistryFormat.SchemaInfo.KeyValuePair, Builder> + implements SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder { + // Construct using org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private void maybeForceBuilderInitialization() { + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + key_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + value_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public SchemaRegistryFormat.SchemaInfo.KeyValuePair getDefaultInstanceForType() { + return SchemaRegistryFormat.SchemaInfo.KeyValuePair.getDefaultInstance(); + } + + public SchemaRegistryFormat.SchemaInfo.KeyValuePair build() { + SchemaRegistryFormat.SchemaInfo.KeyValuePair result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private SchemaRegistryFormat.SchemaInfo.KeyValuePair buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + SchemaRegistryFormat.SchemaInfo.KeyValuePair result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public SchemaRegistryFormat.SchemaInfo.KeyValuePair buildPartial() { + SchemaRegistryFormat.SchemaInfo.KeyValuePair result = new SchemaRegistryFormat.SchemaInfo.KeyValuePair(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.key_ = key_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.value_ = value_; + result.bitField0_ = to_bitField0_; + return result; + } + + public Builder mergeFrom(SchemaRegistryFormat.SchemaInfo.KeyValuePair other) { + if (other == SchemaRegistryFormat.SchemaInfo.KeyValuePair.getDefaultInstance()) return this; + if (other.hasKey()) { + setKey(other.getKey()); + } + if (other.hasValue()) { + setValue(other.getValue()); + } + return this; + } + + public final boolean isInitialized() { + if (!hasKey()) { + + return false; + } + if (!hasValue()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + + return this; + default: { + if (!parseUnknownField(input, extensionRegistry, tag)) { + + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + key_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + value_ = input.readBytes(); + break; + } + } + } + } + + private int bitField0_; + + // required string key = 1; + private java.lang.Object key_ = ""; + public boolean hasKey() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getKey() { + java.lang.Object ref = key_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + key_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setKey(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + key_ = value; + + return this; + } + public Builder clearKey() { + bitField0_ = (bitField0_ & ~0x00000001); + key_ = getDefaultInstance().getKey(); + + return this; + } + void setKey(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000001; + key_ = value; + + } + + // required string value = 2; + private java.lang.Object value_ = ""; + public boolean hasValue() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public String getValue() { + java.lang.Object ref = value_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + value_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setValue(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + value_ = value; + + return this; + } + public Builder clearValue() { + bitField0_ = (bitField0_ & ~0x00000002); + value_ = getDefaultInstance().getValue(); + + return this; + } + void setValue(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000002; + value_ = value; + + } + + // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaInfo.KeyValuePair) + } + + static { + defaultInstance = new KeyValuePair(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaInfo.KeyValuePair) + } + + private int bitField0_; + // required string schema_id = 1; + public static final int SCHEMA_ID_FIELD_NUMBER = 1; + private java.lang.Object schemaId_; + public boolean hasSchemaId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getSchemaId() { + java.lang.Object ref = schemaId_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + schemaId_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getSchemaIdBytes() { + java.lang.Object ref = schemaId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + schemaId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // required string user = 2; + public static final int USER_FIELD_NUMBER = 2; + private java.lang.Object user_; + public boolean hasUser() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public String getUser() { + java.lang.Object ref = user_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + user_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getUserBytes() { + java.lang.Object ref = user_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + user_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // required .pulsar.schema.SchemaInfo.SchemaType type = 3; + public static final int TYPE_FIELD_NUMBER = 3; + private SchemaRegistryFormat.SchemaInfo.SchemaType type_; + public boolean hasType() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public SchemaRegistryFormat.SchemaInfo.SchemaType getType() { + return type_; + } + + // required bytes schema = 4; + public static final int SCHEMA_FIELD_NUMBER = 4; + private com.google.protobuf.ByteString schema_; + public boolean hasSchema() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public com.google.protobuf.ByteString getSchema() { + return schema_; + } + + // required int64 timestamp = 5; + public static final int TIMESTAMP_FIELD_NUMBER = 5; + private long timestamp_; + public boolean hasTimestamp() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public long getTimestamp() { + return timestamp_; + } + + // required bool deleted = 6; + public static final int DELETED_FIELD_NUMBER = 6; + private boolean deleted_; + public boolean hasDeleted() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public boolean getDeleted() { + return deleted_; + } + + // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7; + public static final int PROPS_FIELD_NUMBER = 7; + private java.util.List props_; + public java.util.List getPropsList() { + return props_; + } + public java.util.List + getPropsOrBuilderList() { + return props_; + } + public int getPropsCount() { + return props_.size(); + } + public SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index) { + return props_.get(index); + } + public SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder getPropsOrBuilder( + int index) { + return props_.get(index); + } + + private void initFields() { + schemaId_ = ""; + user_ = ""; + type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE; + schema_ = com.google.protobuf.ByteString.EMPTY; + timestamp_ = 0L; + deleted_ = false; + props_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasSchemaId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasUser()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasType()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasSchema()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasTimestamp()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasDeleted()) { + memoizedIsInitialized = 0; + return false; + } + for (int i = 0; i < getPropsCount(); i++) { + if (!getProps(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getSchemaIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getUserBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeEnum(3, type_.getNumber()); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBytes(4, schema_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeInt64(5, timestamp_); + } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeBool(6, deleted_); + } + for (int i = 0; i < props_.size(); i++) { + output.writeMessage(7, props_.get(i)); + } + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getSchemaIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getUserBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(3, type_.getNumber()); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(4, schema_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(5, timestamp_); + } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(6, deleted_); + } + for (int i = 0; i < props_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(7, props_.get(i)); + } + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static SchemaRegistryFormat.SchemaInfo parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static SchemaRegistryFormat.SchemaInfo parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static SchemaRegistryFormat.SchemaInfo parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static SchemaRegistryFormat.SchemaInfo parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(SchemaRegistryFormat.SchemaInfo prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessageLite.Builder< + SchemaRegistryFormat.SchemaInfo, Builder> + implements SchemaRegistryFormat.SchemaInfoOrBuilder { + // Construct using org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat.SchemaInfo.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private void maybeForceBuilderInitialization() { + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + schemaId_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + user_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE; + bitField0_ = (bitField0_ & ~0x00000004); + schema_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000008); + timestamp_ = 0L; + bitField0_ = (bitField0_ & ~0x00000010); + deleted_ = false; + bitField0_ = (bitField0_ & ~0x00000020); + props_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000040); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public SchemaRegistryFormat.SchemaInfo getDefaultInstanceForType() { + return SchemaRegistryFormat.SchemaInfo.getDefaultInstance(); + } + + public SchemaRegistryFormat.SchemaInfo build() { + SchemaRegistryFormat.SchemaInfo result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private SchemaRegistryFormat.SchemaInfo buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + SchemaRegistryFormat.SchemaInfo result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public SchemaRegistryFormat.SchemaInfo buildPartial() { + SchemaRegistryFormat.SchemaInfo result = new SchemaRegistryFormat.SchemaInfo(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.schemaId_ = schemaId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.user_ = user_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.type_ = type_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.schema_ = schema_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.timestamp_ = timestamp_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.deleted_ = deleted_; + if (((bitField0_ & 0x00000040) == 0x00000040)) { + props_ = java.util.Collections.unmodifiableList(props_); + bitField0_ = (bitField0_ & ~0x00000040); + } + result.props_ = props_; + result.bitField0_ = to_bitField0_; + return result; + } + + public Builder mergeFrom(SchemaRegistryFormat.SchemaInfo other) { + if (other == SchemaRegistryFormat.SchemaInfo.getDefaultInstance()) return this; + if (other.hasSchemaId()) { + setSchemaId(other.getSchemaId()); + } + if (other.hasUser()) { + setUser(other.getUser()); + } + if (other.hasType()) { + setType(other.getType()); + } + if (other.hasSchema()) { + setSchema(other.getSchema()); + } + if (other.hasTimestamp()) { + setTimestamp(other.getTimestamp()); + } + if (other.hasDeleted()) { + setDeleted(other.getDeleted()); + } + if (!other.props_.isEmpty()) { + if (props_.isEmpty()) { + props_ = other.props_; + bitField0_ = (bitField0_ & ~0x00000040); + } else { + ensurePropsIsMutable(); + props_.addAll(other.props_); + } + + } + return this; + } + + public final boolean isInitialized() { + if (!hasSchemaId()) { + + return false; + } + if (!hasUser()) { + + return false; + } + if (!hasType()) { + + return false; + } + if (!hasSchema()) { + + return false; + } + if (!hasTimestamp()) { + + return false; + } + if (!hasDeleted()) { + + return false; + } + for (int i = 0; i < getPropsCount(); i++) { + if (!getProps(i).isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + + return this; + default: { + if (!parseUnknownField(input, extensionRegistry, tag)) { + + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + schemaId_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + user_ = input.readBytes(); + break; + } + case 24: { + int rawValue = input.readEnum(); + SchemaRegistryFormat.SchemaInfo.SchemaType value = SchemaRegistryFormat.SchemaInfo.SchemaType.valueOf(rawValue); + if (value != null) { + bitField0_ |= 0x00000004; + type_ = value; + } + break; + } + case 34: { + bitField0_ |= 0x00000008; + schema_ = input.readBytes(); + break; + } + case 40: { + bitField0_ |= 0x00000010; + timestamp_ = input.readInt64(); + break; + } + case 48: { + bitField0_ |= 0x00000020; + deleted_ = input.readBool(); + break; + } + case 58: { + SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder subBuilder = SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder(); + input.readMessage(subBuilder, extensionRegistry); + addProps(subBuilder.buildPartial()); + break; + } + } + } + } + + private int bitField0_; + + // required string schema_id = 1; + private java.lang.Object schemaId_ = ""; + public boolean hasSchemaId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getSchemaId() { + java.lang.Object ref = schemaId_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + schemaId_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setSchemaId(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + schemaId_ = value; + + return this; + } + public Builder clearSchemaId() { + bitField0_ = (bitField0_ & ~0x00000001); + schemaId_ = getDefaultInstance().getSchemaId(); + + return this; + } + void setSchemaId(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000001; + schemaId_ = value; + + } + + // required string user = 2; + private java.lang.Object user_ = ""; + public boolean hasUser() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public String getUser() { + java.lang.Object ref = user_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + user_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setUser(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + user_ = value; + + return this; + } + public Builder clearUser() { + bitField0_ = (bitField0_ & ~0x00000002); + user_ = getDefaultInstance().getUser(); + + return this; + } + void setUser(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000002; + user_ = value; + + } + + // required .pulsar.schema.SchemaInfo.SchemaType type = 3; + private SchemaRegistryFormat.SchemaInfo.SchemaType type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE; + public boolean hasType() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public SchemaRegistryFormat.SchemaInfo.SchemaType getType() { + return type_; + } + public Builder setType(SchemaRegistryFormat.SchemaInfo.SchemaType value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + type_ = value; + + return this; + } + public Builder clearType() { + bitField0_ = (bitField0_ & ~0x00000004); + type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE; + + return this; + } + + // required bytes schema = 4; + private com.google.protobuf.ByteString schema_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasSchema() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public com.google.protobuf.ByteString getSchema() { + return schema_; + } + public Builder setSchema(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + schema_ = value; + + return this; + } + public Builder clearSchema() { + bitField0_ = (bitField0_ & ~0x00000008); + schema_ = getDefaultInstance().getSchema(); + + return this; + } + + // required int64 timestamp = 5; + private long timestamp_ ; + public boolean hasTimestamp() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public long getTimestamp() { + return timestamp_; + } + public Builder setTimestamp(long value) { + bitField0_ |= 0x00000010; + timestamp_ = value; + + return this; + } + public Builder clearTimestamp() { + bitField0_ = (bitField0_ & ~0x00000010); + timestamp_ = 0L; + + return this; + } + + // required bool deleted = 6; + private boolean deleted_ ; + public boolean hasDeleted() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public boolean getDeleted() { + return deleted_; + } + public Builder setDeleted(boolean value) { + bitField0_ |= 0x00000020; + deleted_ = value; + + return this; + } + public Builder clearDeleted() { + bitField0_ = (bitField0_ & ~0x00000020); + deleted_ = false; + + return this; + } + + // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7; + private java.util.List props_ = + java.util.Collections.emptyList(); + private void ensurePropsIsMutable() { + if (!((bitField0_ & 0x00000040) == 0x00000040)) { + props_ = new java.util.ArrayList(props_); + bitField0_ |= 0x00000040; + } + } + + public java.util.List getPropsList() { + return java.util.Collections.unmodifiableList(props_); + } + public int getPropsCount() { + return props_.size(); + } + public SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index) { + return props_.get(index); + } + public Builder setProps( + int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair value) { + if (value == null) { + throw new NullPointerException(); + } + ensurePropsIsMutable(); + props_.set(index, value); + + return this; + } + public Builder setProps( + int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builderForValue) { + ensurePropsIsMutable(); + props_.set(index, builderForValue.build()); + + return this; + } + public Builder addProps(SchemaRegistryFormat.SchemaInfo.KeyValuePair value) { + if (value == null) { + throw new NullPointerException(); + } + ensurePropsIsMutable(); + props_.add(value); + + return this; + } + public Builder addProps( + int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair value) { + if (value == null) { + throw new NullPointerException(); + } + ensurePropsIsMutable(); + props_.add(index, value); + + return this; + } + public Builder addProps( + SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builderForValue) { + ensurePropsIsMutable(); + props_.add(builderForValue.build()); + + return this; + } + public Builder addProps( + int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builderForValue) { + ensurePropsIsMutable(); + props_.add(index, builderForValue.build()); + + return this; + } + public Builder addAllProps( + java.lang.Iterable values) { + ensurePropsIsMutable(); + super.addAll(values, props_); + + return this; + } + public Builder clearProps() { + props_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000040); + + return this; + } + public Builder removeProps(int index) { + ensurePropsIsMutable(); + props_.remove(index); + + return this; + } + + // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaInfo) + } + + static { + defaultInstance = new SchemaInfo(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaInfo) + } + + + static { + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto new file mode 100644 index 0000000000000..e497eaffccf9f --- /dev/null +++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto2"; + +package pulsar.schema; +option java_package = "org.apache.pulsar.broker.service.schema.proto"; +option optimize_for = LITE_RUNTIME; + +message SchemaInfo { + enum SchemaType { + NONE = 1; + THRIFT = 2; + AVRO = 3; + JSON = 4; + PROTO = 5; + } + message KeyValuePair { + required string key = 1; + required string value = 2; + } + required string schema_id = 1; + required string user = 2; + required SchemaType type = 3; + required bytes schema = 4; + required int64 timestamp = 5; + required bool deleted = 6; + + repeated KeyValuePair props = 7; +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index d6756b4ba96d3..ce053fe02694b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -96,6 +96,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.Compactor; @@ -335,7 +336,7 @@ public void testAddRemoveProducer() throws Exception { String role = "appid1"; // 1. simple add producer Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", - role, false, null); + role, false, null, SchemaVersion.Latest); topic.addProducer(producer); assertEquals(topic.getProducers().size(), 1); @@ -351,7 +352,7 @@ public void testAddRemoveProducer() throws Exception { // 3. add producer for a different topic PersistentTopic failTopic = new PersistentTopic(failTopicName, ledgerMock, brokerService); Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name", - role, false, null); + role, false, null, SchemaVersion.Latest); try { topic.addProducer(failProducer); fail("should have failed"); @@ -371,18 +372,18 @@ public void testMaxProducers() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); String role = "appid1"; // 1. add producer1 - Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null); + Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null, SchemaVersion.Latest); topic.addProducer(producer); assertEquals(topic.getProducers().size(), 1); // 2. add producer2 - Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role, false, null); + Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role, false, null, SchemaVersion.Latest); topic.addProducer(producer2); assertEquals(topic.getProducers().size(), 2); // 3. add producer3 but reached maxProducersPerTopic try { - Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role, false, null); + Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role, false, null, SchemaVersion.Latest); topic.addProducer(producer3); fail("should have failed"); } catch (BrokerServiceException e) { @@ -721,7 +722,7 @@ public void testDeleteTopic() throws Exception { // 2. delete topic with producer topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", - role, false, null); + role, false, null, SchemaVersion.Latest); topic.addProducer(producer); assertTrue(topic.delete().isCompletedExceptionally()); @@ -877,7 +878,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { String role = "appid1"; Thread.sleep(10); /* delay to ensure that the delete gets executed first */ Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", - role, false, null); + role, false, null, SchemaVersion.Latest); topic.addProducer(producer); fail("Should have failed"); } catch (BrokerServiceException e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index ded2f008cd125..9fb02ea43fdd1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -35,6 +35,12 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; @@ -43,9 +49,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import javax.naming.AuthenticationException; - import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -70,6 +74,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.ServerCnx.State; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; import org.apache.pulsar.broker.service.utils.ClientChannelHelper; import org.apache.pulsar.common.api.ByteBufPair; import org.apache.pulsar.common.api.Commands; @@ -103,14 +108,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; - /** */ @Test @@ -143,6 +140,8 @@ public class ServerCnxTest { public void setup() throws Exception { svcConfig = spy(new ServiceConfiguration()); pulsar = spy(new PulsarService(svcConfig)); + doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService(); + svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS)); svcConfig.setBacklogQuotaCheckEnabled(false); doReturn(svcConfig).when(pulsar).getConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index 5d2b922c19d7c..a0f007a8395b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -23,12 +23,12 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import io.netty.channel.ChannelHandlerContext; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; - import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.client.api.PulsarClientException.LookupException; import org.apache.pulsar.client.impl.ConsumerBase; @@ -36,12 +36,11 @@ import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; +import org.apache.pulsar.common.schema.SchemaVersion; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import io.netty.channel.ChannelHandlerContext; - /** */ public class ClientErrorsTest { @@ -143,7 +142,7 @@ private void producerCreateSuccessAfterRetry(String topic) throws Exception { mockBrokerService.setHandleProducer((ctx, producer) -> { if (counter.incrementAndGet() == 2) { - ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer")); + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty)); return; } ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.ServiceNotReady, "msg")); @@ -217,7 +216,8 @@ private void producerFailDoesNotFailOtherProducer(String topic1, String topic2) ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthenticationError, "msg")); return; } - ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer")); + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty)); + }); ProducerBase producer1 = (ProducerBase) client.newProducer().topic(topic1).create(); @@ -255,7 +255,7 @@ private void producerContinuousRetryAfterSendFail(String topic) throws Exception int i = counter.incrementAndGet(); if (i == 1 || i == 5) { // succeed on 1st and 5th attempts - ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer")); + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty)); return; } ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.PersistenceError, "msg")); @@ -479,7 +479,7 @@ public void testOneProducerFailShouldCloseAllProducersInPartitionedProducer() th ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg")); return; } - ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer")); + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty)); }); mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> { @@ -583,7 +583,7 @@ public void testProducerReconnect() throws Exception { }); mockBrokerService.setHandleProducer((ctx, produce) -> { - ctx.writeAndFlush(Commands.newProducerSuccess(produce.getRequestId(), "default-producer")); + ctx.writeAndFlush(Commands.newProducerSuccess(produce.getRequestId(), "default-producer", SchemaVersion.Empty)); }); mockBrokerService.setHandleSend((ctx, sendCmd, headersAndPayload) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java index bdc8e322e342a..91e5bbe6dd925 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; @@ -174,7 +175,7 @@ protected void handleProducer(PulsarApi.CommandProducer producer) { return; } // default - ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer")); + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty)); } @Override diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index b51d29ea070e5..093f94441247c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.api; +import static com.google.protobuf.ByteString.copyFromUtf8; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum; @@ -37,11 +38,11 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandError; import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; @@ -72,6 +73,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; +import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; @@ -115,7 +117,7 @@ public static ByteBuf newConnect(String authMethodName, String authData, int pro } if (authData != null) { - connectBuilder.setAuthData(ByteString.copyFromUtf8(authData)); + connectBuilder.setAuthData(copyFromUtf8(authData)); } if (originalPrincipal != null) { @@ -165,15 +167,16 @@ public static ByteBuf newSuccess(long requestId) { return res; } - public static ByteBuf newProducerSuccess(long requestId, String producerName) { - return newProducerSuccess(requestId, producerName, -1); + public static ByteBuf newProducerSuccess(long requestId, String producerName, SchemaVersion schemaVersion) { + return newProducerSuccess(requestId, producerName, -1, schemaVersion); } - public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId) { + public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId, SchemaVersion schemaVersion) { CommandProducerSuccess.Builder producerSuccessBuilder = CommandProducerSuccess.newBuilder(); producerSuccessBuilder.setRequestId(requestId); producerSuccessBuilder.setProducerName(producerName); producerSuccessBuilder.setLastSequenceId(lastSequenceId); + producerSuccessBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.bytes())); CommandProducerSuccess producerSuccess = producerSuccessBuilder.build(); ByteBuf res = serializeWithSize( BaseCommand.newBuilder().setType(Type.PRODUCER_SUCCESS).setProducerSuccess(producerSuccess)); @@ -980,4 +983,5 @@ public static boolean peerSupportsGetLastMessageId(int peerVersion) { public static boolean peerSupportsActiveConsumerListener(int peerVersion) { return peerVersion >= ProtocolVersion.v12.getNumber(); } + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 61339e1395022..c34f46cd9b2e7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -288,6 +288,12 @@ public boolean isGlobal() { return cluster == null || Constants.GLOBAL_CLUSTER.equalsIgnoreCase(cluster); } + public String getSchemaName() { + return getProperty() + + "/" + getNamespacePortion() + + "/" + getLocalName(); + } + @Override public String toString() { return completeTopicName; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java new file mode 100644 index 0000000000000..0aaefb3eff24d --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +final public class EmptyVersion implements SchemaVersion { + private static final byte[] EMPTY = new byte[]{}; + + @Override + public byte[] bytes() { + return EMPTY; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java new file mode 100644 index 0000000000000..b26231c1d0aca --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +final class LatestVersion implements SchemaVersion { + private static final byte[] EMPTY = new byte[]{}; + + @Override + public byte[] bytes() { + return EMPTY; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java new file mode 100644 index 0000000000000..5a5012c9bd27f --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Map; +import lombok.Builder; +import lombok.Data; + +@Builder +@Data +public class SchemaData { + private final SchemaType type; + private final boolean isDeleted; + private final long timestamp; + private final String user; + private final byte[] data; + private final Map props; +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java new file mode 100644 index 0000000000000..e9a01f0a19a68 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +public enum SchemaType { + AVRO, PROTOBUF, THRIFT, JSON, NONE +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java new file mode 100644 index 0000000000000..e31e45d050e82 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +public interface SchemaVersion { + SchemaVersion Latest = new LatestVersion(); + SchemaVersion Empty = new EmptyVersion(); + + byte[] bytes(); +}