Schema registry documentation (apache#1457)
* add basic notes for schema registry

* more notes on schema registry in C&A doc

* add java interfaces to concepts doc

* add theoretical docs

* add schema storage cookbook

* add schema storage cookbook to sidebar

* remove schema storage cookbook for now

* remove unused notes

* add java examples

* add schema components

* add user config

* add examples of user-defined props

* finish draft of java client docs

* add custom schema storage document

* more sections in custom doc

* add schemaRegistryStorageClassName to conf/broker.conf file

* change sentence in getting started guide
lucperkins authored and merlimat committed May 8, 2018
1 parent faa107b commit 7056384
Showing 6 changed files with 217 additions and 18 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
@@ -471,6 +471,10 @@ functionsWorkerEnabled=false
# Enable topic level metrics
exposePublisherStats=true

### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory

### --- Ledger Offloading --- ###

# Driver to use to offload old data to long term storage (Possible values: S3)
2 changes: 2 additions & 0 deletions site/_data/sidebar.yaml
@@ -156,6 +156,8 @@ groups:
endpoint: BinaryProtocol
- title: Codebase
endpoint: Codebase
- title: Custom schema storage
endpoint: schema-storage
- title: Modular load manager
endpoint: ModularLoadManager

2 changes: 1 addition & 1 deletion site/_includes/explanations/install-package.md
@@ -29,7 +29,7 @@ Pulsar is currently available for **MacOS** and **Linux**. In order to use Pulsa

To get started running Pulsar, download a binary tarball release in one of the following ways:

* by clicking the link below, which will automatically trigger a download:
* by clicking the link below and downloading the release from an Apache mirror:

* <a href="{{ binary_release_url }}" download>Pulsar {{ site.current_version }} binary release</a>

98 changes: 81 additions & 17 deletions site/docs/latest/clients/Java.md
@@ -1,6 +1,6 @@
---
title: The Pulsar Java client
tags: [client, java]
tags: [client, java, schema, schema registry]
---

<!--
@@ -306,22 +306,6 @@ consumerBuilder
});
```

## Message schemas {#schemas}

In Pulsar, all message data consists of byte arrays. Message **schemas** enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a [producer](#producers) without specifying a schema, then the producer can only produce messages of type `byte[]`. Here's an example:
```java
Producer producer = client.newProducer()
.topic(topic)
.create();
```
The producer above is equivalent to a `Producer<byte[]>` (in fact, you should always explicitly specify the type). If you'd like



The same schema-based logic applies to [consumers](#consumers) and [readers](#readers).

## Reader interface {#readers}

With the [reader interface](../../getting-started/ConceptsAndArchitecture#reader-interface), Pulsar clients can "manually position" themselves within a topic, reading all messages from a specified message onward. The Pulsar API for Java enables you to create {% javadoc Reader client org.apache.pulsar.client.api.Reader %} objects by specifying a {% popover topic %}, a {% javadoc MessageId client org.apache.pulsar.client.api.MessageId %}, and {% javadoc ReaderConfiguration client org.apache.pulsar.client.api.ReaderConfiguration %}.
@@ -347,6 +331,86 @@ In the example above, a `Reader` object is instantiated for a specific topic and
The code sample above points the `Reader` object to a specific message (by ID), but you can also use `MessageId.earliest` to point to the earliest available message on the topic or `MessageId.latest` to point to the most recent available message.
## Schemas
In Pulsar, all message data consists of byte arrays "under the hood." [Message schemas](../../getting-started/ConceptsAndArchitecture#schema-registry) enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a [producer](#producers) without specifying a schema, then the producer can only produce messages of type `byte[]`. Here's an example:

```java
Producer producer = client.newProducer()
        .topic(topic)
        .create();
```

The producer above is equivalent to a `Producer<byte[]>` (in fact, you should *always* explicitly specify the type). If you'd like to use a producer for a different type of data, you'll need to specify a **schema** that informs Pulsar which data type will be transmitted over the {% popover topic %}.

### Schema example

Let's say that you have a `SensorReading` class that you'd like to transmit over a Pulsar topic:

```java
public class SensorReading {
    public float temperature;

    public SensorReading(float temperature) {
        this.temperature = temperature;
    }

    // A no-arg constructor is required
    public SensorReading() {
    }

    public float getTemperature() {
        return temperature;
    }

    public void setTemperature(float temperature) {
        this.temperature = temperature;
    }
}
```

You could then create a `Producer<SensorReading>` (or `Consumer<SensorReading>`) like so:

```java
Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
        .topic("sensor-readings")
        .create();
```

The following schema formats are currently available for Java:

* No schema or the byte array schema (which can be applied using `Schema.BYTES`):

```java
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
        .topic("some-raw-bytes-topic")
        .create();
```

Or, equivalently:

```java
Producer<byte[]> bytesProducer = client.newProducer()
        .topic("some-raw-bytes-topic")
        .create();
```

* `String` for normal UTF-8-encoded string data. This schema can be applied using `Schema.STRING`:

```java
Producer<String> stringProducer = client.newProducer(Schema.STRING)
        .topic("some-string-topic")
        .create();
```
* JSON schemas can be created for POJOs using the `JSONSchema` class. Here's an example:
```java
Schema<MyPojo> pojoSchema = JSONSchema.of(MyPojo.class);
Producer<MyPojo> pojoProducer = client.newProducer(pojoSchema)
        .topic("some-pojo-topic")
        .create();
```
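
To make the role of a schema more concrete, here is a minimal, self-contained sketch of the idea behind a UTF-8 string schema: a pair of functions that convert between a typed value and the raw bytes that travel over a topic. `MiniSchema`, `Utf8StringSchema`, and `MiniSchemaDemo` are hypothetical names used for illustration only. They are not part of the Pulsar client, and Pulsar's own `Schema` interface may differ.

```java
import java.nio.charset.StandardCharsets;

// Small demo entry point; it uses the hypothetical types defined below.
final class MiniSchemaDemo {
    public static void main(String[] args) {
        MiniSchema<String> schema = new Utf8StringSchema();
        byte[] wire = schema.encode("hello");        // bytes that would be sent
        System.out.println(schema.decode(wire));     // typed value on the consumer side
    }
}

// Hypothetical stand-in for a schema: converts a typed value to and from raw bytes.
interface MiniSchema<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
}

// Illustrates what a UTF-8 string schema does conceptually; not Pulsar's implementation.
final class Utf8StringSchema implements MiniSchema<String> {
    @Override
    public byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

A producer configured with a schema applies the encoding step before sending, and a consumer configured with the same schema applies the decoding step after receiving, so application code only ever sees the typed value.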
## Authentication
Pulsar currently supports two authentication schemes: [TLS](../../admin/Authz#tls-client-auth) and [Athenz](../../admin/Authz#athenz). The Pulsar Java client can be used with both.
71 changes: 71 additions & 0 deletions site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -533,3 +533,74 @@ byte[] msgIdBytes = // Some byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());
```

## Schema registry

Type safety is extremely important in any application built around a message bus like Pulsar. {% popover Producers %} and {% popover consumers %} need a mechanism for coordinating types at the {% popover topic %} level; without one, problems such as serialization and deserialization failures can arise. Applications typically adopt one of two basic approaches to type safety in messaging:

1. A "client-side" approach in which message producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. If a producer is sending temperature sensor data on the topic `topic-1`, consumers of that topic will run into trouble if they attempt to parse that data as, say, moisture sensor readings.
1. A "server-side" approach in which producers and consumers inform the system which data types can be transmitted via the topic. With this approach, the messaging system enforces type safety and ensures that producers and consumers remain synced.

Both approaches are available in Pulsar, and you're free to adopt one or the other or to mix and match on a per-topic basis.

1. For the "client-side" approach, producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis.
1. For the "server-side" approach, Pulsar has a built-in **schema registry** that enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic.

{% include admonition.html type="info" content="The Pulsar schema registry is currently available only for the [Java client](../../clients/Java)." %}

### Basic architecture

In Pulsar, schemas are uploaded to, fetched from, and updated via Pulsar's [REST API](../../reference/RestApi).

{% include admonition.html type="success" title="Other schema registry backends"
content="Out of the box, Pulsar uses the [Apache BookKeeper](#persistent-storage) log storage system for schema storage. You can, however, use different backends if you wish. Documentation for custom schema storage logic is coming soon." %}

### How schemas work

Pulsar schemas are applied and enforced *at the topic level* (schemas cannot be applied at the {% popover namespace %} or {% popover tenant %} level). Producers and consumers upload schemas to Pulsar {% popover brokers %}.

Pulsar schemas are fairly simple data structures that consist of:

* A **name**. In Pulsar, a schema's name is the {% popover topic %} to which the schema is applied.
* A **payload**, which is a binary representation of the schema
* A schema [**type**](#schema-types)
* User-defined **properties** as a string/string map. Usage of properties is wholly application specific. Possible properties might be the Git hash associated with a schema, an environment like `dev` or `prod`, etc.
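
The four components above can be pictured as a simple value class. The sketch below is illustrative only: `SchemaRecord`, its `Type` enum, and the example payload and property values are hypothetical and are not taken from the Pulsar codebase.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the four schema components listed above; not a Pulsar class.
final class SchemaRecord {
    enum Type { NONE, STRING, JSON }

    final String name;                    // the topic to which the schema is applied
    final byte[] payload;                 // binary representation of the schema
    final Type type;                      // the schema type
    final Map<String, String> properties; // user-defined, application-specific

    SchemaRecord(String name, byte[] payload, Type type, Map<String, String> properties) {
        this.name = name;
        this.payload = payload.clone();
        this.type = type;
        // Defensive copy so later changes to the caller's map do not leak in.
        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
    }

    // Builds an example record; the payload text and property values are made up.
    static SchemaRecord example() {
        Map<String, String> props = new HashMap<>();
        props.put("env", "dev");
        props.put("git-hash", "abc1234");
        byte[] payload = "{\"temperature\":\"float\"}".getBytes(StandardCharsets.UTF_8);
        return new SchemaRecord("sensor-data", payload, Type.JSON, props);
    }
}
```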

### Schema versions

In order to illustrate how schema versioning works, let's walk through an example. Imagine that the Pulsar [Java client](../../clients/Java) created using the code below attempts to connect to Pulsar and begin sending messages:

```java
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
        .topic("sensor-data")
        .sendTimeout(3, TimeUnit.SECONDS)
        .create();
```

The table below lists the possible scenarios when this connection attempt occurs and what will happen in light of each scenario:

Scenario | What happens
:--------|:------------
No schema exists for the topic | The {% popover producer %} is created using the given schema. The schema is transmitted to the {% popover broker %} and stored (since no existing schema is "compatible" with the `SensorReading` schema). Any {% popover consumer %} created using the same schema/topic can consume messages from the `sensor-data` topic.
A schema already exists; the producer connects using the same schema that's already stored | The schema is transmitted to the Pulsar broker, which determines that it is compatible. The broker attempts to store the schema in [BookKeeper](#persistent-storage), finds that it is already stored, and uses the existing version to tag produced messages.
A schema already exists; the producer connects using a new schema that is compatible | The producer transmits the schema to the broker. The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number).

{% include admonition.html type="info" content="Schemas are versioned in succession. Schema storage happens in the broker that handles the associated topic so that version assignments can be made. Once a version has been assigned to a new schema, or fetched for a schema that is already stored, all subsequent messages produced by that producer are tagged with that version." %}
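
The three scenarios in the table can be sketched with a small in-memory model. This is an illustration under stated assumptions, not a description of the broker's implementation: `InMemorySchemaVersions` and `register` are hypothetical names, version numbers are assumed to start at 0, and the compatibility check is skipped, so every new schema is treated as compatible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of per-topic schema versioning; not Pulsar code.
final class InMemorySchemaVersions {
    private final Map<String, List<byte[]>> versionsByTopic = new HashMap<>();

    // Returns the version that messages from the connecting producer would be tagged with.
    synchronized long register(String topic, byte[] schemaPayload) {
        List<byte[]> versions = versionsByTopic.computeIfAbsent(topic, t -> new ArrayList<>());

        // Scenario 2: an identical schema is already stored, so its version is reused.
        for (int i = 0; i < versions.size(); i++) {
            if (Arrays.equals(versions.get(i), schemaPayload)) {
                return i;
            }
        }

        // Scenarios 1 and 3: nothing is stored yet, or the schema is new.
        // Assumption: the new schema is compatible; a real check would happen here.
        versions.add(schemaPayload.clone());
        return versions.size() - 1;
    }
}
```

Registering the same payload twice for a topic yields the same version, while registering a different payload yields the next version in succession.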

### Supported schema formats {#schema-types}

The following formats are supported by the Pulsar schema registry:

* None. If no schema is specified for a topic, producers and consumers will handle raw bytes.
* `String` (used for UTF-8-encoded strings)
* [JSON](https://www.json.org/)

For usage instructions, see the documentation for your preferred client library:

* [Java](../../clients/Java#schemas)

{% include admonition.html type="success" content="Support for other schema formats will be added in future releases of Pulsar." %}
58 changes: 58 additions & 0 deletions site/docs/latest/project/schema-storage.md
@@ -0,0 +1,58 @@
---
title: Custom schema storage
tags: [schema, schema registry]
---

By default, Pulsar stores data type [schemas](../../getting-started/ConceptsAndArchitecture#schema-registry) in [Apache BookKeeper](https://bookkeeper.apache.org) (which is deployed alongside Pulsar). You can, however, use another storage system if you wish. This doc walks you through creating your own schema storage implementation.

## Interface

In order to use a non-default (i.e. non-BookKeeper) storage system for Pulsar schemas, you need to implement two Java interfaces: [`SchemaStorage`](#schema-storage) and [`SchemaStorageFactory`](#schema-storage-factory).

### The `SchemaStorage` interface {#schema-storage}

The `SchemaStorage` interface has the following methods:

```java
public interface SchemaStorage {
    // How schemas are updated
    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);

    // How schemas are fetched from storage
    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);

    // How schemas are deleted
    CompletableFuture<SchemaVersion> delete(String key);

    // Utility method for converting a schema version byte array to a SchemaVersion object
    SchemaVersion versionFromBytes(byte[] version);

    // Startup behavior for the schema storage client
    void start() throws Exception;

    // Shutdown behavior for the schema storage client
    void close() throws Exception;
}
```

{% include admonition.html type="info" title="Example implementation" content="For a full-fledged example schema storage implementation, see the [`BookkeeperSchemaStorage`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class." %}
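
To give a feel for the shape of an implementation, here is a deliberately simplified in-memory sketch. It does not implement `SchemaStorage` itself: so that it compiles without Pulsar on the classpath, it uses a `long` in place of `SchemaVersion`, a `byte[]` in place of `StoredSchema`, and omits the `hash` argument and the lifecycle methods. `InMemorySchemaStore` is a hypothetical name, and returning `null` or `-1` for missing entries is an assumption of this sketch rather than documented Pulsar behavior.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for a schema store; Long replaces SchemaVersion, byte[] replaces StoredSchema.
final class InMemorySchemaStore {
    private final Map<String, List<byte[]>> schemas = new HashMap<>();

    // Appends a new version under the key and completes with its version number.
    synchronized CompletableFuture<Long> put(String key, byte[] value) {
        List<byte[]> versions = schemas.computeIfAbsent(key, k -> new ArrayList<>());
        versions.add(value.clone());
        long version = versions.size() - 1;
        return CompletableFuture.completedFuture(version);
    }

    // Completes with the stored bytes, or with null if the key or version is unknown.
    synchronized CompletableFuture<byte[]> get(String key, long version) {
        List<byte[]> versions = schemas.get(key);
        if (versions == null || version < 0 || version >= versions.size()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.completedFuture(versions.get((int) version).clone());
    }

    // Removes all versions for the key and completes with the last version, or -1 if none existed.
    synchronized CompletableFuture<Long> delete(String key) {
        List<byte[]> removed = schemas.remove(key);
        long last = (removed == null) ? -1L : removed.size() - 1L;
        return CompletableFuture.completedFuture(last);
    }
}
```

A real implementation would talk to an external storage system, so its futures would typically complete asynchronously instead of being returned already completed.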

### The `SchemaStorageFactory` interface {#schema-storage-factory}

```java
public interface SchemaStorageFactory {
    @NotNull
    SchemaStorage create(PulsarService pulsar) throws Exception;
}
```

{% include admonition.html type="info" title="Example implementation" content="For a full-fledged example schema storage factory implementation, see the [`BookkeeperSchemaStorageFactory`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class." %}

## Deployment

In order to use your custom schema storage implementation, you'll need to:

1. Package the implementation in a [JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html) file.
1. Add that JAR to the `lib` folder in your Pulsar [binary or source distribution](../../getting-started/LocalCluster#installing-pulsar).
1. Change the `schemaRegistryStorageClassName` configuration in [`broker.conf`](../../reference/Configuration#broker) to your custom factory class (i.e. the `SchemaStorageFactory` implementation, not the `SchemaStorage` implementation).
1. Start up Pulsar.
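
As a rough illustration of the configuration step, the relevant line in `broker.conf` might look like the fragment below. The class name `com.example.schema.MySchemaStorageFactory` is a hypothetical placeholder for your own `SchemaStorageFactory` implementation.

```properties
### --- Schema storage --- ###
# Hypothetical custom factory class; replace with the fully qualified name of your implementation
schemaRegistryStorageClassName=com.example.schema.MySchemaStorageFactory
```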
