Skip to content

Commit

Permalink
NIFI-5592: If an Exception is thrown by RecordReader.read() from Cons…
Browse files Browse the repository at this point in the history
…umeKafkaRecord, route Record to parse.failure relationship

Signed-off-by: Pierre Villard <[email protected]>

This closes apache#3001.
  • Loading branch information
markap14 authored and pvillard31 committed Sep 15, 2018
1 parent 5a84d65 commit c8fc132
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,6 @@
*/
package org.apache.nifi.processors.kafka.pubsub;

import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.xml.bind.DatatypeConverter;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -54,14 +30,39 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

/**
* This class represents a lease to access a Kafka Consumer object. The lease is
* intended to be obtained from a ConsumerPool. The lease is closeable to allow
Expand Down Expand Up @@ -530,51 +531,56 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
continue;
}

Record record;
while ((record = reader.nextRecord()) != null) {
// Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);

BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);

final OutputStream rawOut = session.write(flowFile);
try {
Record record;
while ((record = reader.nextRecord()) != null) {
// Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);

BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);

final OutputStream rawOut = session.write(flowFile);

final RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);

rollback(topicPartition);
yield();

throw new ProcessException(e);
}

writer = writerFactory.createWriter(logger, writeSchema, rawOut);
writer.beginRecordSet();

tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
tracker.updateFlowFile(flowFile);
bundleMap.put(bundleInfo, tracker);
} else {
writer = tracker.recordWriter;
}

final RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);

rollback(topicPartition);
yield();

throw new ProcessException(e);
writer.write(record);
} catch (final RuntimeException re) {
handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship");
continue;
}

writer = writerFactory.createWriter(logger, writeSchema, rawOut);
writer.beginRecordSet();

tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
tracker.updateFlowFile(flowFile);
bundleMap.put(bundleInfo, tracker);
} else {
writer = tracker.recordWriter;
}

try {
writer.write(record);
} catch (final RuntimeException re) {
handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship");
continue;
tracker.incrementRecordCount(1L);
session.adjustCounter("Records Received", 1L, false);
}

tracker.incrementRecordCount(1L);
session.adjustCounter("Records Received", 1L, false);
} catch (final IOException | MalformedRecordException | SchemaValidationException e) {
handleParseFailure(consumerRecord, session, e);
continue;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,6 @@
*/
package org.apache.nifi.processors.kafka.pubsub;

import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import javax.xml.bind.DatatypeConverter;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -54,14 +30,39 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

/**
* This class represents a lease to access a Kafka Consumer object. The lease is
* intended to be obtained from a ConsumerPool. The lease is closeable to allow
Expand Down Expand Up @@ -530,51 +531,56 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
continue;
}

Record record;
while ((record = reader.nextRecord()) != null) {
// Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);

BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);

final OutputStream rawOut = session.write(flowFile);
try {
Record record;
while ((record = reader.nextRecord()) != null) {
// Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);

BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);

final OutputStream rawOut = session.write(flowFile);

final RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);

rollback(topicPartition);
yield();

throw new ProcessException(e);
}

writer = writerFactory.createWriter(logger, writeSchema, rawOut);
writer.beginRecordSet();

tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
tracker.updateFlowFile(flowFile);
bundleMap.put(bundleInfo, tracker);
} else {
writer = tracker.recordWriter;
}

final RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);

rollback(topicPartition);
yield();

throw new ProcessException(e);
writer.write(record);
} catch (final RuntimeException re) {
handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship");
continue;
}

writer = writerFactory.createWriter(logger, writeSchema, rawOut);
writer.beginRecordSet();

tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
tracker.updateFlowFile(flowFile);
bundleMap.put(bundleInfo, tracker);
} else {
writer = tracker.recordWriter;
}

try {
writer.write(record);
} catch (final RuntimeException re) {
handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship");
continue;
tracker.incrementRecordCount(1L);
session.adjustCounter("Records Received", 1L, false);
}

tracker.incrementRecordCount(1L);
session.adjustCounter("Records Received", 1L, false);
} catch (final IOException | MalformedRecordException | SchemaValidationException e) {
handleParseFailure(consumerRecord, session, e);
continue;
}
}
}
Expand Down

0 comments on commit c8fc132

Please sign in to comment.