Skip to content

Commit

Permalink
NIFI-8232 CSV Parsers optionally allow/reject duplicate header names
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Villard <[email protected]>

This closes apache#4828.
  • Loading branch information
ChrisSamo632 authored and pvillard31 committed Feb 23, 2021
1 parent 418e2cc commit 3cb26ae
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

public class CSVUtils {

private static Logger LOG = LoggerFactory.getLogger(CSVUtils.class);
private static final Logger LOG = LoggerFactory.getLogger(CSVUtils.class);

public static final AllowableValue CUSTOM = new AllowableValue("custom", "Custom Format",
"The format of the CSV is configured by using the properties of this Controller Service, such as Value Separator");
Expand Down Expand Up @@ -136,6 +136,20 @@ public class CSVUtils {
.defaultValue("UTF-8")
.required(true)
.build();
public static final PropertyDescriptor ALLOW_DUPLICATE_HEADER_NAMES = new PropertyDescriptor.Builder()
.name("csvutils-allow-duplicate-header-names")
.displayName("Allow Duplicate Header Names")
.description("Whether duplicate header names are allowed. Header names are case-sensitive, for example \"name\" and \"Name\" are treated as separate fields. " +
"Handling of duplicate header names is CSV Parser specific (where applicable):\n" +
"* Apache Commons CSV - duplicate headers will result in column data \"shifting\" right with new fields " +
"created for \"unknown_field_index_X\" where \"X\" is the CSV column index number\n" +
"* Jackson CSV - duplicate headers will be de-duplicated with the field value being that of the right-most " +
"duplicate CSV column")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(false)
.build();

// CSV Format fields for writers only
public static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character.");
Expand Down Expand Up @@ -177,6 +191,10 @@ public class CSVUtils {
.required(true)
.build();

private CSVUtils() {
// intentionally blank, prevents instantiation
}

public static boolean isDynamicCSVFormat(final PropertyContext context) {
final String formatName = context.getProperty(CSV_FORMAT).getValue();
return formatName.equalsIgnoreCase(CUSTOM.getValue())
Expand Down Expand Up @@ -208,8 +226,8 @@ public static CSVFormat createCSVFormat(final PropertyContext context, final Map
}
}

private static Character getCharUnescapedJava(final PropertyContext context, final PropertyDescriptor property, final Map<String, String> variables) {
String value = context.getProperty(property).evaluateAttributeExpressions(variables).getValue();
private static Character getValueSeparatorCharUnescapedJava(final PropertyContext context, final Map<String, String> variables) {
String value = context.getProperty(VALUE_SEPARATOR).evaluateAttributeExpressions(variables).getValue();

if (value != null) {
String unescaped = unescapeJava(value);
Expand All @@ -218,13 +236,9 @@ private static Character getCharUnescapedJava(final PropertyContext context, fin
}
}

LOG.warn("'{}' property evaluated to an invalid value: \"{}\". It must be a single character. The property value will be ignored.", property.getName(), value);
LOG.warn("'{}' property evaluated to an invalid value: \"{}\". It must be a single character. The property value will be ignored.", VALUE_SEPARATOR.getName(), value);

if (property.getDefaultValue() != null) {
return property.getDefaultValue().charAt(0);
} else {
return null;
}
return VALUE_SEPARATOR.getDefaultValue().charAt(0);
}

private static Character getCharUnescaped(final PropertyContext context, final PropertyDescriptor property, final Map<String, String> variables) {
Expand All @@ -247,7 +261,7 @@ private static Character getCharUnescaped(final PropertyContext context, final P
}

private static CSVFormat buildCustomFormat(final PropertyContext context, final Map<String, String> variables) {
final Character valueSeparator = getCharUnescapedJava(context, VALUE_SEPARATOR, variables);
final Character valueSeparator = getValueSeparatorCharUnescapedJava(context, variables);
CSVFormat format = CSVFormat.newFormat(valueSeparator)
.withAllowMissingColumnNames()
.withIgnoreEmptyLines();
Expand Down Expand Up @@ -293,6 +307,11 @@ private static CSVFormat buildCustomFormat(final PropertyContext context, final
format = format.withRecordSeparator(separator);
}

final PropertyValue allowDuplicateHeaderNames = context.getProperty(ALLOW_DUPLICATE_HEADER_NAMES);
if (allowDuplicateHeaderNames != null && allowDuplicateHeaderNames.isSet()) {
format = format.withAllowDuplicateHeaderNames(allowDuplicateHeaderNames.asBoolean());
}

return format;
}

Expand All @@ -306,7 +325,7 @@ public static String unescapeJava(String input) {

public static String unescape(final String input) {
if (input == null) {
return input;
return null;
}

return input.replace("\\t", "\t")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class CSVUtilsTest {

@Test
public void testIsDynamicCSVFormatWithStaticProperties() {
PropertyContext context = createContext("|", "'", "^", "~");
PropertyContext context = createContext("|", "'", "^", "~", "true");

boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);

Expand All @@ -45,7 +45,7 @@ public void testIsDynamicCSVFormatWithStaticProperties() {

@Test
public void testIsDynamicCSVFormatWithDynamicValueSeparator() {
PropertyContext context = createContext("${csv.delimiter}", "'", "^", "~");
PropertyContext context = createContext("${csv.delimiter}", "'", "^", "~", "true");

boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);

Expand All @@ -54,7 +54,7 @@ public void testIsDynamicCSVFormatWithDynamicValueSeparator() {

@Test
public void testIsDynamicCSVFormatWithDynamicQuoteCharacter() {
PropertyContext context = createContext("|", "${csv.quote}", "^", "~");
PropertyContext context = createContext("|", "${csv.quote}", "^", "~", "true");

boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);

Expand All @@ -63,7 +63,7 @@ public void testIsDynamicCSVFormatWithDynamicQuoteCharacter() {

@Test
public void testIsDynamicCSVFormatWithDynamicEscapeCharacter() {
PropertyContext context = createContext("|", "'", "${csv.escape}", "~");
PropertyContext context = createContext("|", "'", "${csv.escape}", "~", "true");

boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);

Expand All @@ -72,7 +72,7 @@ public void testIsDynamicCSVFormatWithDynamicEscapeCharacter() {

@Test
public void testIsDynamicCSVFormatWithDynamicCommentMarker() {
PropertyContext context = createContext("|", "'", "^", "${csv.comment}");
PropertyContext context = createContext("|", "'", "^", "${csv.comment}", "true");

boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);

Expand All @@ -81,19 +81,20 @@ public void testIsDynamicCSVFormatWithDynamicCommentMarker() {

@Test
public void testCustomFormat() {
PropertyContext context = createContext("|", "'", "^", "~");
PropertyContext context = createContext("|", "'", "^", "~", "true");

CSVFormat csvFormat = CSVUtils.createCSVFormat(context, Collections.emptyMap());

assertEquals('|', csvFormat.getDelimiter());
assertEquals('\'', (char) csvFormat.getQuoteCharacter());
assertEquals('^', (char) csvFormat.getEscapeCharacter());
assertEquals('~', (char) csvFormat.getCommentMarker());
assertTrue(csvFormat.getAllowDuplicateHeaderNames());
}

@Test
public void testCustomFormatWithEL() {
PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}");
PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}", "false");

Map<String, String> attributes = new HashMap<>();
attributes.put("csv.delimiter", "|");
Expand All @@ -107,11 +108,12 @@ public void testCustomFormatWithEL() {
assertEquals('\'', (char) csvFormat.getQuoteCharacter());
assertEquals('^', (char) csvFormat.getEscapeCharacter());
assertEquals('~', (char) csvFormat.getCommentMarker());
assertFalse(csvFormat.getAllowDuplicateHeaderNames());
}

@Test
public void testCustomFormatWithELEmptyValues() {
PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}");
PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}", "true");

CSVFormat csvFormat = CSVUtils.createCSVFormat(context, Collections.emptyMap());

Expand All @@ -123,7 +125,7 @@ public void testCustomFormatWithELEmptyValues() {

@Test
public void testCustomFormatWithELInvalidValues() {
PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}");
PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}", "true");

Map<String, String> attributes = new HashMap<>();
attributes.put("csv.delimiter", "invalid");
Expand All @@ -139,13 +141,14 @@ public void testCustomFormatWithELInvalidValues() {
assertNull(csvFormat.getCommentMarker());
}

private PropertyContext createContext(String valueSeparator, String quoteChar, String escapeChar, String commentMarker) {
private PropertyContext createContext(String valueSeparator, String quoteChar, String escapeChar, String commentMarker, String allowDuplicateHeaderNames) {
Map<PropertyDescriptor, String> properties = new HashMap<>();

properties.put(CSVUtils.VALUE_SEPARATOR, valueSeparator);
properties.put(CSVUtils.QUOTE_CHAR, quoteChar);
properties.put(CSVUtils.ESCAPE_CHAR, escapeChar);
properties.put(CSVUtils.COMMENT_MARKER, commentMarker);
properties.put(CSVUtils.ALLOW_DUPLICATE_HEADER_NAMES, allowDuplicateHeaderNames);

return new MockConfigurationContext(properties, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
properties.add(CSVUtils.NULL_STRING);
properties.add(CSVUtils.TRIM_FIELDS);
properties.add(CSVUtils.CHARSET);
properties.add(CSVUtils.ALLOW_DUPLICATE_HEADER_NAMES);
return properties;
}

Expand Down Expand Up @@ -146,17 +147,17 @@ public RecordReader createRecordReader(final Map<String, String> variables, fina
final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(in), null);
in.reset();

CSVFormat csvFormat;
final CSVFormat format;
if (this.csvFormat != null) {
csvFormat = this.csvFormat;
format = this.csvFormat;
} else {
csvFormat = CSVUtils.createCSVFormat(context, variables);
format = CSVUtils.createCSVFormat(context, variables);
}

if(APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
return new CSVRecordReader(in, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
} else if(JACKSON_CSV.getValue().equals(csvParser)) {
return new JacksonCSVRecordReader(in, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
if (APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
return new CSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
} else if (JACKSON_CSV.getValue().equals(csvParser)) {
return new JacksonCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
} else {
throw new IOException("Parser not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.io.input.BOMInputStream;
Expand All @@ -49,6 +51,7 @@
public class JacksonCSVRecordReader extends AbstractCSVRecordReader {
private final MappingIterator<String[]> recordStream;
private List<String> rawFieldNames = null;
private boolean allowDuplicateHeaderNames;

private volatile static CsvMapper mapper = new CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);

Expand All @@ -75,6 +78,7 @@ public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, f
csvSchemaBuilder = csvSchemaBuilder.setSkipFirstDataRow(true);
}
}
allowDuplicateHeaderNames = csvFormat.getAllowDuplicateHeaderNames();

CsvSchema csvSchema = csvSchemaBuilder.build();

Expand Down Expand Up @@ -108,6 +112,17 @@ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFie
rawFieldNames = schema.getFieldNames();
} else {
rawFieldNames = Arrays.asList(csvRecord);
if (rawFieldNames.size() > schema.getFieldCount() && !allowDuplicateHeaderNames) {
final Set<String> deDupe = new HashSet<>(schema.getFieldCount());
for (final String name : rawFieldNames) {
if (!deDupe.add(name)) {
throw new IllegalArgumentException(String.format(
"The header contains a duplicate name: \"%s\" in %s. If this is valid then use CSVFormat.withAllowDuplicateHeaderNames().",
name, rawFieldNames
));
}
}
}

// Advance the stream to keep the record count correct
if (recordStream.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,37 @@ public void testSimple() throws SchemaNotFoundException, IOException {
.allMatch(field -> field.getDataType().equals(RecordFieldType.STRING.getDataType())));
}

@Test
public void testContainsDuplicateHeaderNames() throws SchemaNotFoundException, IOException {
final String headerLine = "a, a, b";
final byte[] headerBytes = headerLine.getBytes();

final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(CSVUtils.CSV_FORMAT, CSVUtils.CUSTOM.getValue());
properties.put(CSVUtils.COMMENT_MARKER, "#");
properties.put(CSVUtils.VALUE_SEPARATOR, ",");
properties.put(CSVUtils.TRIM_FIELDS, "true");
properties.put(CSVUtils.QUOTE_CHAR, "\"");
properties.put(CSVUtils.ESCAPE_CHAR, "\\");

final ConfigurationContext context = new MockConfigurationContext(properties, null);
final CSVHeaderSchemaStrategy strategy = new CSVHeaderSchemaStrategy(context);

final RecordSchema schema;
try (final InputStream bais = new ByteArrayInputStream(headerBytes)) {
schema = strategy.getSchema(null, bais, null);
}

final List<String> expectedFieldNames = Arrays.asList("a", "b");
assertEquals(expectedFieldNames, schema.getFieldNames());

assertTrue(schema.getFields().stream()
.allMatch(field -> field.getDataType().equals(RecordFieldType.STRING.getDataType())));
}

@Test
public void testWithEL() throws SchemaNotFoundException, IOException {
final String headerLine = "\'a\'; b; c; d; e^;z; f";
final String headerLine = "'a'; b; c; d; e^;z; f";
final byte[] headerBytes = headerLine.getBytes();

final Map<PropertyDescriptor, String> properties = new HashMap<>();
Expand Down
Loading

0 comments on commit 3cb26ae

Please sign in to comment.