From 44dd4129941ebf4018509ac77b9dda7efc64febe Mon Sep 17 00:00:00 2001 From: lipenghui Date: Fri, 7 Feb 2020 08:43:01 +0800 Subject: [PATCH] Fix get schema version in HttpLookupService. (#6193) ### Motivation Fix get schema version in HttpLookupService. The com.yahoo.sketches.Util.bytesToLong method need to flip the byte[]. Otherwise, will get a wrong long value. So use ByteBuffer to convert byte[] version to long. This issue will happens when users use http protocol client and multiple version schemas. ### Verifying this change New tests added for HttpLookupService and BinaryLookupService. --- .../pulsar/client/api/SimpleSchemaTest.java | 33 +++++++++++++++++++ .../pulsar/client/impl/HttpLookupService.java | 7 ++-- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index aa0f0e0c9018b..554808966ebed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -31,10 +31,16 @@ import org.apache.avro.reflect.ReflectData; import org.apache.avro.Schema.Parser; import org.apache.pulsar.broker.service.schema.LongSchemaVersion; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException; import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.BinaryProtoLookupService; +import org.apache.pulsar.client.impl.HttpLookupService; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.writer.AvroWriter; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -47,8 +53,10 @@ import org.testng.annotations.Test; import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class SimpleSchemaTest extends ProducerConsumerBase { @@ -603,4 +611,29 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception { } } + @Test + public void testGetSchemaByVersion() throws PulsarClientException, PulsarAdminException, ExecutionException, InterruptedException { + final String topic = "persistent://my-property/my-ns/testGetSchemaByVersion"; + + PulsarClientImpl httpProtocolClient = (PulsarClientImpl) PulsarClient.builder().serviceUrl(brokerUrl.toString()).build(); + PulsarClientImpl binaryProtocolClient = (PulsarClientImpl) pulsarClient; + + pulsarClient.newProducer(Schema.AVRO(V1Data.class)) + .topic(topic) + .create(); + + pulsarClient.newProducer(Schema.AVRO(V2Data.class)) + .topic(topic) + .create(); + + LookupService httpLookupService = httpProtocolClient.getLookup(); + LookupService binaryLookupService = binaryProtocolClient.getLookup(); + Assert.assertTrue(httpLookupService instanceof HttpLookupService); + Assert.assertTrue(binaryLookupService instanceof BinaryProtoLookupService); + Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2); + Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent()); + Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(1).array()).get().isPresent()); + Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent()); + Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(1).array()).get().isPresent()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 6a0746c8af327..5bc3bcd135fd1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.net.URI; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Base64; import java.util.List; @@ -46,9 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.yahoo.sketches.Util.bytesToLong; - -class HttpLookupService implements LookupService { +public class HttpLookupService implements LookupService { private final HttpClient httpClient; private final boolean useTls; @@ -152,7 +151,7 @@ public CompletableFuture> getSchema(TopicName topicName, by if (version != null) { path = String.format("admin/v2/schemas/%s/schema/%s", schemaName, - bytesToLong(version)); + ByteBuffer.wrap(version).getLong()); } httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> { future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));