From f253655c997a251087c8c281cec18c5ef2e84db5 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 18 May 2015 11:44:23 +0200 Subject: [PATCH] [FLINK-2025] add support for booleans in csv parser The following values are parsed as booleans: "true" or "1" -> true "false" or "0" -> false All checks are performed case-insensitively. This closes #685. --- docs/apis/programming_guide.md | 1 + .../flink/types/parser/BooleanParser.java | 96 +++++++++++++++++++ .../types/parser/BooleanValueParser.java | 47 +++++++++ .../flink/types/parser/FieldParser.java | 18 ++-- .../flink/types/parser/StringParser.java | 2 +- .../flink/types/parser/BooleanParserTest.java | 61 ++++++++++++ .../types/parser/BooleanValueParserTest.java | 64 +++++++++++++ 7 files changed, 282 insertions(+), 7 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java create mode 100644 flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java create mode 100644 flink-core/src/test/java/org/apache/flink/types/parser/BooleanParserTest.java create mode 100644 flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md index 14e51639f15f7..e9417dd9cefea 100644 --- a/docs/apis/programming_guide.md +++ b/docs/apis/programming_guide.md @@ -1656,6 +1656,7 @@ DataSet dbData = Flink offers a number of configuration options for CSV parsing: - `types(Class ... types)` specifies the types of the fields to parse. **It is mandatory to configure the types of the parsed fields.** + For the type class `Boolean.class`, the values "true" and "false" (both case-insensitive), "1", and "0" are parsed as booleans. - `lineDelimiter(String del)` specifies the delimiter of individual records. The default line delimiter is the new-line character `'\n'`. 
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java new file mode 100644 index 0000000000000..58bd1d2d8507d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.types.parser; + +public class BooleanParser extends FieldParser { + + private boolean result; + + /** Values for true and false respectively. Must be lower case. */ + private static final byte[][] TRUE = new byte[][] { + "true".getBytes(), + "1".getBytes() + }; + private static final byte[][] FALSE = new byte[][] { + "false".getBytes(), + "0".getBytes() + }; + + @Override + public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, Boolean reuse) { + + final int delimLimit = limit - delim.length + 1; + + int i = startPos; + + while (i < limit) { + if (i < delimLimit && delimiterNext(bytes, i, delim)) { + break; + } + i++; + } + + for (byte[] aTRUE : TRUE) { + if (byteArrayEquals(bytes, startPos, i - startPos, aTRUE)) { + result = true; + return (i == limit) ? 
limit : i + delim.length; + } + } + + for (byte[] aFALSE : FALSE) { + if (byteArrayEquals(bytes, startPos, i - startPos, aFALSE)) { + result = false; + return (i == limit) ? limit : i + delim.length; + } + } + + setErrorState(ParseErrorState.BOOLEAN_INVALID); + return -1; + } + + @Override + public Boolean getLastResult() { + return result; + } + + @Override + public Boolean createValue() { + return false; + } + + /** + * Checks if a part of a byte array matches another byte array with chars (case-insensitive). + * @param source The source byte array. + * @param start The offset into the source byte array. + * @param length The length of the match. + * @param other The byte array which is fully compared to the part of the source array. + * @return true if other can be found in the specified part of source, false otherwise. + */ + private static boolean byteArrayEquals(byte[] source, int start, int length, byte[] other) { + if (length != other.length) { + return false; + } + for (int i = 0; i < other.length; i++) { + if (Character.toLowerCase(source[i + start]) != other[i]) { + return false; + } + } + return true; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java new file mode 100644 index 0000000000000..d4ffafbe6f84d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.types.parser; + +import org.apache.flink.types.BooleanValue; + +public class BooleanValueParser extends FieldParser { + + private BooleanParser parser = new BooleanParser(); + + private BooleanValue result; + + @Override + public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, BooleanValue reuse) { + int returnValue = parser.parseField(bytes, startPos, limit, delim, reuse.getValue()); + setErrorState(parser.getErrorState()); + reuse.setValue(parser.getLastResult()); + result = reuse; + return returnValue; + } + + @Override + public BooleanValue getLastResult() { + return result; + } + + @Override + public BooleanValue createValue() { + return new BooleanValue(false); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 52faf32600ad5..faf1adb60cd7e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.flink.types.BooleanValue; import org.apache.flink.types.ByteValue; import org.apache.flink.types.DoubleValue; import org.apache.flink.types.FloatValue; @@ -47,16 +48,16 @@ public abstract class FieldParser { public static enum ParseErrorState { /** No error occurred. */ NONE, - + /** The domain of the numeric type is not large enough to hold the parsed value. 
*/ NUMERIC_VALUE_OVERFLOW_UNDERFLOW, - + /** A stand-alone sign was encountered while parsing a numeric type. */ NUMERIC_VALUE_ORPHAN_SIGN, - + /** An illegal character was encountered while parsing a numeric type. */ NUMERIC_VALUE_ILLEGAL_CHARACTER, - + /** The field was not in a correct format for the numeric type. */ NUMERIC_VALUE_FORMAT_ERROR, @@ -65,9 +66,12 @@ public static enum ParseErrorState { /** The parser found characters between the end of the quoted string and the delimiter. */ UNQUOTED_CHARS_AFTER_QUOTED_STRING, - + /** The string is empty. */ - EMPTY_STRING + EMPTY_STRING, + + /** Invalid Boolean value **/ + BOOLEAN_INVALID } private ParseErrorState errorState = ParseErrorState.NONE; @@ -181,6 +185,7 @@ public static Class> getParserForType(Class type) { PARSERS.put(String.class, StringParser.class); PARSERS.put(Float.class, FloatParser.class); PARSERS.put(Double.class, DoubleParser.class); + PARSERS.put(Boolean.class, BooleanParser.class); // value types PARSERS.put(ByteValue.class, ByteValueParser.class); @@ -190,5 +195,6 @@ public static Class> getParserForType(Class type) { PARSERS.put(StringValue.class, StringValueParser.class); PARSERS.put(FloatValue.class, FloatValueParser.class); PARSERS.put(DoubleValue.class, DoubleValueParser.class); + PARSERS.put(BooleanValue.class, BooleanValueParser.class); } } diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 394aba9551c6f..27e49f5be7de6 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -42,7 +42,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S final int delimLimit = limit-delimiter.length+1; - if(quotedStringParsing == true && bytes[i] == quoteCharacter) { + if(quotedStringParsing && bytes[i] == quoteCharacter) { // quoted string 
parsing enabled and first character Vis a quote i++; diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanParserTest.java new file mode 100644 index 0000000000000..1acdad1c977eb --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanParserTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.types.parser; + + +public class BooleanParserTest extends ParserTestBase { + + + @Override + public String[] getValidTestValues() { + return new String[] { + "true", "false", "0", "1", "TRUE", "FALSE", "True", "False" + }; + } + + @Override + public Boolean[] getValidTestResults() { + return new Boolean[] { + true, false, false, true, true, false, true, false + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[]{ + "yes", "no", "2", "-1", "wahr", "falsch", "", "asdf" + }; + } + + @Override + public boolean allowsEmptyField() { + return false; + } + + @Override + public FieldParser getParser() { + return new BooleanParser(); + } + + @Override + public Class getTypeClass() { + return Boolean.class; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java new file mode 100644 index 0000000000000..ff1885d690891 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.types.parser; + + +import org.apache.flink.types.BooleanValue; + +public class BooleanValueParserTest extends ParserTestBase { + + + @Override + public String[] getValidTestValues() { + return new String[] { + "true", "false", "0", "1", "TRUE", "FALSE", "True", "False" + }; + } + + @Override + public BooleanValue[] getValidTestResults() { + return new BooleanValue[] { + new BooleanValue(true), new BooleanValue(false), new BooleanValue(false), new BooleanValue(true), + new BooleanValue(true), new BooleanValue(false), new BooleanValue(true), new BooleanValue(false) + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[]{ + "yes", "no", "2", "-1", "wahr", "falsch", "", "asdf" + }; + } + + @Override + public boolean allowsEmptyField() { + return false; + } + + @Override + public FieldParser getParser() { + return new BooleanValueParser(); + } + + @Override + public Class getTypeClass() { + return BooleanValue.class; + } +}