Skip to content

Commit

Permalink
SAMZA-2575: Fix the flatten output values and add more tests (apache#…
Browse files Browse the repository at this point in the history
b-slim authored Aug 5, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 3b7d0d1 commit 5c4834e
Showing 2 changed files with 82 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -269,26 +268,27 @@ private void updateMetrics(long arrivalTime, long outputTime, boolean isNewInput
private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex,
MessageStream<SamzaSqlRelMessage> inputStream) {
return inputStream.flatMap(message -> {
Object field = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
if (field != null && field instanceof List) {
List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
Object targetFlattenColumn = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
final List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
if (targetFlattenColumn != null && targetFlattenColumn instanceof List) {
List<Object> objectList = (List<Object>) targetFlattenColumn;
SamzaSqlRelMsgMetadata messageMetadata = message.getSamzaSqlRelMsgMetadata();
SamzaSqlRelMsgMetadata newMetadata =
new SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(), messageMetadata.getArrivalTime(),
messageMetadata.getScanTimeNanos(), messageMetadata.getScanTimeMillis());
for (Object fieldValue : (List) field) {
for (Object fieldValue : objectList) {
List<Object> newValues = new ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues());
newValues.set(flattenIndex, Collections.singletonList(fieldValue));
newValues.set(flattenIndex, fieldValue);
outMessages.add(
new SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues, newMetadata));
newMetadata = new SamzaSqlRelMsgMetadata(newMetadata.getEventTime(), newMetadata.getArrivalTime(),
newMetadata.getScanTimeNanos(), newMetadata.getScanTimeMillis());
}
return outMessages;
} else {
message.getSamzaSqlRelMsgMetadata().isNewInputMessage = true;
return Collections.singletonList(message);
outMessages.add(message);
}
return outMessages;
});
}

Original file line number Diff line number Diff line change
@@ -27,11 +27,11 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.planner.SamzaSqlValidator;
@@ -422,15 +422,14 @@ public void testEndToEndWithLike() throws Exception {
}

@Test
public void testEndToEndFlatten() throws Exception {
public void testEndToEndFlatten() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);

LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
String sql1 =
"Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0) "
+ " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0 "
"Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0, array_values) "
+ " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0, array_values"
+ " from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
@@ -442,12 +441,29 @@ public void testEndToEndFlatten() throws Exception {

List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);

int expectedMessages = 0;
// Flatten de-normalizes the data. So there is separate record for each entry in the array.
for (int index = 1; index < numMessages; index++) {
expectedMessages = expectedMessages + Math.max(1, index);
}
// Test invariant: the input row with rank i carries an array_values column with i elements, so with n = numMessages the flattened output has $\sum_{i=1}^{n-1} i = n(n-1)/2$ records.
int expectedMessages = (numMessages * (numMessages - 1)) / 2;
//Assert.assertEquals(outMessages.size(), actualList.size());
Assert.assertEquals(expectedMessages, outMessages.size());

// check that values are actually not null and within the expected range
Optional<GenericRecord> nullValueRecord = outMessages.stream()
.map(x -> (GenericRecord) x.getMessage())
.filter(x -> x.get("string_value") == null)
.findFirst();
// string_value is produced by flattening array_values, so it must be one of the elements of the same record's array_values column
Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
String value = (String) x.get("string_value");
List<Object> arrayValues = (List<Object>) x.get("array_values");
if (arrayValues == null) {
return true;
}
Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
return !notThere.isPresent();
}).findFirst();

Assert.assertFalse("Null value " + nullValueRecord.orElse(null), nullValueRecord.isPresent());
Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
}


@@ -475,7 +491,7 @@ public void testEndToEndComplexRecord() throws SamzaSqlValidatorException {
}

@Test
public void testEndToEndWithFloatToStringConversion() throws Exception {
public void testEndToEndWithFloatToStringConversion() {
int numMessages = 20;

TestAvroSystemFactory.messages.clear();
@@ -609,16 +625,34 @@ public void testEndToEndFlattenWithUdf() throws Exception {

List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);

int expectedMessages = 0;
// Test invariant: the input row with rank i carries an array_values column with i elements, so with n = numMessages the flattened output has $\sum_{i=1}^{n-1} i = n(n-1)/2$ records.
int expectedMessages = (numMessages * (numMessages - 1)) / 2;
// Flatten de-normalizes the data. So there is separate record for each entry in the array.
for (int index = 1; index < numMessages; index++) {
expectedMessages = expectedMessages + Math.max(1, index);
}
Assert.assertEquals(expectedMessages, outMessages.size());

// check that values are actually not null and within the expected range
Optional<GenericRecord> nullValueRecord = outMessages.stream()
.map(x -> (GenericRecord) x.getMessage())
.filter(x -> x.get("id") == null)
.findFirst();
Assert.assertFalse("Null value " + nullValueRecord.orElse(null), nullValueRecord.isPresent());
// TODO: this check currently fails because of the UDF weak type system; fixing that is beyond the scope of this change.
/* // The String value column is result of dot product thus must be present in the Array column
Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
String value = (String) x.get("string_value");
List<Object> arrayValues = (List<Object>) x.get("array_values");
if (arrayValues == null) {
return true;
}
Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
return !notThere.isPresent();
}).findFirst();
Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
*/
}

@Test
public void testEndToEndSubQuery() throws Exception {
public void testEndToEndSubQuery() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
@@ -635,12 +669,32 @@ public void testEndToEndSubQuery() throws Exception {

List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);

int expectedMessages = 0;
// Test invariant: the input row with rank i carries an array_values column with i elements, so with n = numMessages the flattened output has $\sum_{i=1}^{n-1} i = n(n-1)/2$ records.
int expectedMessages = (numMessages * (numMessages - 1)) / 2;
// Flatten de-normalizes the data. So there is separate record for each entry in the array.
for (int index = 1; index < numMessages; index++) {
expectedMessages = expectedMessages + Math.max(1, index);
}
Assert.assertEquals(expectedMessages, outMessages.size());

// check that values are actually not null and within the expected range
Optional<GenericRecord> nullValueRecord = outMessages.stream()
.map(x -> (GenericRecord) x.getMessage())
.filter(x -> x.get("id") == null)
.findFirst();
Assert.assertFalse("Null value " + nullValueRecord.orElse(null), nullValueRecord.isPresent());

// TODO: this check currently fails because of the UDF weak type system; fixing that is beyond the scope of this change.
/* // The String value column is result of dot product thus must be present in the Array column
Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
String value = (String) x.get("string_value");
List<Object> arrayValues = (List<Object>) x.get("array_values");
if (arrayValues == null) {
return true;
}
Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
return !notThere.isPresent();
}).findFirst();
Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
*/

}

@Test

0 comments on commit 5c4834e

Please sign in to comment.