Skip to content

Commit

Permalink
NIFI-5264 - Added attribute for validation error message in ValidateCSV
Browse files: browse the repository at this point in the history
This closes apache#2769

Signed-off-by: zenfenan <[email protected]>
  • Loading branch information
pvillard31 authored and zenfenan committed Jun 8, 2018
1 parent 49228aa commit 6e06773
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@
@WritesAttributes({
@WritesAttribute(attribute="count.valid.lines", description="If line by line validation, number of valid lines extracted from the source data"),
@WritesAttribute(attribute="count.invalid.lines", description="If line by line validation, number of invalid lines extracted from the source data"),
@WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data")
@WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data"),
@WritesAttribute(attribute="validation.error.message", description="For flow files routed to invalid, message of the first validation error")
})
public class ValidateCsv extends AbstractProcessor {

Expand Down Expand Up @@ -455,6 +456,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
final AtomicReference<String> validationError = new AtomicReference<String>(null);

if(!isWholeFFValidation) {
invalidFF.set(session.create(flowFile));
Expand Down Expand Up @@ -514,6 +516,7 @@ public void process(OutputStream out) throws IOException {
} catch (final SuperCsvException e) {
valid.set(false);
if(isWholeFFValidation) {
validationError.set(e.getLocalizedMessage());
logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
break;
} else {
Expand All @@ -528,6 +531,10 @@ public void process(OutputStream out) throws IOException {
if(isFirstLineInvalid.get()) {
isFirstLineInvalid.set(false);
}

if(validationError.get() == null) {
validationError.set(e.getLocalizedMessage());
}
}
} finally {
if(!isWholeFFValidation) {
Expand All @@ -554,6 +561,7 @@ public void process(OutputStream out) throws IOException {
session.transfer(flowFile, REL_VALID);
} else {
session.getProvenanceReporter().route(flowFile, REL_INVALID);
session.putAttribute(flowFile, "validation.error.message", validationError.get());
session.transfer(flowFile, REL_INVALID);
}
} else {
Expand All @@ -578,13 +586,15 @@ public void process(OutputStream out) throws IOException {
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get())));
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
session.transfer(invalidFF.get(), REL_INVALID);
session.remove(flowFile);
} else {
logger.debug("All lines in {} are invalid; routing to 'invalid'", new Object[]{invalidFF.get()});
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
session.transfer(invalidFF.get(), REL_INVALID);
session.remove(validFF.get());
session.remove(flowFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public void testValidDateOptionalDouble() {
runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message",
"'22/111954' could not be parsed as a Date");
}

@Test
Expand Down Expand Up @@ -197,6 +199,8 @@ public void testStrlenStrMinMaxStrRegex() {
runner.enqueue("test,test,testapache.org");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message",
"'testapache.org' does not match the regular expression '[a-z0-9\\._]+@[a-z0-9\\.]+'");
}

@Test
Expand Down

0 comments on commit 6e06773

Please sign in to comment.