Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SAMZA-2575] Fix the flatten output values and add more tests #1409

Merged
merged 1 commit into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -269,26 +268,27 @@ private void updateMetrics(long arrivalTime, long outputTime, boolean isNewInput
private MessageStream<SamzaSqlRelMessage> translateFlatten(Integer flattenIndex,
MessageStream<SamzaSqlRelMessage> inputStream) {
return inputStream.flatMap(message -> {
Object field = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
if (field != null && field instanceof List) {
List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
Object targetFlattenColumn = message.getSamzaSqlRelRecord().getFieldValues().get(flattenIndex);
final List<SamzaSqlRelMessage> outMessages = new ArrayList<>();
if (targetFlattenColumn != null && targetFlattenColumn instanceof List) {
List<Object> objectList = (List<Object>) targetFlattenColumn;
SamzaSqlRelMsgMetadata messageMetadata = message.getSamzaSqlRelMsgMetadata();
SamzaSqlRelMsgMetadata newMetadata =
new SamzaSqlRelMsgMetadata(messageMetadata.getEventTime(), messageMetadata.getArrivalTime(),
messageMetadata.getScanTimeNanos(), messageMetadata.getScanTimeMillis());
for (Object fieldValue : (List) field) {
for (Object fieldValue : objectList) {
List<Object> newValues = new ArrayList<>(message.getSamzaSqlRelRecord().getFieldValues());
newValues.set(flattenIndex, Collections.singletonList(fieldValue));
newValues.set(flattenIndex, fieldValue);
outMessages.add(
new SamzaSqlRelMessage(message.getSamzaSqlRelRecord().getFieldNames(), newValues, newMetadata));
newMetadata = new SamzaSqlRelMsgMetadata(newMetadata.getEventTime(), newMetadata.getArrivalTime(),
newMetadata.getScanTimeNanos(), newMetadata.getScanTimeMillis());
}
return outMessages;
} else {
message.getSamzaSqlRelMsgMetadata().isNewInputMessage = true;
return Collections.singletonList(message);
outMessages.add(message);
}
return outMessages;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.planner.SamzaSqlValidator;
Expand Down Expand Up @@ -422,15 +422,14 @@ public void testEndToEndWithLike() throws Exception {
}

@Test
public void testEndToEndFlatten() throws Exception {
public void testEndToEndFlatten() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);

LOG.info(" Class Path : " + RelOptUtil.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
String sql1 =
"Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0) "
+ " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0 "
"Insert into testavro.outputTopic(string_value, id, bool_value, bytes_value, fixed_value, float_value0, array_values) "
+ " select Flatten(array_values) as string_value, id, NOT(id = 5) as bool_value, bytes_value, fixed_value, float_value0, array_values"
+ " from testavro.COMPLEX1";
List<String> sqlStmts = Collections.singletonList(sql1);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
Expand All @@ -442,12 +441,29 @@ public void testEndToEndFlatten() throws Exception {

List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);

int expectedMessages = 0;
// Flatten de-normalizes the data. So there is separate record for each entry in the array.
for (int index = 1; index < numMessages; index++) {
expectedMessages = expectedMessages + Math.max(1, index);
}
// Test invariant: the input row with rank i has an array_values column with i elements, so Flatten emits $\sum_{i=1}^{n-1} i = n(n-1)/2$ records in total, where n = numMessages.
int expectedMessages = (numMessages * (numMessages - 1)) / 2;
//Assert.assertEquals(outMessages.size(), actualList.size());
Assert.assertEquals(expectedMessages, outMessages.size());

// check that values are actually not null and within the expected range
Optional<GenericRecord> nullValueRecord = outMessages.stream()
.map(x -> (GenericRecord) x.getMessage())
.filter(x -> x.get("string_value") == null)
.findFirst();
// The string_value column is produced by flattening array_values, so each value must be present in the row's array_values column.
Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
String value = (String) x.get("string_value");
List<Object> arrayValues = (List<Object>) x.get("array_values");
if (arrayValues == null) {
return true;
}
Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
return !notThere.isPresent();
}).findFirst();

Assert.assertFalse("Null value " + nullValueRecord.orElse(null), nullValueRecord.isPresent());
Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
}


Expand Down Expand Up @@ -475,7 +491,7 @@ public void testEndToEndComplexRecord() throws SamzaSqlValidatorException {
}

@Test
public void testEndToEndWithFloatToStringConversion() throws Exception {
public void testEndToEndWithFloatToStringConversion() {
int numMessages = 20;

TestAvroSystemFactory.messages.clear();
Expand Down Expand Up @@ -609,16 +625,34 @@ public void testEndToEndFlattenWithUdf() throws Exception {

List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);

int expectedMessages = 0;
// Test invariant: the input row with rank i has an array_values column with i elements, so Flatten emits $\sum_{i=1}^{n-1} i = n(n-1)/2$ records in total, where n = numMessages.
int expectedMessages = (numMessages * (numMessages - 1)) / 2;
// Flatten de-normalizes the data, so there is a separate record for each entry in the array.
for (int index = 1; index < numMessages; index++) {
expectedMessages = expectedMessages + Math.max(1, index);
}
Assert.assertEquals(expectedMessages, outMessages.size());

// check that values are actually not null and within the expected range
Optional<GenericRecord> nullValueRecord = outMessages.stream()
.map(x -> (GenericRecord) x.getMessage())
.filter(x -> x.get("id") == null)
.findFirst();
Assert.assertFalse("Null value " + nullValueRecord.orElse(null), nullValueRecord.isPresent());
// TODO: the check below currently fails because of the weak type system of UDFs; fixing that is beyond the scope of this change.
/* // The String value column is result of dot product thus must be present in the Array column
Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
String value = (String) x.get("string_value");
List<Object> arrayValues = (List<Object>) x.get("array_values");
if (arrayValues == null) {
return true;
}
Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
return !notThere.isPresent();
}).findFirst();
Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
*/
}

@Test
public void testEndToEndSubQuery() throws Exception {
public void testEndToEndSubQuery() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
Expand All @@ -635,12 +669,32 @@ public void testEndToEndSubQuery() throws Exception {

List<OutgoingMessageEnvelope> outMessages = new ArrayList<>(TestAvroSystemFactory.messages);

int expectedMessages = 0;
// Test invariant: the input row with rank i has an array_values column with i elements, so Flatten emits $\sum_{i=1}^{n-1} i = n(n-1)/2$ records in total, where n = numMessages.
int expectedMessages = (numMessages * (numMessages - 1)) / 2;
// Flatten de-normalizes the data, so there is a separate record for each entry in the array.
for (int index = 1; index < numMessages; index++) {
expectedMessages = expectedMessages + Math.max(1, index);
}
Assert.assertEquals(expectedMessages, outMessages.size());

// check that values are actually not null and within the expected range
Optional<GenericRecord> nullValueRecord = outMessages.stream()
.map(x -> (GenericRecord) x.getMessage())
.filter(x -> x.get("id") == null)
.findFirst();
Assert.assertFalse("Null value " + nullValueRecord.orElse(null), nullValueRecord.isPresent());

// TODO: the check below currently fails because of the weak type system of UDFs; fixing that is beyond the scope of this change.
/* // The String value column is result of dot product thus must be present in the Array column
Optional<GenericRecord> missingValue = outMessages.stream().map(x -> (GenericRecord) x.getMessage()).filter(x -> {
String value = (String) x.get("string_value");
List<Object> arrayValues = (List<Object>) x.get("array_values");
if (arrayValues == null) {
return true;
}
Optional<Object> notThere = arrayValues.stream().filter(v -> v.toString().equalsIgnoreCase(value)).findAny();
return !notThere.isPresent();
}).findFirst();
Assert.assertFalse("Absent Value " + missingValue.orElse(null), missingValue.isPresent());
*/

}

@Test
Expand Down