Skip to content

Commit

Permalink
move general schema hash to common module (apache#5466)
Browse files Browse the repository at this point in the history
* move general schema hash to common module
  • Loading branch information
yittg authored and codelipenghui committed Oct 29, 2019
1 parent 1a4b126 commit 74ec920
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,10 @@
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -47,11 +43,11 @@
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;

public class SchemaRegistryServiceImpl implements SchemaRegistryService {
private static HashFunction hashFunction = Hashing.sha256();
private final Map<SchemaType, SchemaCompatibilityCheck> compatibilityChecks;
private final SchemaStorage schemaStorage;
private final Clock clock;
Expand Down Expand Up @@ -142,7 +138,7 @@ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, Schem
return maxDeleteVersionFuture;
}
).thenCompose(maxDeleteVersion -> {
byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
byte[] context = SchemaHash.of(schema).asBytes();
SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
.setSchema(ByteString.copyFrom(schema.getData()))
Expand Down Expand Up @@ -216,8 +212,8 @@ private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String user) {

private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema,
SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
HashCode existingHash = hashFunction.hashBytes(existingSchema.schema.getData());
HashCode newHash = hashFunction.hashBytes(newSchema.getData());
SchemaHash existingHash = SchemaHash.of(existingSchema.schema);
SchemaHash newHash = SchemaHash.of(newSchema);
SchemaData existingSchemaData = existingSchema.schema;
if (existingSchemaData.getType().isPrimitive()) {
if (newSchema.getType() != existingSchemaData.getType()) {
Expand All @@ -234,9 +230,9 @@ private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData newSch

public CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData) {
return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> {
HashCode newHash = hashFunction.hashBytes(schemaData.getData());
SchemaHash newHash = SchemaHash.of(schemaData);
for (SchemaAndMetadata schemaAndMetadata:schemaAndMetadataList) {
if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(), newHash.asBytes())) {
if (newHash.equals(SchemaHash.of(schemaAndMetadata.schema))) {
return completedFuture(((LongSchemaVersion)schemaStorage
.versionFromBytes(schemaAndMetadata.version.bytes())).getVersion());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.schema.SchemaHash;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaHash;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,45 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
package org.apache.pulsar.common.protocol.schema;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import java.util.Optional;
import lombok.EqualsAndHashCode;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;

import java.util.Optional;

// TODO(yittg): use same hash function with broker,
// move it to pulsar-common and provide it to broker also.
/**
* Schema hash wrapper with a HashCode inner type.
*/
@EqualsAndHashCode
public class SchemaHash {

private static HashFunction hashFunction = Hashing.sha256();

private final byte[] value;
private final HashCode hash;

private SchemaHash(byte[] value) {
this.value = value;
private SchemaHash(HashCode hash) {
this.hash = hash;
}

public static SchemaHash of(Schema schema) {
byte[] schemaBytes = Optional.ofNullable(schema)
.map(Schema::getSchemaInfo)
.map(SchemaInfo::getSchema).orElse(new byte[0]);
return new SchemaHash(hashFunction.hashBytes(schemaBytes).asBytes());
return of(Optional.ofNullable(schema)
.map(Schema::getSchemaInfo)
.map(SchemaInfo::getSchema).orElse(new byte[0]));
}

public static SchemaHash of(SchemaData schemaData) {
return of(schemaData.getData());
}

private static SchemaHash of(byte[] schemaBytes) {
return new SchemaHash(hashFunction.hashBytes(schemaBytes));
}

public byte[] asBytes() {
return hash.asBytes();
}
}

0 comments on commit 74ec920

Please sign in to comment.