Skip to content

Commit

Permalink
Schema registry 3/N (apache#1363)
Browse files Browse the repository at this point in the history
* Schema Registry proto changes

* Infrastructure to store schemas

* A default schema registry implementation

* Renumber schema fields

* Update Pulsar API with schema changes

* Revert field number change

* Fix merge conflict

* Fix broken merge

* Address issues in review

* Add schema type back to proto definition

* Address comments regarding lombok usage

* Remove reserved future enum fields

* regenerate code from protobuf

* Remove unused code

* Add schema version to producer success message

* plumb schema through to producer

* Revert "Add schema version to producer success message"

This reverts commit e7e72f4.

* Revert "Revert "Add schema version to producer success message""

This reverts commit 7b902f6.

* Persist schema on producer connect

* Add principal to schema on publish

* Reformat function for readability

* Remove unused protoc profile

* Rename put on schema registry to putIfAbsent

* Reformat function for readability

* Remove unused protoc profile

* Rename put on schema registry to putIfAbsent

* fix compile errors from parent branch changes

* fix lombok tomfoolery on builder

* plumb hash through and allow lookup by data

* wip

* run tests

* wip: address review comments

* switch underscore to slash in schema name

* blah

* Get duplicate schema detection to work

* Fix protobuf version incompatibility

* fix merge issues

* Fix license headers

* Address review
  • Loading branch information
mgodave authored and merlimat committed Mar 14, 2018
1 parent 9251a44 commit 80dc0de
Show file tree
Hide file tree
Showing 11 changed files with 2,702 additions and 4 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import javax.validation.constraints.NotNull;
import org.apache.pulsar.broker.PulsarService;

@SuppressWarnings("unused")
public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
@Override
@NotNull
public SchemaStorage create(PulsarService pulsar) throws Exception {
BookkeeperSchemaStorage service = new BookkeeperSchemaStorage(pulsar);
service.init();
return service;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import com.google.common.base.MoreObjects;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.pulsar.common.schema.SchemaVersion;

class LongSchemaVersion implements SchemaVersion {
private final long version;

LongSchemaVersion(long version) {
this.version = version;
}

public long getVersion() {
return version;
}

@Override
public byte[] bytes() {
ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE);
buffer.putLong(version);
buffer.rewind();
return buffer.array();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LongSchemaVersion that = (LongSchemaVersion) o;
return version == that.version;
}

@Override
public int hashCode() {

return Objects.hash(version);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("version", version)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -38,6 +41,7 @@
import org.apache.pulsar.common.schema.SchemaVersion;

public class SchemaRegistryServiceImpl implements SchemaRegistryService {
private static HashFunction hashFunction = Hashing.sha256();
private final SchemaStorage schemaStorage;
private final Clock clock;

Expand Down Expand Up @@ -76,6 +80,7 @@ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVer
@Override
@NotNull
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
.setSchema(ByteString.copyFrom(schema.getData()))
Expand All @@ -85,14 +90,14 @@ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, Schem
.setTimestamp(clock.millis())
.addAllProps(toPairs(schema.getProps()))
.build();
return schemaStorage.put(schemaId, info.toByteArray());
return schemaStorage.put(schemaId, info.toByteArray(), context);
}

@Override
@NotNull
public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user) {
byte[] deletedEntry = deleted(schemaId, user).toByteArray();
return schemaStorage.put(schemaId, deletedEntry);
return schemaStorage.put(schemaId, deletedEntry, new byte[]{});
}

@Override
Expand Down Expand Up @@ -156,6 +161,9 @@ static Map<String, String> toMap(List<SchemaRegistryFormat.SchemaInfo.KeyValuePa
}

static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> toPairs(Map<String, String> map) {
if (isNull(map)) {
return Collections.emptyList();
}
List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new ArrayList<>(map.size());
for (Map.Entry<String, String> entry : map.entrySet()) {
SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public interface SchemaStorage {

CompletableFuture<SchemaVersion> put(String key, byte[] value);
CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);

CompletableFuture<StoredSchema> get(String key, SchemaVersion version);

Expand Down
Loading

0 comments on commit 80dc0de

Please sign in to comment.