diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index fd02c82788c21..5c8dfc149a1f1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -20,17 +20,18 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.Charset; @@ -56,8 +57,11 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple */ private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class); - /** The default charset to convert strings to bytes */ - private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); + // The charset used to convert strings to bytes + private String charsetName = "UTF-8"; + + // Charset is not serializable + private transient Charset charset; /** * The default read buffer size = 1MB. @@ -157,9 +161,12 @@ protected static void loadConfigParameters(Configuration parameters) { // -------------------------------------------------------------------------------------------- // The configuration parameters. Configured on the instance and serialized to be shipped. // -------------------------------------------------------------------------------------------- - + + // The delimiter may be set with a byte-sequence or a String. In the latter + // case the byte representation is updated consistent with current charset. private byte[] delimiter = new byte[] {'\n'}; - + private String delimiterString = null; + private int lineLengthLimit = Integer.MAX_VALUE; private int bufferSize = -1; @@ -182,8 +189,42 @@ protected DelimitedInputFormat(Path filePath, Configuration configuration) { } loadConfigParameters(configuration); } - - + + /** + * Get the character set used for the row delimiter. This is also used by + * subclasses to interpret field delimiters, comment strings, and for + * configuring {@link FieldParser}s. + * + * @return the charset + */ + @PublicEvolving + public Charset getCharset() { + if (this.charset == null) { + this.charset = Charset.forName(charsetName); + } + return this.charset; + } + + /** + * Set the name of the character set used for the row delimiter. This is + * also used by subclasses to interpret field delimiters, comment strings, + * and for configuring {@link FieldParser}s. + * + * These fields are interpreted when set. Changing the charset thereafter + * may cause unexpected results. + * + * @param charset name of the charset + */ + @PublicEvolving + public void setCharset(String charset) { + this.charsetName = Preconditions.checkNotNull(charset); + this.charset = null; + + if (this.delimiterString != null) { + this.delimiter = delimiterString.getBytes(getCharset()); + } + } + public byte[] getDelimiter() { return delimiter; } @@ -193,6 +234,7 @@ public void setDelimiter(byte[] delimiter) { throw new IllegalArgumentException("Delimiter must not be null"); } this.delimiter = delimiter; + this.delimiterString = null; } public void setDelimiter(char delimiter) { @@ -203,7 +245,8 @@ public void setDelimiter(String delimiter) { if (delimiter == null) { throw new IllegalArgumentException("Delimiter must not be null"); } - this.delimiter = delimiter.getBytes(UTF_8_CHARSET); + this.delimiter = delimiter.getBytes(getCharset()); + this.delimiterString = delimiter; } public int getLineLengthLimit() { @@ -264,7 +307,7 @@ public void setNumLineSamples(int numLineSamples) { // -------------------------------------------------------------------------------------------- /** - * Configures this input format by reading the path to the file from the configuration andge the string that + * Configures this input format by reading the path to the file from the configuration and the string that * defines the record delimiter. * * @param parameters The configuration object to read the parameters from. diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 0ced22b64e62c..20c643ea17ead 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -25,12 +25,10 @@ import org.apache.flink.types.parser.StringParser; import org.apache.flink.types.parser.StringValueParser; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Map; import java.util.TreeMap; @@ -46,9 +44,6 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class); - /** The charset used to convert strings to bytes */ - private Charset charset = Charset.forName("UTF-8"); - private static final Class[] EMPTY_TYPES = new Class[0]; private static final boolean[] EMPTY_INCLUDED = new boolean[0]; @@ -79,9 +74,12 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat private Class[] fieldTypes = EMPTY_TYPES; protected boolean[] fieldIncluded = EMPTY_INCLUDED; - + + // The byte representation of the delimiter is updated consistent with + // current charset. private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER; - + private String fieldDelimString = null; + private boolean lenient; private boolean skipFirstLineAsHeader; @@ -90,7 +88,10 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat private byte quoteCharacter; + // The byte representation of the comment prefix is updated consistent with + // current charset. protected byte[] commentPrefix = null; + private String commentPrefixString = null; // -------------------------------------------------------------------------------------------- @@ -105,11 +106,6 @@ protected GenericCsvInputFormat(Path filePath) { super(filePath, null); } - protected GenericCsvInputFormat(Path filePath, Charset charset) { - this(filePath); - this.charset = Preconditions.checkNotNull(charset); - } - // -------------------------------------------------------------------------------------------- public int getNumberOfFieldsTotal() { @@ -120,43 +116,43 @@ public int getNumberOfNonNullFields() { return this.fieldTypes.length; } + @Override + public void setCharset(String charset) { + super.setCharset(charset); + + if (this.fieldDelimString != null) { + this.fieldDelim = fieldDelimString.getBytes(getCharset()); + } + + if (this.commentPrefixString != null) { + this.commentPrefix = commentPrefixString.getBytes(getCharset()); + } + } + public byte[] getCommentPrefix() { return commentPrefix; } public void setCommentPrefix(String commentPrefix) { - setCommentPrefix(commentPrefix, charset); - } - - private void setCommentPrefix(String commentPrefix, Charset charset) { - if (charset == null) { - throw new IllegalArgumentException("Charset must not be null"); - } if (commentPrefix != null) { - this.commentPrefix = commentPrefix.getBytes(charset); + this.commentPrefix = commentPrefix.getBytes(getCharset()); } else { this.commentPrefix = null; } + this.commentPrefixString = commentPrefix; } public byte[] getFieldDelimiter() { return fieldDelim; } - public void setFieldDelimiter(byte[] delimiter) { + public void setFieldDelimiter(String delimiter) { if (delimiter == null) { throw new IllegalArgumentException("Delimiter must not be null"); } - this.fieldDelim = delimiter; - } - - public void setFieldDelimiter(char delimiter) { - setFieldDelimiter(String.valueOf(delimiter)); - } - - public void setFieldDelimiter(String delimiter) { - this.fieldDelim = delimiter.getBytes(charset); + this.fieldDelim = delimiter.getBytes(getCharset()); + this.fieldDelimString = delimiter; } public boolean isLenient() { @@ -296,25 +292,6 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } - /** - * Gets the character set for the parser. Default is set to UTF-8. - * - * @return The charset for the parser. - */ - Charset getCharset() { - return this.charset; - } - - /** - * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset - * when doing a parse. - * - * @param charset The character set to set. - */ - public void setCharset(Charset charset) { - this.charset = Preconditions.checkNotNull(charset); - } - // -------------------------------------------------------------------------------------------- // Runtime methods // -------------------------------------------------------------------------------------------- @@ -322,7 +299,7 @@ public void setCharset(Charset charset) { @Override public void open(FileInputSplit split) throws IOException { super.open(split); - + // instantiate the parsers FieldParser[] parsers = new FieldParser[fieldTypes.length]; @@ -335,7 +312,7 @@ public void open(FileInputSplit split) throws IOException { FieldParser p = InstantiationUtil.instantiate(parserType, FieldParser.class); - p.setCharset(this.getCharset()); + p.setCharset(getCharset()); if (this.quotedStringParsing) { if (p instanceof StringParser) { ((StringParser)p).enableQuotedStringParsing(this.quoteCharacter); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index d9eeecc074b64..cf3c83dcb5612 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -32,6 +32,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -79,7 +80,7 @@ public static enum ParseErrorState { BOOLEAN_INVALID } - private Charset charset = Charset.forName("UTF-8"); + private Charset charset = StandardCharsets.UTF_8; private ParseErrorState errorState = ParseErrorState.NONE; @@ -105,9 +106,7 @@ public int resetErrorStateAndParse(byte[] bytes, int startPos, int limit, byte[] /** * Each parser's logic should be implemented inside this method - * - * @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)} - * */ + */ protected abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse); /** @@ -221,20 +220,19 @@ protected static final int nextStringLength(byte[] bytes, int startPos, int leng return limitedLength; } - /* - * Gets the Charset for the parser.Default is set to ASCII + /** + * Gets the character set used for this parser. * - * @return The charset for the parser. + * @return the charset used for this parser. */ public Charset getCharset() { return this.charset; } /** - * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset - * when doing a parse. + * Sets the character set used for this parser. * - * @param charset The charset to set. + * @param charset charset used for this parser. */ public void setCharset(Charset charset) { this.charset = charset; diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java index 93d5f9fca01a3..219365a350872 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java @@ -18,12 +18,13 @@ package org.apache.flink.api.common.io; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -33,17 +34,17 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStreamWriter; +import java.io.Writer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class DelimitedInputFormatTest { @@ -200,6 +201,45 @@ public void testReadCustomDelimiter() { } } + @Test + public void testReadCustomDelimiterWithCharset() throws IOException { + // Unicode row fragments + String[] records = new String[]{"\u020e\u021f\u05c0\u020b\u020f", "Apache", "\nFlink", "\u0000", "\u05c0"}; + + // Unicode delimiter + String delimiter = "\u05c0\u05c0"; + + String fileContent = StringUtils.join(records, delimiter); + + for (final String charset : new String[]{ "UTF-8", "UTF-16BE", "UTF-16LE" }) { + // use charset when instantiating the record String + DelimitedInputFormat format = new DelimitedInputFormat() { + @Override + public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) throws IOException { + return new String(bytes, offset, numBytes, charset); + } + }; + format.setFilePath("file:///some/file/that/will/not/be/read"); + + final FileInputSplit split = createTempFile(fileContent, charset); + + format.setDelimiter(delimiter); + // use the same encoding to parse the file as used to read the file; + // the delimiter is reinterpreted when the charset is set + format.setCharset(charset); + format.configure(new Configuration()); + format.open(split); + + for (String record : records) { + String value = format.nextRecord(null); + assertEquals(record, value); + } + + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + } + } + /** * Tests that the records are read correctly when the split boundary is in the middle of a record. */ @@ -363,19 +403,29 @@ public void testReadRecordsLargerThanBuffer() { fail(e.getMessage()); } } - - private static FileInputSplit createTempFile(String contents) throws IOException { + + static FileInputSplit createTempFile(String contents) throws IOException { File tempFile = File.createTempFile("test_contents", "tmp"); tempFile.deleteOnExit(); - - OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); - wrt.write(contents); - wrt.close(); - + + try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile))) { + out.write(contents); + } + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); } - - + + static FileInputSplit createTempFile(String contents, String charset) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + + try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile), charset)) { + out.write(contents); + } + + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); + } + protected static final class MyTextInputFormat extends DelimitedInputFormat { private static final long serialVersionUID = 1L; diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java index d063ddc8315eb..c11a573cfc4eb 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java @@ -18,21 +18,7 @@ package org.apache.flink.api.common.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.zip.DeflaterOutputStream; -import java.util.zip.GZIPOutputStream; - +import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; @@ -41,15 +27,29 @@ import org.apache.flink.types.LongValue; import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; - import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.Arrays; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.GZIPOutputStream; + +import static org.apache.flink.api.common.io.DelimitedInputFormatTest.createTempFile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class GenericCsvInputFormatTest { - private File tempFile; - private TestCsvInputFormat format; // -------------------------------------------------------------------------------------------- @@ -65,9 +65,6 @@ public void setdown() throws Exception { if (this.format != null) { this.format.close(); } - if (this.tempFile != null) { - this.tempFile.delete(); - } } @Test @@ -87,7 +84,7 @@ public void testSparseFieldArray() { public void testReadNoPosAll() throws IOException { try { final String fileContent = "111|222|333|444|555\n666|777|888|999|000|"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final Configuration parameters = new Configuration(); @@ -485,8 +482,7 @@ public void testReadInvalidContents() throws IOException { format.nextRecord(values); fail("Input format accepted on invalid input."); } - catch (ParseException e) { - // all good + catch (ParseException ignored) { } } catch (Exception ex) { @@ -551,40 +547,64 @@ public void testReadInvalidContentsLenientWithSkipping() { @Test public void testReadWithCharset() throws IOException { - try { - final String fileContent = "\u00bf|Flink|\u00f1"; - final FileInputSplit split = createTempFile(fileContent); + // Unicode row fragments + String[] records = new String[]{"\u020e\u021f", "Flink", "\u020b\u020f"}; - final Configuration parameters = new Configuration(); + // Unicode delimiter + String delimiter = "\u05c0\u05c0"; - format.setCharset(Charset.forName("UTF-8")); - format.setFieldDelimiter("|"); - format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class); + String fileContent = StringUtils.join(records, delimiter); - format.configure(parameters); - format.open(split); + // StringValueParser does not use charset so rely on StringParser + GenericCsvInputFormat format = new GenericCsvInputFormat() { + @Override + public String[] readRecord(String[] target, byte[] bytes, int offset, int numBytes) throws IOException { + return parseRecord(target, bytes, offset, numBytes) ? target : null; + } + }; + format.setFilePath("file:///some/file/that/will/not/be/read"); - Value[] values = new Value[] { new StringValue(), new StringValue(), new StringValue()}; + for (String charset : new String[]{ "UTF-8", "UTF-16BE", "UTF-16LE" }) { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + // write string with proper encoding + try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile), charset)) { + out.write(fileContent); + } + + FileInputSplit split = new FileInputSplit(0, new Path(tempFile.toURI().toString()), + 0, tempFile.length(), new String[]{ "localhost" }); + + format.setFieldDelimiter(delimiter); + format.setFieldTypesGeneric(String.class, String.class, String.class); + // use the same encoding to parse the file as used to read the file; + // the field delimiter is reinterpreted when the charset is set + format.setCharset(charset); + format.configure(new Configuration()); + format.open(split); + + String[] values = new String[]{ "", "", "" }; values = format.nextRecord(values); + + // validate results assertNotNull(values); - assertEquals("\u00bf", ((StringValue) values[0]).getValue()); - assertEquals("Flink", ((StringValue) values[1]).getValue()); - assertEquals("\u00f1", ((StringValue) values[2]).getValue()); + for (int i = 0 ; i < records.length ; i++) { + assertEquals(records[i], values[i]); + } assertNull(format.nextRecord(values)); assertTrue(format.reachedEnd()); } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } + + format.close(); } @Test public void readWithEmptyField() { try { final String fileContent = "abc|def|ghijk\nabc||hhg\n|||"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final Configuration parameters = new Configuration(); @@ -721,37 +741,26 @@ public void readWithHeaderLineAndInvalidIntermediate() { } } - private FileInputSplit createTempFile(String content) throws IOException { - this.tempFile = File.createTempFile("test_contents", "tmp"); - this.tempFile.deleteOnExit(); - - DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile)); - dos.writeBytes(content); - dos.close(); - - return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); - } - private FileInputSplit createTempDeflateFile(String content) throws IOException { - this.tempFile = File.createTempFile("test_contents", "tmp.deflate"); - this.tempFile.deleteOnExit(); + File tempFile = File.createTempFile("test_contents", "tmp.deflate"); + tempFile.deleteOnExit(); DataOutputStream dos = new DataOutputStream(new DeflaterOutputStream(new FileOutputStream(tempFile))); dos.writeBytes(content); dos.close(); - return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); } private FileInputSplit createTempGzipFile(String content) throws IOException { - this.tempFile = File.createTempFile("test_contents", "tmp.gz"); - this.tempFile.deleteOnExit(); + File tempFile = File.createTempFile("test_contents", "tmp.gz"); + tempFile.deleteOnExit(); DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(tempFile))); dos.writeBytes(content); dos.close(); - return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); } private Value[] createIntValues(int num) { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java index 1c5579ef72572..718274ef5c3c3 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java @@ -19,13 +19,15 @@ package org.apache.flink.types.parser; -import static org.junit.Assert.assertTrue; - import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; import org.junit.Test; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class VarLengthStringParserTest { @@ -200,7 +202,7 @@ public void testParseInvalidQuotedStrings() { @Test public void testParseValidMixedStringsWithCharset() { - Charset charset = Charset.forName("US-ASCII"); + Charset charset = StandardCharsets.US_ASCII; this.parser = new StringValueParser(); this.parser.enableQuotedStringParsing((byte) '@'); @@ -211,7 +213,7 @@ public void testParseValidMixedStringsWithCharset() { int startPos = 0; parser.setCharset(charset); startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s); - assertTrue(startPos == 11); - assertTrue(s.getValue().equals("abcde|gh")); + assertEquals(11, startPos); + assertEquals("abcde|gh", s.getValue()); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index b13b8aab64706..cbac386b37606 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -18,24 +18,22 @@ package org.apache.flink.api.java.io; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; - import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.operators.DataSource; +//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator +import org.apache.flink.api.java.tuple.*; +//CHECKSTYLE.ON: AvoidStarImport import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.Path; import org.apache.flink.util.Preconditions; -//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator -import org.apache.flink.api.java.tuple.*; -//CHECKSTYLE.ON: AvoidStarImport +import java.util.ArrayList; +import java.util.Arrays; /** * A builder class to instantiate a CSV parsing data source. The CSV reader configures the field types, @@ -66,7 +64,7 @@ public class CsvReader { protected boolean ignoreInvalidLines = false; - private Charset charset = Charset.forName("UTF-8"); + private String charset = "UTF-8"; // -------------------------------------------------------------------------------------------- @@ -162,11 +160,12 @@ public CsvReader ignoreComments(String commentPrefix) { } /** - * Gets the character set for the reader. Default is set to UTF-8. + * Gets the character set for the reader. Default is UTF-8. * * @return The charset for the reader. */ - public Charset getCharset() { + @PublicEvolving + public String getCharset() { return this.charset; } @@ -175,7 +174,8 @@ public Charset getCharset() { * * @param charset The character set to set. */ - public void setCharset(Charset charset) { + @PublicEvolving + public void setCharset(String charset) { this.charset = Preconditions.checkNotNull(charset); } @@ -356,12 +356,12 @@ public DataSource tupleType(Class targetType) { // -------------------------------------------------------------------------------------------- private void configureInputFormat(CsvInputFormat format) { + format.setCharset(this.charset); format.setDelimiter(this.lineDelimiter); format.setFieldDelimiter(this.fieldDelimiter); format.setCommentPrefix(this.commentPrefix); format.setSkipFirstLineAsHeader(skipFirstLineAsHeader); format.setLenient(ignoreInvalidLines); - format.setCharset(this.charset); if (this.parseQuotedStrings) { format.enableQuotedStringParsing(this.quoteCharacter); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java index e1c8023eda0fc..de57e5c547966 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java @@ -18,15 +18,9 @@ package org.apache.flink.api.java.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.Arrays; - import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; @@ -47,7 +41,12 @@ import org.apache.flink.types.Value; import org.junit.Assert; import org.junit.Test; -import org.apache.flink.api.java.ExecutionEnvironment; + +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; /** * Tests for the CSV reader builder. @@ -80,11 +79,11 @@ public void testIgnoreComments() { @Test public void testCharset() { CsvReader reader = getCsvReader(); - assertEquals(reader.getCharset(), Charset.forName("UTF-8")); - reader.setCharset(Charset.forName("US-ASCII")); - assertEquals(reader.getCharset(), Charset.forName("US-ASCII")); + assertEquals("UTF-8", reader.getCharset()); + reader.setCharset("US-ASCII"); + assertEquals("US-ASCII", reader.getCharset()); } - + @Test public void testIncludeFieldsDense() { CsvReader reader = getCsvReader(); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index 54f226c72ec94..cc0d5bc017bc2 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -771,7 +771,7 @@ public void testParserCorrectness() throws Exception { final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo); format.setSkipFirstLineAsHeader(true); - format.setFieldDelimiter(','); + format.setFieldDelimiter(","); format.configure(new Configuration()); format.open(split); @@ -1077,7 +1077,7 @@ public void testQuotedStringParsingWithIncludeFields() throws Exception { CsvInputFormat> inputFormat = new TupleCsvInputFormat>(new Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true}); inputFormat.enableQuotedStringParsing('"'); - inputFormat.setFieldDelimiter('|'); + inputFormat.setFieldDelimiter("|"); inputFormat.setDelimiter('\n'); inputFormat.configure(new Configuration()); @@ -1107,7 +1107,7 @@ public void testQuotedStringParsingWithEscapedQuotes() throws Exception { CsvInputFormat> inputFormat = new TupleCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo); inputFormat.enableQuotedStringParsing('"'); - inputFormat.setFieldDelimiter('|'); + inputFormat.setFieldDelimiter("|"); inputFormat.setDelimiter('\n'); inputFormat.configure(new Configuration()); diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala index d176b796957e4..d72e7a8eba287 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala @@ -656,7 +656,7 @@ class RowCsvInputFormatTest { val format = new RowCsvInputFormat(PATH, typeInfo) format.setSkipFirstLineAsHeader(true) - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(split) @@ -745,7 +745,7 @@ class RowCsvInputFormatTest { rowTypeInfo = typeInfo, includedFieldsMask = Array(true, false, true)) inputFormat.enableQuotedStringParsing('"') - inputFormat.setFieldDelimiter('|') + inputFormat.setFieldDelimiter("|") inputFormat.setDelimiter('\n') inputFormat.configure(new Configuration) @@ -776,7 +776,7 @@ class RowCsvInputFormatTest { new Path(tempFile.toURI.toString), rowTypeInfo = typeInfo) inputFormat.enableQuotedStringParsing('"') - inputFormat.setFieldDelimiter('|') + inputFormat.setFieldDelimiter("|") inputFormat.setDelimiter('\n') inputFormat.configure(new Configuration) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala index 24d86e7b2a13a..539a2578dba30 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala @@ -56,7 +56,7 @@ class CsvInputFormatTest { createTypeInformation[(String, Integer, Double)] .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]]) format.setDelimiter("\n") - format.setFieldDelimiter('|') + format.setFieldDelimiter("|") format.setCommentPrefix("#") val parameters = new Configuration format.configure(parameters) @@ -98,7 +98,7 @@ class CsvInputFormatTest { createTypeInformation[(String, Integer, Double)] .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]]) format.setDelimiter("\n") - format.setFieldDelimiter('|') + format.setFieldDelimiter("|") format.setCommentPrefix("//") val parameters = new Configuration format.configure(parameters) @@ -443,7 +443,7 @@ class CsvInputFormatTest { val format = new PojoCsvInputFormat[POJOItem](PATH, typeInfo) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile) @@ -460,7 +460,7 @@ class CsvInputFormatTest { val format = new TupleCsvInputFormat[CaseClassItem](PATH, typeInfo) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile) @@ -477,7 +477,7 @@ class CsvInputFormatTest { PATH, typeInfo, Array("field2", "field1", "field3")) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile) @@ -495,7 +495,7 @@ class CsvInputFormatTest { Array(true, true, false, true, false)) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile) @@ -511,7 +511,7 @@ class CsvInputFormatTest { val format = new PojoCsvInputFormat[TwitterPOJO](PATH, typeInfo) format.setDelimiter('\n') - format.setFieldDelimiter(',') + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(tempFile)