Skip to content

Commit

Permalink
[FLINK-993] Primitive input format fails for invalid input. Adapted documentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
fhueske committed Nov 19, 2014
1 parent d640b6c commit 3ac4df8
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/programming_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,7 @@ File-based:
- `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma (or another char) delimited fields.
Returns a DataSet of tuples. Supports the basic java types and their Value counterparts as field
types.
- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited values of a primitive data type such as `String` or `Integer`.

Collection-based:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,16 @@ public void setNumLineSamples(int numLineSamples) {
// --------------------------------------------------------------------------------------------

/**
* This function parses the given byte array which represents a serialized records.
* The parsed content is then returned by setting the pair variables. If the
* byte array contains invalid content the record can be skipped by returning <tt>false</tt>.
* This function parses the given byte array which represents a serialized record.
* The function returns a valid record or throws an IOException.
*
* @param reuse An optionally reusable object.
* @param bytes Binary data of serialized records.
* @param offset The offset where to start to read the record data.
* @param numBytes The number of bytes that can be read starting at the offset position.
*
* @return returns whether the record was successfully deserialized or not.
* @return Returns the read record if it was successfully deserialized.
* @throws IOException if the record could not be read.
*/
public abstract OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public DataSource<StringValue> readTextFileWithValue(String filePath, String cha
public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass) {
Validate.notNull(filePath, "The file path may not be null.");

return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass));
return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
}

/**
Expand All @@ -299,7 +299,7 @@ public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClas
public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass) {
Validate.notNull(filePath, "The file path may not be null.");

return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass));
return new DataSource<X>(this, new PrimitiveInputFormat<X>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
}

// ----------------------------------- CSV Input Format ---------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,19 @@ public void open(FileInputSplit split) throws IOException {
}

@Override
public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) {
//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
// Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE
&& offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
numBytes -= 1;
}

//Null character as delimiter is used because there's only 1 field to be parsed
parser.parseField(bytes, offset, numBytes + offset, '\0', reuse);
return parser.getLastResult();
// Null character as delimiter is used because there's only 1 field to be parsed
if (parser.parseField(bytes, offset, numBytes + offset, '\0', reuse) >= 0) {
return parser.getLastResult();
} else {
String s = new String(bytes, offset, numBytes);
throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void testIntegerInput() throws IOException {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}


@Test
public void testDoubleInputLinewise() throws IOException {
Expand Down Expand Up @@ -152,6 +153,27 @@ public void testRemovingTrailingCR() {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}

/**
 * Verifies that a value which cannot be parsed as the requested primitive type
 * makes {@code nextRecord} fail with an {@link IOException}.
 *
 * The input holds two valid integers followed by the unparsable token "asdf".
 * The exception is expected around the third read only, so that an IOException
 * raised while creating the split, opening the format, or reading one of the
 * valid records fails the test instead of accidentally satisfying it.
 *
 * @throws IOException if the setup or one of the valid reads fails unexpectedly.
 */
@Test
public void testFailingInput() throws IOException {

	final String fileContent = "111|222|asdf|17";
	final FileInputSplit split = createInputSplit(fileContent);

	final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,"|", Integer.class);

	format.configure(new Configuration());
	format.open(split);

	// The first two fields are valid integers and must be returned unchanged.
	Integer result = null;
	result = format.nextRecord(result);
	assertEquals(Integer.valueOf(111), result);

	result = format.nextRecord(result);
	assertEquals(Integer.valueOf(222), result);

	// "asdf" is not an integer: this read, and only this read, has to fail.
	try {
		format.nextRecord(result);
	} catch (IOException expected) {
		return;
	}
	throw new AssertionError("Expected an IOException when parsing \"asdf\" as Integer.");
}

private FileInputSplit createInputSplit(String content) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
val datasource = new DataSource[T](
javaEnv,
new PrimitiveInputFormat(new Path(filePath), delimiter, typeInfo.getTypeClass),
typeInfo)
typeInfo,
getCallLocationName())
wrap(datasource)
}

Expand Down

0 comments on commit 3ac4df8

Please sign in to comment.