Skip to content

Commit

Permalink
Schema registry (2/N) (apache#1319)
Browse files Browse the repository at this point in the history
* Schema Registry proto changes

* Infrastructure to store schemas

* Renumber schema fields

* Update Pulsar API with schema changes

* Revert field number change

* Fix merge conflict

* Fix broken merge

* Address issues in review

* Add schema type back to proto definition

* Address comments regarding lombok usage

* Remove reserved future enum fields

* regenerate code from protobuf

* Remove unused code

* Add schema version to producer success message

* plumb schema through to producer

* Revert "Add schema version to producer success message"

This reverts commit e7e72f4.

* Revert "Revert "Add schema version to producer success message""

This reverts commit 7b902f6.

* Persist schema on producer connect

* Add principal to schema on publish

* Reformat function for readability

* Remove unused protoc profile

* Rename put on schema registry to putIfAbsent

* wip: address review comments

* switch underscore to slash in schema name

* blah

* Fix protobuf version incompatibility

* Add appropriate license headers
  • Loading branch information
mgodave authored and merlimat committed Mar 8, 2018
1 parent e35087c commit f04eb51
Show file tree
Hide file tree
Showing 29 changed files with 2,281 additions and 99 deletions.
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,20 @@ flexible messaging model and an intuitive client API.</description>
<version>${log4j2.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<type>test-jar</type>
<version>${log4j2.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<type>test-jar</type>
<version>${log4j2.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(dynamic = true)
private boolean preferLaterVersions = false;

private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";

/**** --- WebSocket --- ****/
// Number of IO threads in Pulsar Client used in WebSocket proxy
private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors();
Expand Down Expand Up @@ -1449,7 +1451,15 @@ public boolean exposeTopicLevelMetricsInPrometheus() {
public void setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) {
this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus;
}


public String getSchemaRegistryStorageClassName() {
return schemaRegistryStorageClassName;
}

public void setSchemaRegistryStorageClassName(String className) {
schemaRegistryStorageClassName = className;
}

public boolean authenticateOriginalAuthData() {
return authenticateOriginalAuthData;
}
Expand Down
6 changes: 6 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf2.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.URL;
import java.util.List;
Expand All @@ -34,7 +37,6 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
Expand All @@ -52,6 +54,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.WebService;
Expand Down Expand Up @@ -80,11 +83,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import io.netty.util.concurrent.DefaultThreadFactory;

/**
* Main class for Pulsar broker service
*/
Expand Down Expand Up @@ -123,6 +121,7 @@ public class PulsarService implements AutoCloseable {
private final String brokerServiceUrl;
private final String brokerServiceUrlTls;
private final String brokerVersion;
private SchemaRegistryService schemaRegistryService = null;
private final Optional<WorkerService> functionWorkerService;

private final MessagingServiceShutdownHook shutdownService;
Expand Down Expand Up @@ -233,6 +232,10 @@ public void close() throws PulsarServerException {
loadManager.stop();
}

if (schemaRegistryService != null) {
schemaRegistryService.close();
}

state = State.Closed;

} catch (Exception e) {
Expand Down Expand Up @@ -359,6 +362,8 @@ public synchronized void brokerIsAFollowerNow() {

this.metricsGenerator = new MetricsGenerator(this);

schemaRegistryService = SchemaRegistryService.create(this);

state = State.Started;

acquireSLANamespace();
Expand Down Expand Up @@ -701,4 +706,8 @@ public AtomicReference<LoadManager> getLoadManager() {
public String getBrokerVersion() {
return brokerVersion;
}

public SchemaRegistryService getSchemaRegistryService() {
return schemaRegistryService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import static org.apache.pulsar.common.api.Commands.hasChecksum;
import static org.apache.pulsar.common.api.Commands.readChecksum;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
Expand All @@ -42,17 +46,11 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

/**
* Represents a currently connected producer
*/
Expand Down Expand Up @@ -82,8 +80,10 @@ public class Producer {

private final Map<String, String> metadata;

private final SchemaVersion schemaVersion;

public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata) {
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
Expand All @@ -110,6 +110,7 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName
this.remoteCluster = isRemote ? producerName.split("\\.")[2] : null;

this.isEncrypted = isEncrypted;
this.schemaVersion = schemaVersion;
}

@Override
Expand Down Expand Up @@ -492,6 +493,10 @@ public void checkEncryption() {
}
}

public SchemaVersion getSchemaVersion() {
return schemaVersion;
}

private static final Logger log = LoggerFactory.getLogger(Producer.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@
import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;

import com.google.protobuf.GeneratedMessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.mledger.Position;
Expand All @@ -52,6 +59,7 @@
import org.apache.pulsar.common.api.CommandUtils;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
Expand All @@ -74,24 +82,18 @@
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;
import com.google.protobuf.GeneratedMessageLite;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslHandler;

public class ServerCnx extends PulsarHandler {
private final BrokerService service;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
Expand Down Expand Up @@ -697,6 +699,36 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
});
}

private static SchemaType getType(PulsarApi.Schema.Type protocolType) {
switch (protocolType) {
case Json:
return SchemaType.JSON;
case Avro:
return SchemaType.AVRO;
case Thrift:
return SchemaType.THRIFT;
case Protobuf:
return SchemaType.PROTOBUF;
default:
return SchemaType.NONE;
}
}

private SchemaData getSchema(PulsarApi.Schema protocolSchema) {
return SchemaData.builder()
.data(protocolSchema.getSchemaData().toByteArray())
.isDeleted(false)
.timestamp(System.currentTimeMillis())
.user(originalPrincipal)
.type(getType(protocolSchema.getType()))
.props(protocolSchema.getPropertiesList().stream().collect(
Collectors.toMap(
PulsarApi.KeyValue::getKey,
PulsarApi.KeyValue::getValue
)
)).build();
}

@Override
protected void handleProducer(final CommandProducer cmdProducer) {
checkArgument(state == State.Connected);
Expand Down Expand Up @@ -752,7 +784,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
Producer producer = existingProducerFuture.getNow(null);
log.info("[{}] Producer with the same id is already created: {}", remoteAddress,
producer);
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName(),
producer.getSchemaVersion()));
return null;
} else {
// There was an early request to create a producer with
Expand Down Expand Up @@ -805,41 +838,56 @@ protected void handleProducer(final CommandProducer cmdProducer) {

disableTcpNoDelayIfNeeded(topicName.toString(), producerName);

Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
isEncrypted, metadata);
CompletableFuture<SchemaVersion> schemaVersionFuture;
if (cmdProducer.hasSchema()) {
schemaVersionFuture = topic.addSchema(getSchema(cmdProducer.getSchema()));
} else {
schemaVersionFuture = CompletableFuture.completedFuture(SchemaVersion.Empty);
}

schemaVersionFuture.exceptionally(exception -> {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, exception.getMessage()));
producers.remove(producerId, producerFuture);
return null;
});

schemaVersionFuture.thenAccept(schemaVersion -> {
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
isEncrypted, metadata, schemaVersion);

try {
topic.addProducer(producer);
try {
topic.addProducer(producer);

if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
producer.getLastSequenceId()));
return;
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion()));
return;
} else {
// The producer's future was completed before by
// a close command
producer.closeNow();
log.info("[{}] Cleared producer created after timeout on client side {}",
remoteAddress, producer);
}
} else {
// The producer's future was completed before by
// a close command
producer.closeNow();
log.info("[{}] Cleared producer created after timeout on client side {}",
remoteAddress, producer);
}
} else {
producer.closeNow();
log.info("[{}] Cleared producer created after connection was closed: {}",
log.info("[{}] Cleared producer created after connection was closed: {}",
remoteAddress, producer);
producerFuture.completeExceptionally(
producerFuture.completeExceptionally(
new IllegalStateException("Producer created after connection was closed"));
}
} catch (BrokerServiceException ise) {
log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
}
} catch (BrokerServiceException ise) {
log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
ise.getMessage());
ctx.writeAndFlush(Commands.newError(requestId,
ctx.writeAndFlush(Commands.newError(requestId,
BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));
producerFuture.completeExceptionally(ise);
}
producerFuture.completeExceptionally(ise);
}

producers.remove(producerId, producerFuture);
producers.remove(producerId, producerFuture);
});
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
if (!(cause instanceof ServiceUnitNotReadyException)) {
Expand Down
Loading

0 comments on commit f04eb51

Please sign in to comment.