From 371b311eac93ac1cf5d5a3ceeeb36f7ae11538a1 Mon Sep 17 00:00:00 2001 From: ran Date: Thu, 25 Feb 2021 16:49:53 +0800 Subject: [PATCH] [Pulsar SQL] Fix Pulsar SQL query bytes schema data error (#9631) ### Motivation Currently, the Pulsar SQL query bytes schema data will cause an error. *Reproduce* 1. produce bytes schema data. 2. query data by the Pulsar SQL. 3. the error log could be seen. *Error log* ``` com.google.common.util.concurrent.UncheckedExecutionException: java.nio.BufferUnderflowException at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) at com.google.common.cache.LocalCache.get(LocalCache.java:3951) at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.getSchemaByVersion(PulsarSqlSchemaInfoProvider.java:76) at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:471) at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90) at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302) at io.prestosql.operator.Driver.processInternal(Driver.java:379) at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283) at io.prestosql.operator.Driver.tryWithLock(Driver.java:675) at io.prestosql.operator.Driver.processFor(Driver.java:276) at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075) at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163) at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484) at io.prestosql.$gen.Presto_332__testversion____20210219_094906_2.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.nio.BufferUnderflowException at java.nio.Buffer.nextGetIndex(Buffer.java:509) at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:415) at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.loadSchema(PulsarSqlSchemaInfoProvider.java:106) at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.access$000(PulsarSqlSchemaInfoProvider.java:49) at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:61) at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:58) at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ... 18 more ``` ### Modifications Add check for bytes schema, if the schema is bytes schema use the schema info of the bytes schema directly. ### Verifying this change Add a new integration test for different schemas. ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (no) - The public API: (no) - The schema: (no) - The default values of configurations: (no) - The wire protocol: (no) - The rest endpoints: (no) - The admin cli options: (no) - Anything that affects deployment: (no) --- .../pulsar/sql/presto/PulsarRecordCursor.java | 19 +- .../integration/presto/TestBasicPresto.java | 216 +++++++++++++----- .../presto/TestPrestoQueryTieredStorage.java | 22 +- .../integration/presto/TestPulsarSQLBase.java | 49 ++-- 4 files changed, 233 insertions(+), 73 deletions(-) diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index c782de693559f..1171902acb601 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; import org.apache.pulsar.common.api.raw.MessageParser; import org.apache.pulsar.common.api.raw.RawMessage; @@ -441,9 +442,11 @@ public boolean advanceNextPosition() { //start time for deseralizing record metricsTracker.start_RECORD_DESERIALIZE_TIME(); - SchemaInfo schemaInfo; + SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName()); try { - schemaInfo = schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get(); + if (schemaInfo == null) { + schemaInfo = schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get(); + } } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -560,6 +563,18 @@ public boolean advanceNextPosition() { return true; } + private SchemaInfo getBytesSchemaInfo(SchemaType schemaType, String schemaName) { + if (!schemaType.equals(SchemaType.BYTES) && !schemaType.equals(SchemaType.NONE)) { + return null; + } + if (schemaName.equals(Schema.BYTES.getSchemaInfo().getName())) { + return Schema.BYTES.getSchemaInfo(); + } else if (schemaName.equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) { + return Schema.BYTEBUFFER.getSchemaInfo(); + } else { + return Schema.BYTES.getSchemaInfo(); + } + } @Override public boolean getBoolean(int field) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index 5597a95703caa..fe8ba7eed6ad9 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -18,26 +18,31 @@ */ package org.apache.pulsar.tests.integration.presto; +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; -import org.apache.pulsar.tests.integration.docker.ContainerExecResult; -import org.awaitility.Awaitility; +import org.apache.pulsar.common.schema.SchemaType; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.util.concurrent.TimeUnit; - -import static org.assertj.core.api.Assertions.assertThat; +/** + * Test basic Pulsar SQL query, the Pulsar SQL is standalone mode. + */ @Slf4j public class TestBasicPresto extends TestPulsarSQLBase { @@ -55,85 +60,124 @@ public void teardownPresto() { pulsarCluster.stopPrestoWorker(); } + @DataProvider(name = "schemaProvider") + public Object[][] schemaProvider() { + return new Object[][] { + { Schema.BYTES}, + { Schema.BYTEBUFFER}, + { Schema.STRING}, + { AvroSchema.of(Stock.class)}, + { JSONSchema.of(Stock.class)}, + { Schema.KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), KeyValueEncodingType.INLINE) }, + { Schema.KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), KeyValueEncodingType.SEPARATED) } + }; + } + @Test public void testSimpleSQLQueryBatched() throws Exception { TopicName topicName = TopicName.get("public/default/stocks_batched_" + randomName(5)); - pulsarSQLBasicTest(topicName, true, false); + pulsarSQLBasicTest(topicName, true, false, JSONSchema.of(Stock.class)); } @Test public void testSimpleSQLQueryNonBatched() throws Exception { TopicName topicName = TopicName.get("public/default/stocks_nonbatched_" + randomName(5)); - pulsarSQLBasicTest(topicName, false, false); + pulsarSQLBasicTest(topicName, false, false, JSONSchema.of(Stock.class)); } - @DataProvider(name = "keyValueEncodingType") - public Object[][] keyValueEncodingType() { - return new Object[][] { { KeyValueEncodingType.INLINE }, { KeyValueEncodingType.SEPARATED } }; + @Test(dataProvider = "schemaProvider") + public void testForSchema(Schema schema) throws Exception { + String schemaFlag; + if (schema.getSchemaInfo().getType().isStruct()) { + schemaFlag = schema.getSchemaInfo().getType().name(); + } else if(schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) { + schemaFlag = schema.getSchemaInfo().getType().name() + "_" + + ((KeyValueSchema) schema).getKeyValueEncodingType(); + } else { + // Because some schema types are same(such as BYTES and BYTEBUFFER), so use the schema name as flag. + schemaFlag = schema.getSchemaInfo().getName(); + } + String topic = String.format("public/default/schema_%s_test_%s", schemaFlag, randomName(5)).toLowerCase(); + pulsarSQLBasicTest(TopicName.get(topic), false, false, schema); } - @Test(dataProvider = "keyValueEncodingType") - public void testKeyValueSchema(KeyValueEncodingType type) throws Exception { - waitPulsarSQLReady(); - TopicName topicName = TopicName.get("public/default/stocks" + randomName(20)); + @Override + protected int prepareData(TopicName topicName, + boolean isBatch, + boolean useNsOffloadPolices, + Schema schema) throws Exception { @Cleanup PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) .build(); + if (schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName())) { + prepareDataForBytesSchema(pulsarClient, topicName, isBatch); + } else if (schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) { + prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch); + } else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING)) { + prepareDataForStringSchema(pulsarClient, topicName, isBatch); + } else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON) + || schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) { + prepareDataForStructSchema(pulsarClient, topicName, isBatch, schema); + } else if (schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) { + prepareDataForKeyValueSchema(pulsarClient, topicName, schema); + } + + return NUM_OF_STOCKS; + } + + private void prepareDataForBytesSchema(PulsarClient pulsarClient, + TopicName topicName, + boolean isBatch) throws PulsarClientException { @Cleanup - Producer> producer = pulsarClient.newProducer(Schema - .KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class), type)) + Producer producer = pulsarClient.newProducer(Schema.BYTES) .topic(topicName.toString()) + .enableBatching(isBatch) .create(); for (int i = 0 ; i < NUM_OF_STOCKS; ++i) { - int j = 100 * i; - final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10); - final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10); - producer.send(new KeyValue<>(stock1, stock2)); + producer.send(("bytes schema test" + i).getBytes()); } - producer.flush(); - - validateMetadata(topicName); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( - () -> { - ContainerExecResult containerExecResult = execQuery( - String.format("select * from pulsar.\"%s\".\"%s\" order by entryid;", - topicName.getNamespace(), topicName.getLocalName())); - assertThat(containerExecResult.getExitCode()).isEqualTo(0); - log.info("select sql query output \n{}", containerExecResult.getStdout()); - String[] split = containerExecResult.getStdout().split("\n"); - assertThat(split.length).isEqualTo(NUM_OF_STOCKS); - String[] split2 = containerExecResult.getStdout().split("\n|,"); - for (int i = 0; i < NUM_OF_STOCKS; ++i) { - int j = 100 * i; - assertThat(split2).contains("\"" + i + "\""); - assertThat(split2).contains("\"" + "STOCK_" + i + "\""); - assertThat(split2).contains("\"" + (100.0 + i * 10) + "\""); - - assertThat(split2).contains("\"" + j + "\""); - assertThat(split2).contains("\"" + "STOCK_" + j + "\""); - assertThat(split2).contains("\"" + (100.0 + j * 10) + "\""); - } - } - ); - } + private void prepareDataForByteBufferSchema(PulsarClient pulsarClient, + TopicName topicName, + boolean isBatch) throws PulsarClientException { + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.BYTEBUFFER) + .topic(topicName.toString()) + .enableBatching(isBatch) + .create(); + for (int i = 0 ; i < NUM_OF_STOCKS; ++i) { + producer.send(ByteBuffer.wrap(("bytes schema test" + i).getBytes())); + } + producer.flush(); + } - @Override - protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception { + private void prepareDataForStringSchema(PulsarClient pulsarClient, + TopicName topicName, + boolean isBatch) throws PulsarClientException { @Cleanup - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) - .build(); + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName.toString()) + .enableBatching(isBatch) + .create(); + for (int i = 0 ; i < NUM_OF_STOCKS; ++i) { + producer.send("string" + i); + } + producer.flush(); + } + + private void prepareDataForStructSchema(PulsarClient pulsarClient, + TopicName topicName, + boolean isBatch, + Schema schema) throws Exception { @Cleanup - Producer producer = pulsarClient.newProducer(JSONSchema.of(Stock.class)) + Producer producer = pulsarClient.newProducer(schema) .topic(topicName.toString()) .enableBatching(isBatch) .create(); @@ -143,7 +187,71 @@ protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOff producer.send(stock); } producer.flush(); - return NUM_OF_STOCKS; + } + + private void prepareDataForKeyValueSchema(PulsarClient pulsarClient, + TopicName topicName, + Schema> schema) throws Exception { + @Cleanup + Producer> producer = pulsarClient.newProducer(schema) + .topic(topicName.toString()) + .create(); + + for (int i = 0 ; i < NUM_OF_STOCKS; ++i) { + int j = 100 * i; + final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10); + final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10); + producer.send(new KeyValue<>(stock1, stock2)); + } + } + + @Override + protected void validateContent(int messageNum, String[] contentArr, Schema schema) { + switch (schema.getSchemaInfo().getType()) { + case BYTES: + log.info("Skip validate content for BYTES schema type."); + break; + case STRING: + validateContentForStringSchema(messageNum, contentArr); + log.info("finish validate content for STRING schema type."); + break; + case JSON: + case AVRO: + validateContentForStructSchema(messageNum, contentArr); + log.info("finish validate content for {} schema type.", schema.getSchemaInfo().getType()); + break; + case KEY_VALUE: + validateContentForKeyValueSchema(messageNum, contentArr); + log.info("finish validate content for KEY_VALUE {} schema type.", + ((KeyValueSchema) schema).getKeyValueEncodingType()); + } + } + + private void validateContentForStringSchema(int messageNum, String[] contentArr) { + for (int i = 0; i < messageNum; i++) { + assertThat(contentArr).contains("\"string" + i + "\""); + } + } + + private void validateContentForStructSchema(int messageNum, String[] contentArr) { + for (int i = 0; i < messageNum; ++i) { + assertThat(contentArr).contains("\"" + i + "\""); + assertThat(contentArr).contains("\"" + "STOCK_" + i + "\""); + assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\""); + } + } + + private void validateContentForKeyValueSchema(int messageNum, String[] contentArr) { + for (int i = 0; i < messageNum; ++i) { + int j = 100 * i; + assertThat(contentArr).contains("\"" + i + "\""); + assertThat(contentArr).contains("\"" + "STOCK_" + i + "\""); + assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\""); + + assertThat(contentArr).contains("\"" + j + "\""); + assertThat(contentArr).contains("\"" + "STOCK_" + j + "\""); + assertThat(contentArr).contains("\"" + (100.0 + j * 10) + "\""); + } } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java index 80374e95ead07..e7c3cd4efb443 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java @@ -19,6 +19,7 @@ package org.apache.pulsar.tests.integration.presto; import static com.google.common.base.Preconditions.checkNotNull; +import static org.assertj.core.api.Assertions.assertThat; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -28,6 +29,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.schema.JSONSchema; @@ -43,7 +45,7 @@ /** - * Test presto query from tiered storage. + * Test presto query from tiered storage, the Pulsar SQL is cluster mode. */ @Slf4j public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase { @@ -106,18 +108,21 @@ public void teardownPresto() { public void testQueryTieredStorage1() throws Exception { TopicName topicName = TopicName.get( TopicDomain.persistent.value(), TENANT, NAMESPACE, "stocks_ts_nons_" + randomName(5)); - pulsarSQLBasicTest(topicName, false, false); + pulsarSQLBasicTest(topicName, false, false, JSONSchema.of(Stock.class)); } @Test public void testQueryTieredStorage2() throws Exception { TopicName topicName = TopicName.get( TopicDomain.persistent.value(), TENANT, NAMESPACE, "stocks_ts_ns_" + randomName(5)); - pulsarSQLBasicTest(topicName, false, true); + pulsarSQLBasicTest(topicName, false, true, JSONSchema.of(Stock.class)); } @Override - protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception { + protected int prepareData(TopicName topicName, + boolean isBatch, + boolean useNsOffloadPolices, + Schema schema) throws Exception { @Cleanup PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) @@ -206,4 +211,13 @@ private void offloadAndDeleteFromBK(boolean useNsOffloadPolices, TopicName topic } } + @Override + protected void validateContent(int messageNum, String[] contentArr, Schema schema) { + for (int i = 0; i < messageNum; ++i) { + assertThat(contentArr).contains("\"" + i + "\""); + assertThat(contentArr).contains("\"" + "STOCK_" + i + "\""); + assertThat(contentArr).contains("\"" + (100.0 + i * 10) + "\""); + } + } + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java index 5781549369784..91186f4629685 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java @@ -31,7 +31,9 @@ import java.util.List; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.suites.PulsarSQLTestSuite; @@ -42,19 +44,30 @@ import org.testcontainers.shaded.okhttp3.Response; import org.testng.Assert; + +/** + * Pulsar SQL test base. + */ @Slf4j public class TestPulsarSQLBase extends PulsarSQLTestSuite { - protected void pulsarSQLBasicTest(TopicName topic, boolean isBatch, boolean useNsOffloadPolices) throws Exception { + protected void pulsarSQLBasicTest(TopicName topic, + boolean isBatch, + boolean useNsOffloadPolices, + Schema schema) throws Exception { + log.info("Pulsar SQL basic test. topic: {}", topic); + waitPulsarSQLReady(); log.info("start prepare data for query. topic: {}", topic); - int messageCnt = prepareData(topic, isBatch, useNsOffloadPolices); + int messageCnt = prepareData(topic, isBatch, useNsOffloadPolices, schema); log.info("finish prepare data for query. topic: {}, messageCnt: {}", topic, messageCnt); validateMetadata(topic); - validateData(topic, messageCnt); + validateData(topic, messageCnt, schema); + + log.info("Finish Pulsar SQL basic test. topic: {}", topic); } public void waitPulsarSQLReady() throws Exception { @@ -98,7 +111,10 @@ public void waitPulsarSQLReady() throws Exception { } } - protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception { + protected int prepareData(TopicName topicName, + boolean isBatch, + boolean useNsOffloadPolices, + Schema schema) throws Exception { throw new Exception("Unsupported operation prepareData."); } @@ -122,24 +138,31 @@ public void validateMetadata(TopicName topicName) throws Exception { ); } - public void validateData(TopicName topicName, int messageNum) throws Exception { + protected void validateContent(int messageNum, String[] contentArr, Schema schema) throws Exception { + throw new Exception("Unsupported operation validateContent."); + } + + private void validateData(TopicName topicName, int messageNum, Schema schema) throws Exception { String namespace = topicName.getNamespace(); String topic = topicName.getLocalName(); + final String queryAllDataSql; + if (schema.getSchemaInfo().getType().isStruct() + || schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) { + queryAllDataSql = String.format("select * from pulsar.\"%s\".\"%s\" order by entryid;", namespace, topic); + } else { + queryAllDataSql = String.format("select * from pulsar.\"%s\".\"%s\";", namespace, topic); + } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( () -> { - ContainerExecResult containerExecResult = execQuery( - String.format("select * from pulsar.\"%s\".\"%s\" order by entryid;", namespace, topic)); + ContainerExecResult containerExecResult = execQuery(queryAllDataSql); assertThat(containerExecResult.getExitCode()).isEqualTo(0); log.info("select sql query output \n{}", containerExecResult.getStdout()); String[] split = containerExecResult.getStdout().split("\n"); assertThat(split.length).isEqualTo(messageNum); - String[] split2 = containerExecResult.getStdout().split("\n|,"); - for (int i = 0; i < messageNum; ++i) { - assertThat(split2).contains("\"" + i + "\""); - assertThat(split2).contains("\"" + "STOCK_" + i + "\""); - assertThat(split2).contains("\"" + (100.0 + i * 10) + "\""); - } + String[] contentArr = containerExecResult.getStdout().split("\n|,"); + validateContent(messageNum, contentArr, schema); } );