diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 26f030950bd4c..25e2588147d5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -27,14 +27,10 @@ import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs; import com.google.common.annotations.VisibleForTesting; -import com.google.common.hash.HashCode; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.time.Clock; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -47,11 +43,11 @@ import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat; import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.protocol.schema.SchemaHash; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.protocol.schema.SchemaVersion; public class SchemaRegistryServiceImpl implements SchemaRegistryService { - private static HashFunction hashFunction = Hashing.sha256(); private final Map compatibilityChecks; private final SchemaStorage schemaStorage; private final Clock clock; @@ -142,7 +138,7 @@ public CompletableFuture putSchemaIfAbsent(String schemaId, Schem return maxDeleteVersionFuture; } ).thenCompose(maxDeleteVersion -> { - byte[] context = hashFunction.hashBytes(schema.getData()).asBytes(); + byte[] context = SchemaHash.of(schema).asBytes(); SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder() .setType(Functions.convertFromDomainType(schema.getType())) .setSchema(ByteString.copyFrom(schema.getData())) @@ -216,8 +212,8 @@ private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String user) { private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData newSchema, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException { - HashCode existingHash = hashFunction.hashBytes(existingSchema.schema.getData()); - HashCode newHash = hashFunction.hashBytes(newSchema.getData()); + SchemaHash existingHash = SchemaHash.of(existingSchema.schema); + SchemaHash newHash = SchemaHash.of(newSchema); SchemaData existingSchemaData = existingSchema.schema; if (existingSchemaData.getType().isPrimitive()) { if (newSchema.getType() != existingSchemaData.getType()) { @@ -234,9 +230,9 @@ private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData newSch public CompletableFuture findSchemaVersion(String schemaId, SchemaData schemaData) { return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> { - HashCode newHash = hashFunction.hashBytes(schemaData.getData()); + SchemaHash newHash = SchemaHash.of(schemaData); for (SchemaAndMetadata schemaAndMetadata:schemaAndMetadataList) { - if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(), newHash.asBytes())) { + if (newHash.equals(SchemaHash.of(schemaAndMetadata.schema))) { return completedFuture(((LongSchemaVersion)schemaStorage .versionFromBytes(schemaAndMetadata.version.bytes())).getVersion()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index 7dc4e49114cdb..8afa293449dfb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -31,8 +31,8 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.apache.pulsar.client.impl.schema.SchemaHash; import org.apache.pulsar.client.impl.transaction.TransactionImpl; +import org.apache.pulsar.common.protocol.schema.SchemaHash; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 2a8cbd4246d11..ff27c88e4ec14 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -64,14 +64,14 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.client.impl.schema.SchemaHash; -import org.apache.pulsar.common.protocol.ByteBufPair; -import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.protocol.Commands.ChecksumType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.protocol.ByteBufPair; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Commands.ChecksumType; +import org.apache.pulsar.common.protocol.schema.SchemaHash; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.DateFormatter; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java similarity index 66% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaHash.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java index 7310405ef6ffb..da7657ea777a5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaHash.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java @@ -16,34 +16,45 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.impl.schema; +package org.apache.pulsar.common.protocol.schema; +import com.google.common.hash.HashCode; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; - +import java.util.Optional; import lombok.EqualsAndHashCode; - import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaInfo; -import java.util.Optional; - -// TODO(yittg): use same hash function with broker, -// move it to pulsar-common and provide it to broker also. +/** + * Schema hash wrapper with a HashCode inner type. + */ @EqualsAndHashCode public class SchemaHash { + private static HashFunction hashFunction = Hashing.sha256(); - private final byte[] value; + private final HashCode hash; - private SchemaHash(byte[] value) { - this.value = value; + private SchemaHash(HashCode hash) { + this.hash = hash; } public static SchemaHash of(Schema schema) { - byte[] schemaBytes = Optional.ofNullable(schema) - .map(Schema::getSchemaInfo) - .map(SchemaInfo::getSchema).orElse(new byte[0]); - return new SchemaHash(hashFunction.hashBytes(schemaBytes).asBytes()); + return of(Optional.ofNullable(schema) + .map(Schema::getSchemaInfo) + .map(SchemaInfo::getSchema).orElse(new byte[0])); + } + + public static SchemaHash of(SchemaData schemaData) { + return of(schemaData.getData()); + } + + private static SchemaHash of(byte[] schemaBytes) { + return new SchemaHash(hashFunction.hashBytes(schemaBytes)); + } + + public byte[] asBytes() { + return hash.asBytes(); } }