From 5c4834e6b6952a8d66329a0671a002fbd2a38aa0 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Wed, 5 Aug 2020 10:55:15 -0700 Subject: [PATCH] SAMZA-2575: Fix the flatten output values and add more tests (#1409) --- .../sql/translator/ProjectTranslator.java | 16 ++-- .../test/samzasql/TestSamzaSqlEndToEnd.java | 94 +++++++++++++++---- 2 files changed, 82 insertions(+), 28 deletions(-) diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java index 16a320e559..79f58e85ee 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -269,26 +268,27 @@ private void updateMetrics(long arrivalTime, long outputTime, boolean isNewInput private MessageStream translateFlatten(Integer flattenIndex, MessageStream inputStream) { return inputStream.flatMap(message -> { - Object field = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex); - if (field != null && field instanceof List) { - List outMessages = new ArrayList<>(); + Object targetFlattenColumn = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex); + final List outMessages = new ArrayList<>(); + if (targetFlattenColumn != null && targetFlattenColumn instanceof List) { + List objectList = (List) targetFlattenColumn; SamzaSqlRelMsgMetadata messageMetadata = message.getSamzaSqlRelMsgMetadata(); SamzaSqlRelMsgMetadata newMetadata = new SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(), messageMetadata.getArrivalTime(), messageMetadata.getScanTimeNanos(), messageMetadata.getScanTimeMillis()); - for (Object fieldValue : (List) field) { + for (Object fieldValue : objectList) { List newValues = new ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues()); - newValues.set(flattenIndex, Collections.singletonList(fieldValue)); + newValues.set(flattenIndex, fieldValue); outMessages.add( new SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues, newMetadata)); newMetadata = new SamzaSqlRelMsgMetadata(newMetadata.getEventTime(), newMetadata.getArrivalTime(), newMetadata.getScanTimeNanos(), newMetadata.getScanTimeMillis()); } - return outMessages; } else { message.getSamzaSqlRelMsgMetadata().isNewInputMessage = true; - return Collections.singletonList(message); + outMessages.add(message); } + return outMessages; }); } diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index ca78af2c2d..4a515c0662 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -27,11 +27,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.avro.generic.GenericRecord; -import org.apache.calcite.plan.RelOptUtil; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.sql.planner.SamzaSqlValidator; @@ -422,15 +422,14 @@ public void testEndToEndWithLike() throws Exception { } @Test - public void testEndToEndFlatten() throws Exception { + public void testEndToEndFlatten() { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); - LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath()); String sql1 = - "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0) " - + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0 " + "Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0, array_values) " + + " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0, array_values" + " from testavro.COMPLEX1"; List sqlStmts = Collections.singletonList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); @@ -442,12 +441,29 @@ public void testEndToEndFlatten() throws Exception { List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); - int expectedMessages = 0; - // Flatten de-normalizes the data. So there is separate record for each entry in the array. - for (int index = 1; index < numMessages; index++) { - expectedMessages = expectedMessages + Math.max(1, index); - } + // Test invariant for each input Row with rank i will contain a column array_values with i elements $\sum_1^n{i}$. + int expectedMessages = (numMessages * (numMessages - 1)) / 2; + //Assert.assertEquals(outMessages.size(), actualList.size()); Assert.assertEquals(expectedMessages, outMessages.size()); + + // check that values are actually not null and within the expected range + Optional nullValueRecord = outMessages.stream() + .map(x -> (GenericRecord) x.getMessage()) + .filter(x -> x.get("string_value") == null) + .findFirst(); + // The String value column is result of dot product thus must be present in the Array column + Optional missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> { + String value = (String) x.get("string_value"); + List arrayValues = (List) x.get("array_values"); + if (arrayValues == null) { + return true; + } + Optional notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny(); + return !notThere.isPresent(); + }).findFirst(); + + Assert.assertFalse("Null value " + nullValueRecord.orElse(null), nullValueRecord.isPresent()); + Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent()); } @@ -475,7 +491,7 @@ public void testEndToEndComplexRecord() throws SamzaSqlValidatorException { } @Test - public void testEndToEndWithFloatToStringConversion() throws Exception { + public void testEndToEndWithFloatToStringConversion() { int numMessages = 20; TestAvroSystemFactory.messages.clear(); @@ -609,16 +625,34 @@ public void testEndToEndFlattenWithUdf() throws Exception { List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); - int expectedMessages = 0; + // Test invariant for each input Row with rank i will contain a column array_values with i elements $\sum_1^n{i}$. + int expectedMessages = (numMessages * (numMessages - 1)) / 2; // Flatten de-normalizes the data. So there is separate record for each entry in the array. - for (int index = 1; index < numMessages; index++) { - expectedMessages = expectedMessages + Math.max(1, index); - } Assert.assertEquals(expectedMessages, outMessages.size()); + + // check that values are actually not null and within the expected range + Optional nullValueRecord = outMessages.stream() + .map(x -> (GenericRecord) x.getMessage()) + .filter(x -> x.get("id") == null) + .findFirst(); + Assert.assertFalse("Null value " + nullValueRecord.orElse(null), nullValueRecord.isPresent()); + //TODO this is failing for now and that is because of udf weak type system, fixing it will be beyond this work. + /* // The String value column is result of dot product thus must be present in the Array column + Optional missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> { + String value = (String) x.get("string_value"); + List arrayValues = (List) x.get("array_values"); + if (arrayValues == null) { + return true; + } + Optional notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny(); + return !notThere.isPresent(); + }).findFirst(); + Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent()); + */ } @Test - public void testEndToEndSubQuery() throws Exception { + public void testEndToEndSubQuery() { int numMessages = 20; TestAvroSystemFactory.messages.clear(); Map staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages); @@ -635,12 +669,32 @@ public void testEndToEndSubQuery() throws Exception { List outMessages = new ArrayList<>(TestAvroSystemFactory.messages); - int expectedMessages = 0; + // Test invariant for each input Row with rank i will contain a column array_values with i elements $\sum_1^n{i}$. + int expectedMessages = (numMessages * (numMessages - 1)) / 2; // Flatten de-normalizes the data. So there is separate record for each entry in the array. - for (int index = 1; index < numMessages; index++) { - expectedMessages = expectedMessages + Math.max(1, index); - } Assert.assertEquals(expectedMessages, outMessages.size()); + + // check that values are actually not null and within the expected range + Optional nullValueRecord = outMessages.stream() + .map(x -> (GenericRecord) x.getMessage()) + .filter(x -> x.get("id") == null) + .findFirst(); + Assert.assertFalse("Null value " + nullValueRecord.orElse(null), nullValueRecord.isPresent()); + + //TODO this is failing for now and that is because of udf weak type system, fixing it will be beyond this work. + /* // The String value column is result of dot product thus must be present in the Array column + Optional missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> { + String value = (String) x.get("string_value"); + List arrayValues = (List) x.get("array_values"); + if (arrayValues == null) { + return true; + } + Optional notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny(); + return !notThere.isPresent(); + }).findFirst(); + Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent()); + */ + } @Test