Skip to content

Commit

Permalink
Make RecordExtractor preserve empty array/map and map entries with empty value (#14547)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Dec 3, 2024
1 parent 7977c28 commit 71dfa7d
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 339 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
*/
package org.apache.pinot.plugin.inputformat.avro;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -48,13 +46,12 @@ public void init(@Nullable Set<String> fields, @Nullable RecordExtractorConfig r
AvroRecordExtractorConfig config = (AvroRecordExtractorConfig) recordExtractorConfig;
if (config != null) {
_applyLogicalTypes = config.isEnableLogicalTypes();
_differentiateNullAndEmptyForMV = config.isDifferentiateNullAndEmptyForMV();
}
if (fields == null || fields.isEmpty()) {
_extractAll = true;
_fields = Collections.emptySet();
_fields = Set.of();
} else {
_fields = ImmutableSet.copyOf(fields);
_fields = Set.copyOf(fields);
}
}

Expand Down Expand Up @@ -108,22 +105,15 @@ protected boolean isRecord(Object value) {
* without checking
*/
@Override
@Nullable
protected Object convertRecord(Object value) {
protected Map<Object, Object> convertRecord(Object value) {
GenericRecord record = (GenericRecord) value;
List<Schema.Field> fields = record.getSchema().getFields();
if (fields.isEmpty()) {
return null;
}

Map<Object, Object> convertedMap = new HashMap<>();
Map<Object, Object> convertedMap = Maps.newHashMapWithExpectedSize(fields.size());
for (Schema.Field field : fields) {
String fieldName = field.name();
Object fieldValue = record.get(fieldName);
if (fieldValue != null) {
fieldValue = transformValue(fieldValue, field);
}
convertedMap.put(fieldName, fieldValue);
Object convertedValue = fieldValue != null ? transformValue(fieldValue, field) : null;
convertedMap.put(fieldName, convertedValue);
}
return convertedMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@
*/
public class AvroRecordExtractorConfig implements RecordExtractorConfig {
private boolean _enableLogicalTypes = false;
private boolean _differentiateNullAndEmptyForMV = false;

@Override
public void init(Map<String, String> props) {
_enableLogicalTypes = Boolean.parseBoolean(props.get("enableLogicalTypes"));
_differentiateNullAndEmptyForMV = Boolean.parseBoolean(props.get("differentiateNullAndEmptyForMV"));
}

public boolean isEnableLogicalTypes() {
Expand All @@ -42,12 +40,4 @@ public boolean isEnableLogicalTypes() {
public void setEnableLogicalTypes(boolean enableLogicalTypes) {
_enableLogicalTypes = enableLogicalTypes;
}

public boolean isDifferentiateNullAndEmptyForMV() {
return _differentiateNullAndEmptyForMV;
}

public void setDifferentiateNullAndEmptyForMV(boolean differentiateNullAndEmptyForMV) {
_differentiateNullAndEmptyForMV = differentiateNullAndEmptyForMV;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,100 +18,139 @@
*/
package org.apache.pinot.plugin.inputformat.avro;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertEqualsDeep;
import static org.testng.Assert.assertTrue;


public class AvroRecordToPinotRowGeneratorTest {

@Test
public void testIncomingTimeColumn()
throws Exception {
List<Schema.Field> avroFields =
Collections.singletonList(new Schema.Field("incomingTime", Schema.create(Schema.Type.LONG), null, null));
Schema avroSchema = Schema.createRecord(avroFields);
GenericData.Record avroRecord = new GenericData.Record(avroSchema);
public void testIncomingTimeColumn() {
Schema schema =
SchemaBuilder.record("test").fields().name("incomingTime").type().longType().noDefault().endRecord();
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("incomingTime", 12345L);

Set<String> sourceFields = Sets.newHashSet("incomingTime", "outgoingTime");
Set<String> sourceFields = Set.of("incomingTime", "outgoingTime");

AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
avroRecordExtractor.init(sourceFields, null);
GenericRow genericRow = new GenericRow();
avroRecordExtractor.extract(avroRecord, genericRow);

Assert.assertTrue(
genericRow.getFieldToValueMap().keySet().containsAll(Arrays.asList("incomingTime", "outgoingTime")));
Assert.assertEquals(genericRow.getValue("incomingTime"), 12345L);
assertTrue(genericRow.getFieldToValueMap().keySet().containsAll(Arrays.asList("incomingTime", "outgoingTime")));
assertEquals(genericRow.getValue("incomingTime"), 12345L);
}

@Test
public void testNoDifferentiateNullAndEmptyForMultiValueFields() {
AvroRecordExtractorConfig config = new AvroRecordExtractorConfig();
public void testMultiValueField() {
Schema schema = SchemaBuilder.record("test").fields().name("intMV").type().array().items().intType().noDefault()
.name("stringMV").type().array().items().stringType().noDefault().endRecord();
GenericRecord genericRecord = new GenericData.Record(schema);

AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
avroRecordExtractor.init(null, config);
avroRecordExtractor.init(null, null);
GenericRow genericRow = new GenericRow();

Schema schema =
SchemaBuilder.record("GenericRow").fields().name("arrField1").type().array().items().stringType().noDefault()
.name("arrField2").type().array().items().stringType().noDefault().name("arrField3").type().array().items()
.stringType().noDefault().endRecord();
// List
genericRecord.put("intMV", List.of(1, 2, 3));
genericRecord.put("stringMV", List.of("value1", "value2", "value3"));
avroRecordExtractor.extract(genericRecord, genericRow);
assertEqualsDeep(genericRow.getFieldToValueMap(),
Map.of("intMV", new Object[]{1, 2, 3}, "stringMV", new Object[]{"value1", "value2", "value3"}));

GenericRecord genericRecord = new GenericData.Record(schema);
List<String> arrayData1 = Arrays.asList("value1", "value2", "value3");
List<String> arrayData2 = null;
List<String> arrayData3 = new ArrayList<>();
// Object[]
genericRow.clear();
genericRecord.put("intMV", new Object[]{1, 2, 3});
genericRecord.put("stringMV", new Object[]{"value1", "value2", "value3"});
avroRecordExtractor.extract(genericRecord, genericRow);
assertEqualsDeep(genericRow.getFieldToValueMap(),
Map.of("intMV", new Object[]{1, 2, 3}, "stringMV", new Object[]{"value1", "value2", "value3"}));

GenericRow genericRow = new GenericRow();
genericRecord.put("arrField1", arrayData1);
genericRecord.put("arrField2", arrayData2);
genericRecord.put("arrField3", arrayData3);
// Integer[] and String[]
genericRow.clear();
genericRecord.put("intMV", new Integer[]{1, 2, 3});
genericRecord.put("stringMV", new String[]{"value1", "value2", "value3"});
avroRecordExtractor.extract(genericRecord, genericRow);
assertEqualsDeep(genericRow.getFieldToValueMap(),
Map.of("intMV", new Object[]{1, 2, 3}, "stringMV", new Object[]{"value1", "value2", "value3"}));

// Primitive array
genericRow.clear();
genericRecord.put("intMV", new int[]{1, 2, 3});
genericRecord.put("stringMV", new String[]{"value1", "value2", "value3"});
avroRecordExtractor.extract(genericRecord, genericRow);
Assert.assertTrue(
genericRow.getFieldToValueMap().keySet().containsAll(Arrays.asList("arrField1", "arrField2", "arrField3")));
Assert.assertEquals(genericRow.getValue("arrField1"), arrayData1.toArray());
Assert.assertEquals(genericRow.getValue("arrField2"), null);
Assert.assertEquals(genericRow.getValue("arrField3"), null);
}
assertEqualsDeep(genericRow.getFieldToValueMap(),
Map.of("intMV", new Object[]{1, 2, 3}, "stringMV", new Object[]{"value1", "value2", "value3"}));

@Test
public void testDifferentiateNullAndEmptyForMultiValueFields() {
AvroRecordExtractorConfig config = new AvroRecordExtractorConfig();
config.setDifferentiateNullAndEmptyForMV(true);
AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
avroRecordExtractor.init(null, config);
// Empty List
genericRow.clear();
genericRecord.put("intMV", List.of());
genericRecord.put("stringMV", List.of());
avroRecordExtractor.extract(genericRecord, genericRow);
assertEqualsDeep(genericRow.getFieldToValueMap(), Map.of("intMV", new Object[0], "stringMV", new Object[0]));

Schema schema =
SchemaBuilder.record("GenericRow").fields().name("arrField1").type().array().items().stringType().noDefault()
.name("arrField2").type().array().items().stringType().noDefault().name("arrField3").type().array().items()
.stringType().noDefault().endRecord();
// Empty array
genericRow.clear();
genericRecord.put("intMV", new int[0]);
genericRecord.put("stringMV", new String[0]);
avroRecordExtractor.extract(genericRecord, genericRow);
assertEqualsDeep(genericRow.getFieldToValueMap(), Map.of("intMV", new Object[0], "stringMV", new Object[0]));

// null
genericRow.clear();
genericRecord.put("intMV", null);
genericRecord.put("stringMV", null);
avroRecordExtractor.extract(genericRecord, genericRow);
Map<String, Object> expected = new HashMap<>();
expected.put("intMV", null);
expected.put("stringMV", null);
assertEqualsDeep(genericRow.getFieldToValueMap(), expected);
}

@Test
public void testMapField() {
Schema schema = SchemaBuilder.record("test").fields().name("intMap").type().map().values().intType().noDefault()
.name("stringMap").type().map().values().stringType().noDefault().endRecord();
GenericRecord genericRecord = new GenericData.Record(schema);
List<String> arrayData1 = Arrays.asList("value1", "value2", "value3");
List<String> arrayData2 = null;
List<String> arrayData3 = new ArrayList<>();

AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
avroRecordExtractor.init(null, null);
GenericRow genericRow = new GenericRow();
genericRecord.put("arrField1", arrayData1);
genericRecord.put("arrField2", arrayData2);
genericRecord.put("arrField3", arrayData3);

Map<String, Integer> intMap = Map.of("v1", 1, "v2", 2, "v3", 3);
genericRecord.put("intMap", intMap);
Map<String, String> stringMap = Map.of("v1", "value1", "v2", "value2", "v3", "value3");
genericRecord.put("stringMap", stringMap);
avroRecordExtractor.extract(genericRecord, genericRow);
assertEqualsDeep(genericRow.getFieldToValueMap(), Map.of("intMap", intMap, "stringMap", stringMap));

// Map with null
genericRow.clear();
intMap = new HashMap<>();
intMap.put("v1", 1);
intMap.put("v2", null);
intMap.put("v3", null);
genericRecord.put("intMap", intMap);
stringMap = new HashMap<>();
stringMap.put("v1", null);
stringMap.put("v2", null);
stringMap.put("v3", "value3");
genericRecord.put("stringMap", stringMap);
avroRecordExtractor.extract(genericRecord, genericRow);
Assert.assertTrue(
genericRow.getFieldToValueMap().keySet().containsAll(Arrays.asList("arrField1", "arrField2", "arrField3")));
Assert.assertEquals(genericRow.getValue("arrField1"), arrayData1.toArray());
Assert.assertEquals(genericRow.getValue("arrField2"), null);
Assert.assertEquals(genericRow.getValue("arrField3"), new String[0]);
assertEqualsDeep(genericRow.getFieldToValueMap(), Map.of("intMap", intMap, "stringMap", stringMap));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.yscope.clp.compressorfrontend.EncodedMessage;
import com.yscope.clp.compressorfrontend.MessageEncoder;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -103,7 +102,7 @@ public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtra
_config = (CLPLogRecordExtractorConfig) recordExtractorConfig;
if (fields == null || fields.isEmpty()) {
_extractAll = true;
_fields = Collections.emptySet();
_fields = Set.of();
} else {
_fields = new HashSet<>(fields);
// Remove the fields to be CLP-encoded to make it easier to work with them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.plugin.inputformat.csv;

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.csv.CSVRecord;
Expand All @@ -42,7 +41,7 @@ public void init(Set<String> fields, RecordExtractorConfig recordExtractorConfig
if (fields == null || fields.isEmpty()) {
_fields = csvRecordExtractorConfig.getColumnNames();
} else {
_fields = ImmutableSet.copyOf(fields);
_fields = Set.copyOf(fields);
}
_multiValueDelimiter = csvRecordExtractorConfig.getMultiValueDelimiter();
}
Expand All @@ -56,11 +55,8 @@ public GenericRow extract(CSVRecord from, GenericRow to) {
return to;
}

@Override
@Nullable
public Object convert(@Nullable Object value) {
String stringValue = (String) value;
if (stringValue == null || StringUtils.isEmpty(stringValue)) {
private Object convert(@Nullable String value) {
if (value == null || StringUtils.isEmpty(value)) {
return null;
// NOTE about CSV behavior for empty string e.g. foo,bar,,zoo or foo,bar,"",zoo. These both are equivalent to a
// CSVParser
Expand All @@ -70,11 +66,11 @@ public Object convert(@Nullable Object value) {

// If the delimiter is not set, then return the value as is
if (_multiValueDelimiter == null) {
return stringValue;
return value;
}

final String[] stringValues = StringUtils.split(stringValue, _multiValueDelimiter);
final int numValues = stringValues.length;
String[] stringValues = StringUtils.split(value, _multiValueDelimiter);
int numValues = stringValues.length;

// NOTE about CSV behavior for multi value column - cannot distinguish between multi value column with just 1
// entry vs single value
Expand All @@ -88,7 +84,7 @@ public Object convert(@Nullable Object value) {
}
return array;
} else {
return stringValue;
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pinot.plugin.inputformat.json;

import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
Expand All @@ -40,9 +38,9 @@ public class JSONRecordExtractor extends BaseRecordExtractor<Map<String, Object>
public void init(Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) {
if (fields == null || fields.isEmpty()) {
_extractAll = true;
_fields = Collections.emptySet();
_fields = Set.of();
} else {
_fields = ImmutableSet.copyOf(fields);
_fields = Set.copyOf(fields);
}
}

Expand Down
Loading

0 comments on commit 71dfa7d

Please sign in to comment.