Skip to content

Commit

Permalink
[Pulsar SQL] Fix Pulsar SQL query bytes schema data error (apache#9631)
Browse files Browse the repository at this point in the history
### Motivation

Currently, the Pulsar SQL query bytes schema data will cause an error.

*Reproduce*
1. produce bytes schema data.
2. query data by the Pulsar SQL.
3. the error log could be seen.

*Error log*
```
com.google.common.util.concurrent.UncheckedExecutionException: java.nio.BufferUnderflowException
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.getSchemaByVersion(PulsarSqlSchemaInfoProvider.java:76)
	at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:471)
	at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
	at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
	at io.prestosql.operator.Driver.processInternal(Driver.java:379)
	at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
	at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
	at io.prestosql.operator.Driver.processFor(Driver.java:276)
	at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
	at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
	at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
	at io.prestosql.$gen.Presto_332__testversion____20210219_094906_2.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.BufferUnderflowException
	at java.nio.Buffer.nextGetIndex(Buffer.java:509)
	at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:415)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.loadSchema(PulsarSqlSchemaInfoProvider.java:106)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.access$000(PulsarSqlSchemaInfoProvider.java:49)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:61)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:58)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
	... 18 more

```

### Modifications

Add check for bytes schema, if the schema is bytes schema use the schema info of the bytes schema directly.

### Verifying this change

Add a new integration test for different schemas.

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API: (no)
  - The schema: (no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)
  • Loading branch information
gaoran10 authored Feb 25, 2021
1 parent e65deaf commit 371b311
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
Expand Down Expand Up @@ -441,9 +442,11 @@ public boolean advanceNextPosition() {
//start time for deseralizing record
metricsTracker.start_RECORD_DESERIALIZE_TIME();

SchemaInfo schemaInfo;
SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
try {
schemaInfo = schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
if (schemaInfo == null) {
schemaInfo = schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -560,6 +563,18 @@ public boolean advanceNextPosition() {
return true;
}

private SchemaInfo getBytesSchemaInfo(SchemaType schemaType, String schemaName) {
if (!schemaType.equals(SchemaType.BYTES) && !schemaType.equals(SchemaType.NONE)) {
return null;
}
if (schemaName.equals(Schema.BYTES.getSchemaInfo().getName())) {
return Schema.BYTES.getSchemaInfo();
} else if (schemaName.equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) {
return Schema.BYTEBUFFER.getSchemaInfo();
} else {
return Schema.BYTES.getSchemaInfo();
}
}

@Override
public boolean getBoolean(int field) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,31 @@
*/
package org.apache.pulsar.tests.integration.presto;

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.ByteBuffer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.awaitility.Awaitility;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Test basic Pulsar SQL query, the Pulsar SQL is standalone mode.
*/
@Slf4j
public class TestBasicPresto extends TestPulsarSQLBase {

Expand All @@ -55,85 +60,124 @@ public void teardownPresto() {
pulsarCluster.stopPrestoWorker();
}

@DataProvider(name = "schemaProvider")
public Object[][] schemaProvider() {
return new Object[][] {
{ Schema.BYTES},
{ Schema.BYTEBUFFER},
{ Schema.STRING},
{ AvroSchema.of(Stock.class)},
{ JSONSchema.of(Stock.class)},
{ Schema.KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), KeyValueEncodingType.INLINE) },
{ Schema.KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), KeyValueEncodingType.SEPARATED) }
};
}

@Test
public void testSimpleSQLQueryBatched() throws Exception {
TopicName topicName = TopicName.get("public/default/stocks_batched_" + randomName(5));
pulsarSQLBasicTest(topicName, true, false);
pulsarSQLBasicTest(topicName, true, false, JSONSchema.of(Stock.class));
}

@Test
public void testSimpleSQLQueryNonBatched() throws Exception {
TopicName topicName = TopicName.get("public/default/stocks_nonbatched_" + randomName(5));
pulsarSQLBasicTest(topicName, false, false);
pulsarSQLBasicTest(topicName, false, false, JSONSchema.of(Stock.class));
}

@DataProvider(name = "keyValueEncodingType")
public Object[][] keyValueEncodingType() {
return new Object[][] { { KeyValueEncodingType.INLINE }, { KeyValueEncodingType.SEPARATED } };
@Test(dataProvider = "schemaProvider")
public void testForSchema(Schema schema) throws Exception {
String schemaFlag;
if (schema.getSchemaInfo().getType().isStruct()) {
schemaFlag = schema.getSchemaInfo().getType().name();
} else if(schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
schemaFlag = schema.getSchemaInfo().getType().name() + "_"
+ ((KeyValueSchema) schema).getKeyValueEncodingType();
} else {
// Because some schema types are same(such as BYTES and BYTEBUFFER), so use the schema name as flag.
schemaFlag = schema.getSchemaInfo().getName();
}
String topic = String.format("public/default/schema_%s_test_%s", schemaFlag, randomName(5)).toLowerCase();
pulsarSQLBasicTest(TopicName.get(topic), false, false, schema);
}

@Test(dataProvider = "keyValueEncodingType")
public void testKeyValueSchema(KeyValueEncodingType type) throws Exception {
waitPulsarSQLReady();
TopicName topicName = TopicName.get("public/default/stocks" + randomName(20));
@Override
protected int prepareData(TopicName topicName,
boolean isBatch,
boolean useNsOffloadPolices,
Schema schema) throws Exception {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();

if (schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName())) {
prepareDataForBytesSchema(pulsarClient, topicName, isBatch);
} else if (schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) {
prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING)) {
prepareDataForStringSchema(pulsarClient, topicName, isBatch);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON)
|| schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) {
prepareDataForStructSchema(pulsarClient, topicName, isBatch, schema);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
prepareDataForKeyValueSchema(pulsarClient, topicName, schema);
}

return NUM_OF_STOCKS;
}

private void prepareDataForBytesSchema(PulsarClient pulsarClient,
TopicName topicName,
boolean isBatch) throws PulsarClientException {
@Cleanup
Producer<KeyValue<Stock,Stock>> producer = pulsarClient.newProducer(Schema
.KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), type))
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName.toString())
.enableBatching(isBatch)
.create();

for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
int j = 100 * i;
final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
producer.send(new KeyValue<>(stock1, stock2));
producer.send(("bytes schema test" + i).getBytes());
}

producer.flush();

validateMetadata(topicName);

Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
() -> {
ContainerExecResult containerExecResult = execQuery(
String.format("select * from pulsar.\"%s\".\"%s\" order by entryid;",
topicName.getNamespace(), topicName.getLocalName()));
assertThat(containerExecResult.getExitCode()).isEqualTo(0);
log.info("select sql query output \n{}", containerExecResult.getStdout());
String[] split = containerExecResult.getStdout().split("\n");
assertThat(split.length).isEqualTo(NUM_OF_STOCKS);
String[] split2 = containerExecResult.getStdout().split("\n|,");
for (int i = 0; i < NUM_OF_STOCKS; ++i) {
int j = 100 * i;
assertThat(split2).contains("\"" + i + "\"");
assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");

assertThat(split2).contains("\"" + j + "\"");
assertThat(split2).contains("\"" + "STOCK_" + j + "\"");
assertThat(split2).contains("\"" + (100.0 + j * 10) + "\"");
}
}
);

}

private void prepareDataForByteBufferSchema(PulsarClient pulsarClient,
TopicName topicName,
boolean isBatch) throws PulsarClientException {
@Cleanup
Producer<ByteBuffer> producer = pulsarClient.newProducer(Schema.BYTEBUFFER)
.topic(topicName.toString())
.enableBatching(isBatch)
.create();

for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
producer.send(ByteBuffer.wrap(("bytes schema test" + i).getBytes()));
}
producer.flush();
}

@Override
protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception {
private void prepareDataForStringSchema(PulsarClient pulsarClient,
TopicName topicName,
boolean isBatch) throws PulsarClientException {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName.toString())
.enableBatching(isBatch)
.create();

for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
producer.send("string" + i);
}
producer.flush();
}

private void prepareDataForStructSchema(PulsarClient pulsarClient,
TopicName topicName,
boolean isBatch,
Schema<Stock> schema) throws Exception {
@Cleanup
Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
Producer<Stock> producer = pulsarClient.newProducer(schema)
.topic(topicName.toString())
.enableBatching(isBatch)
.create();
Expand All @@ -143,7 +187,71 @@ protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOff
producer.send(stock);
}
producer.flush();
return NUM_OF_STOCKS;
}

private void prepareDataForKeyValueSchema(PulsarClient pulsarClient,
TopicName topicName,
Schema<KeyValue<Stock, Stock>> schema) throws Exception {
@Cleanup
Producer<KeyValue<Stock,Stock>> producer = pulsarClient.newProducer(schema)
.topic(topicName.toString())
.create();

for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
int j = 100 * i;
final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
producer.send(new KeyValue<>(stock1, stock2));
}
}

@Override
protected void validateContent(int messageNum, String[] contentArr, Schema schema) {
switch (schema.getSchemaInfo().getType()) {
case BYTES:
log.info("Skip validate content for BYTES schema type.");
break;
case STRING:
validateContentForStringSchema(messageNum, contentArr);
log.info("finish validate content for STRING schema type.");
break;
case JSON:
case AVRO:
validateContentForStructSchema(messageNum, contentArr);
log.info("finish validate content for {} schema type.", schema.getSchemaInfo().getType());
break;
case KEY_VALUE:
validateContentForKeyValueSchema(messageNum, contentArr);
log.info("finish validate content for KEY_VALUE {} schema type.",
((KeyValueSchema) schema).getKeyValueEncodingType());
}
}

private void validateContentForStringSchema(int messageNum, String[] contentArr) {
for (int i = 0; i < messageNum; i++) {
assertThat(contentArr).contains("\"string" + i + "\"");
}
}

private void validateContentForStructSchema(int messageNum, String[] contentArr) {
for (int i = 0; i < messageNum; ++i) {
assertThat(contentArr).contains("\"" + i + "\"");
assertThat(contentArr).contains("\"" + "STOCK_" + i + "\"");
assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\"");
}
}

private void validateContentForKeyValueSchema(int messageNum, String[] contentArr) {
for (int i = 0; i < messageNum; ++i) {
int j = 100 * i;
assertThat(contentArr).contains("\"" + i + "\"");
assertThat(contentArr).contains("\"" + "STOCK_" + i + "\"");
assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\"");

assertThat(contentArr).contains("\"" + j + "\"");
assertThat(contentArr).contains("\"" + "STOCK_" + j + "\"");
assertThat(contentArr).contains("\"" + (100.0 + j * 10) + "\"");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.presto;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,6 +29,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
Expand All @@ -43,7 +45,7 @@


/**
* Test presto query from tiered storage.
* Test presto query from tiered storage, the Pulsar SQL is cluster mode.
*/
@Slf4j
public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase {
Expand Down Expand Up @@ -106,18 +108,21 @@ public void teardownPresto() {
public void testQueryTieredStorage1() throws Exception {
TopicName topicName = TopicName.get(
TopicDomain.persistent.value(), TENANT, NAMESPACE, "stocks_ts_nons_" + randomName(5));
pulsarSQLBasicTest(topicName, false, false);
pulsarSQLBasicTest(topicName, false, false, JSONSchema.of(Stock.class));
}

@Test
public void testQueryTieredStorage2() throws Exception {
TopicName topicName = TopicName.get(
TopicDomain.persistent.value(), TENANT, NAMESPACE, "stocks_ts_ns_" + randomName(5));
pulsarSQLBasicTest(topicName, false, true);
pulsarSQLBasicTest(topicName, false, true, JSONSchema.of(Stock.class));
}

@Override
protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception {
protected int prepareData(TopicName topicName,
boolean isBatch,
boolean useNsOffloadPolices,
Schema schema) throws Exception {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
Expand Down Expand Up @@ -206,4 +211,13 @@ private void offloadAndDeleteFromBK(boolean useNsOffloadPolices, TopicName topic
}
}

@Override
protected void validateContent(int messageNum, String[] contentArr, Schema schema) {
for (int i = 0; i < messageNum; ++i) {
assertThat(contentArr).contains("\"" + i + "\"");
assertThat(contentArr).contains("\"" + "STOCK_" + i + "\"");
assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\"");
}
}

}
Loading

0 comments on commit 371b311

Please sign in to comment.