Skip to content

Commit

Permalink
[FLINK-4248] [core] [table] CsvTableSource does not support reading S…
Browse files Browse the repository at this point in the history
…qlTimeTypeInfo types

This closes apache#2303.
  • Loading branch information
twalthr committed Sep 22, 2016
1 parent b516056 commit 3507d59
Show file tree
Hide file tree
Showing 18 changed files with 675 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,42 +35,27 @@ public class BigDecParser extends FieldParser<BigDecimal> {

@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigDecimal reusable) {
int i = startPos;

final int delimLimit = limit - delimiter.length + 1;

while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
break;
}
i++;
}

if (i > startPos &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
if (endPos < 0) {
return -1;
}

try {
final int length = i - startPos;
final int length = endPos - startPos;
if (reuse == null || reuse.length < length) {
reuse = new char[length];
}
for (int j = 0; j < length; j++) {
final byte b = bytes[startPos + j];
if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') {
throw new NumberFormatException();
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
return -1;
}
reuse[j] = (char) bytes[startPos + j];
}

this.result = new BigDecimal(reuse, 0, length);
return (i == limit) ? limit : i + delimiter.length;
return (endPos == limit) ? limit : endPos + delimiter.length;
} catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
return -1;
Expand All @@ -96,7 +81,7 @@ public BigDecimal getLastResult() {
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
* @throws NumberFormatException Thrown when the value cannot be parsed because the text
* @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final BigDecimal parseField(byte[] bytes, int startPos, int length) {
Expand All @@ -113,7 +98,7 @@ public static final BigDecimal parseField(byte[] bytes, int startPos, int length
* @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
* @return The parsed value.
* @throws NumberFormatException Thrown when the value cannot be parsed because the text
* @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final BigDecimal parseField(byte[] bytes, int startPos, int length, char delimiter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,21 @@ public class BigIntParser extends FieldParser<BigInteger> {

@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigInteger reusable) {
int i = startPos;

final int delimLimit = limit - delimiter.length + 1;

while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
break;
}
i++;
final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
if (endPos < 0) {
return -1;
}

if (i > startPos &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
if (endPos > startPos &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
return -1;
}

String str = new String(bytes, startPos, i - startPos);
String str = new String(bytes, startPos, endPos - startPos);
try {
this.result = new BigInteger(str);
return (i == limit) ? limit : i + delimiter.length;
return (endPos == limit) ? limit : endPos + delimiter.length;
} catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
return -1;
Expand All @@ -84,7 +74,7 @@ public BigInteger getLastResult() {
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
* @throws NumberFormatException Thrown when the value cannot be parsed because the text
* @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final BigInteger parseField(byte[] bytes, int startPos, int length) {
Expand All @@ -101,26 +91,18 @@ public static final BigInteger parseField(byte[] bytes, int startPos, int length
* @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
* @return The parsed value.
* @throws NumberFormatException Thrown when the value cannot be parsed because the text
* @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final BigInteger parseField(byte[] bytes, int startPos, int length, char delimiter) {
if (length <= 0) {
throw new NumberFormatException("Invalid input: Empty string");
}
int i = 0;
final byte delByte = (byte) delimiter;

while (i < length && bytes[startPos + i] != delByte) {
i++;
}
final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);

if (i > 0 &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
if (limitedLen > 0 &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}

String str = new String(bytes, startPos, i);
final String str = new String(bytes, startPos, limitedLen);
return new BigInteger(str);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,21 @@ public class DoubleParser extends FieldParser<Double> {

@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) {
int i = startPos;

final int delimLimit = limit - delimiter.length + 1;

while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
break;
}
i++;
final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
if (endPos < 0) {
return -1;
}

if (i > startPos &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
if (endPos > startPos &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
return -1;
}

String str = new String(bytes, startPos, i - startPos);
String str = new String(bytes, startPos, endPos - startPos);
try {
this.result = Double.parseDouble(str);
return (i == limit) ? limit : i + delimiter.length;
return (endPos == limit) ? limit : endPos + delimiter.length;
} catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
return -1;
Expand All @@ -83,7 +73,7 @@ public Double getLastResult() {
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
* @throws NumberFormatException Thrown when the value cannot be parsed because the text
* @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final double parseField(byte[] bytes, int startPos, int length) {
Expand All @@ -100,26 +90,18 @@ public static final double parseField(byte[] bytes, int startPos, int length) {
* @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
* @return The parsed value.
* @throws NumberFormatException Thrown when the value cannot be parsed because the text
* @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final double parseField(byte[] bytes, int startPos, int length, char delimiter) {
if (length <= 0) {
throw new NumberFormatException("Invalid input: Empty string");
}
int i = 0;
final byte delByte = (byte) delimiter;

while (i < length && bytes[startPos + i] != delByte) {
i++;
}
final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);

if (i > 0 &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
if (limitedLen > 0 &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}

String str = new String(bytes, startPos, i);
final String str = new String(bytes, startPos, limitedLen);
return Double.parseDouble(str);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,23 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {

@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, DoubleValue reusable) {

int i = startPos;

final int delimLimit = limit - delimiter.length + 1;

while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
break;
}
i++;
final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
if (endPos < 0) {
return -1;
}
if (i > startPos &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {

if (endPos > startPos &&
(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
return -1;
}

String str = new String(bytes, startPos, i - startPos);
String str = new String(bytes, startPos, endPos - startPos);
try {
double value = Double.parseDouble(str);
reusable.setValue(value);
this.result = reusable;
return (i == limit) ? limit : i + delimiter.length;
return (endPos == limit) ? limit : endPos + delimiter.length;
}
catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,49 @@ protected void setErrorState(ParseErrorState error) {
public ParseErrorState getErrorState() {
return this.errorState;
}

/**
* Returns the end position of a string. Sets the error state if the column is empty.
*
* @return the end position of the string or -1 if an error occurred
*/
protected final int nextStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
int endPos = startPos;

final int delimLimit = limit - delimiter.length + 1;

while (endPos < limit) {
if (endPos < delimLimit && delimiterNext(bytes, endPos, delimiter)) {
if (endPos == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
break;
}
endPos++;
}

return endPos;
}

/**
* Returns the length of a string. Throws an exception if the column is empty.
*
* @return the length of the string
*/
protected static final int nextStringLength(byte[] bytes, int startPos, int length, char delimiter) {
if (length <= 0) {
throw new IllegalArgumentException("Invalid input: Empty string");
}
int limitedLength = 0;
final byte delByte = (byte) delimiter;

while (limitedLength < length && bytes[startPos + limitedLength] != delByte) {
limitedLength++;
}

return limitedLength;
}

// --------------------------------------------------------------------------------------------
// Mapping from types to parsers
Expand Down Expand Up @@ -222,5 +265,10 @@ public static <T> Class<FieldParser<T>> getParserForType(Class<T> type) {
PARSERS.put(FloatValue.class, FloatValueParser.class);
PARSERS.put(DoubleValue.class, DoubleValueParser.class);
PARSERS.put(BooleanValue.class, BooleanValueParser.class);

// SQL date/time types
PARSERS.put(java.sql.Time.class, SqlTimeParser.class);
PARSERS.put(java.sql.Date.class, SqlDateParser.class);
PARSERS.put(java.sql.Timestamp.class, SqlTimestampParser.class);
}
}
Loading

0 comments on commit 3507d59

Please sign in to comment.