Skip to content

Commit

Permalink
Fix get schema version in HttpLookupService. (apache#6193)
Browse files Browse the repository at this point in the history
### Motivation

Fix get schema version in HttpLookupService.  The com.yahoo.sketches.Util.bytesToLong method need to flip the byte[]. Otherwise, will get a wrong long value. So use ByteBuffer to convert byte[] version to long. 

This issue will happens when users use http protocol client and multiple version schemas.

### Verifying this change

New tests added for HttpLookupService and BinaryLookupService.
  • Loading branch information
codelipenghui authored Feb 7, 2020
1 parent d631156 commit 44dd412
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.Schema.Parser;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
Expand All @@ -47,8 +53,10 @@
import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class SimpleSchemaTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -603,4 +611,29 @@ public void testAutoKeyValueConsume(boolean batching) throws Exception {
}
}

@Test
public void testGetSchemaByVersion() throws PulsarClientException, PulsarAdminException, ExecutionException, InterruptedException {
final String topic = "persistent://my-property/my-ns/testGetSchemaByVersion";

PulsarClientImpl httpProtocolClient = (PulsarClientImpl) PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
PulsarClientImpl binaryProtocolClient = (PulsarClientImpl) pulsarClient;

pulsarClient.newProducer(Schema.AVRO(V1Data.class))
.topic(topic)
.create();

pulsarClient.newProducer(Schema.AVRO(V2Data.class))
.topic(topic)
.create();

LookupService httpLookupService = httpProtocolClient.getLookup();
LookupService binaryLookupService = binaryProtocolClient.getLookup();
Assert.assertTrue(httpLookupService instanceof HttpLookupService);
Assert.assertTrue(binaryLookupService instanceof BinaryProtoLookupService);
Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(1).array()).get().isPresent());
Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
Assert.assertTrue(binaryLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(1).array()).get().isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
Expand All @@ -46,9 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.yahoo.sketches.Util.bytesToLong;

class HttpLookupService implements LookupService {
public class HttpLookupService implements LookupService {

private final HttpClient httpClient;
private final boolean useTls;
Expand Down Expand Up @@ -152,7 +151,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
if (version != null) {
path = String.format("admin/v2/schemas/%s/schema/%s",
schemaName,
bytesToLong(version));
ByteBuffer.wrap(version).getLong());
}
httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
Expand Down

0 comments on commit 44dd412

Please sign in to comment.