NIFI-7953: Updated ConsumeKafka_2_0/ConsumeKafkaRecord_2_0/ConsumeKafka_2_6/ConsumeKafkaRecord_2_6 to allow separating records by key
markap14 committed Oct 27, 2020
1 parent 7496899 commit c610aab
Showing 13 changed files with 194 additions and 59 deletions.
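
The heart of the change is that consumed Kafka messages are only bundled into the same FlowFile when their keys match: both ConsumeKafka_2_0 and ConsumeKafkaRecord_2_0 (and their 2.6 counterparts) pass a new separateByKey flag into ConsumerLease, which then includes the raw message key in the BundleInformation used to group records. The standalone sketch below is not part of the commit; it only illustrates the grouping idea, including why a byte[] key has to be wrapped so equality is by content rather than by reference (class and variable names are made up).

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SeparateByKeySketch {

    // Minimal stand-in for a consumed Kafka message: just a key and a value.
    static final class Message {
        final byte[] key;
        final String value;

        Message(final String key, final String value) {
            this.key = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
            this.value = value;
        }
    }

    // Grouping key wrapper: byte[] uses identity equality, so equals/hashCode
    // must delegate to Arrays.equals/Arrays.hashCode for content comparison.
    static final class KeyWrapper {
        final byte[] key;

        KeyWrapper(final byte[] key) {
            this.key = key;
        }

        @Override
        public boolean equals(final Object obj) {
            return obj instanceof KeyWrapper && Arrays.equals(key, ((KeyWrapper) obj).key);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(key);
        }
    }

    public static void main(final String[] args) {
        final List<Message> poll = Arrays.asList(
                new Message("user-1", "a"),
                new Message("user-2", "b"),
                new Message("user-1", "c"));

        // With Separate By Key enabled, each distinct key becomes its own bundle
        // (two bundles here); with it disabled, all three records share one FlowFile.
        final Map<KeyWrapper, List<Message>> bundles = poll.stream()
                .collect(Collectors.groupingBy(m -> new KeyWrapper(m.key)));

        bundles.forEach((k, msgs) -> System.out.println(
                new String(k.key, StandardCharsets.UTF_8) + " -> " + msgs.size() + " record(s)"));
    }
}
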
@@ -226,7 +226,8 @@ public void assertAttributeNotExists(final String attributeName) {
}

public void assertAttributeEquals(final String attributeName, final String expectedValue) {
Assert.assertEquals(expectedValue, attributes.get(attributeName));
Assert.assertEquals("Expected attribute " + attributeName + " to be " + expectedValue + " but instead it was " + attributes.get(attributeName),
expectedValue, attributes.get(attributeName));
}

public void assertAttributeNotEquals(final String attributeName, final String expectedValue) {
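
The hunk above appears to be in the mock FlowFile helper used by NiFi processor unit tests: assertAttributeEquals now names the attribute and reports its actual value when the assertion fails, which makes attribute-related test failures easier to diagnose. A tiny self-contained illustration of the message format follows; the values are hypothetical and the class is not part of the commit.

import org.junit.Assert;

public class AssertionMessageSketch {

    public static void main(final String[] args) {
        final String attributeName = "kafka.key";
        final String expectedValue = "user-1";
        final String actualValue = "user-1";   // change to "user-2" to see the failure message

        // Mirrors the updated assertion: on mismatch it fails with
        // "Expected attribute kafka.key to be user-1 but instead it was user-2 expected:<user-1> but was:<user-2>"
        Assert.assertEquals("Expected attribute " + attributeName + " to be " + expectedValue
                + " but instead it was " + actualValue, expectedValue, actualValue);
    }
}
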
@@ -34,7 +34,6 @@
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -57,6 +56,12 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.0 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_0. Please note that, at this time, the Processor assumes that "
+ "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
@@ -77,7 +82,7 @@
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
expressionLanguageScope = VARIABLE_REGISTRY)
@SeeAlso({ConsumeKafka_2_0.class, PublishKafka_2_0.class, PublishKafkaRecord_2_0.class})
public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {

@@ -93,7 +98,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();

static final PropertyDescriptor TOPIC_TYPE = new Builder()
@@ -110,7 +115,7 @@
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(NONE)
.required(true)
.build();

@@ -119,7 +124,7 @@
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(NONE)
.required(true)
.build();

@@ -129,7 +134,7 @@
.description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();

static final PropertyDescriptor AUTO_OFFSET_RESET = new Builder()
@@ -179,7 +184,7 @@
+ "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If "
+ "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait "
+ "for the producer to finish its entire transaction instead of pulling as the messages become available.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
@@ -203,8 +208,25 @@
+ "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
+ "the messages together efficiently.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.expressionLanguageSupported(NONE)
.required(false)
.build();
static final PropertyDescriptor SEPARATE_BY_KEY = new Builder()
.name("separate-by-key")
.displayName("Separate By Key")
.description("If true, two Records will only be added to the same FlowFile if both of the Kafka Messages have identical keys.")
.required(false)
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
.name("key-attribute-encoding")
.displayName("Key Attribute Encoding")
.description("If the <Separate By Key> property is set to true, FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY +
"'. This property dictates how the value of the attribute should be encoded.")
.required(true)
.defaultValue(UTF8_ENCODING.getValue())
.allowableValues(UTF8_ENCODING, HEX_ENCODING, DO_NOT_ADD_KEY_AS_ATTRIBUTE)
.build();

static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -242,6 +264,8 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
descriptors.add(GROUP_ID);
descriptors.add(SEPARATE_BY_KEY);
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(MESSAGE_HEADER_ENCODING);
descriptors.add(HEADER_NAME_REGEX);
@@ -283,7 +307,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
.name(propertyDescriptorName)
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
}

@@ -328,6 +352,9 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co
final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);

final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();

if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -337,11 +364,11 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co
}

return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
bootstrapServers, log, honorTransactions, charset, headerNamePattern);
bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
bootstrapServers, log, honorTransactions, charset, headerNamePattern);
bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;
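
That is the whole ConsumeKafkaRecord_2_0 side of the change: two new properties (Separate By Key and Key Attribute Encoding) plus two extra arguments threaded into the ConsumerPool constructor. A rough sketch of how the new properties could be set from a nifi-mock test in the same package is shown below; the class name is made up, the required reader/writer and broker properties are omitted, and the property-name strings are taken from the diff above.

package org.apache.nifi.processors.kafka.pubsub;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class SeparateByKeyConfigSketch {

    public static void main(final String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(ConsumeKafkaRecord_2_0.class);

        // Record Reader/Writer controller services, topic, group id and the Kafka
        // connection settings would also have to be configured in a real test.

        // The two properties introduced by this commit:
        runner.setProperty("separate-by-key", "true");
        runner.setProperty("key-attribute-encoding", KafkaProcessorUtils.HEX_ENCODING.getValue());
    }
}
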
@@ -146,6 +146,16 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
+ "will result in a single FlowFile which "
+ "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
.build();

static final PropertyDescriptor SEPARATE_BY_KEY = new PropertyDescriptor.Builder()
.name("separate-by-key")
.displayName("Separate By Key")
.description("If true, and the <Message Demarcator> property is set, two messages will only be added to the same FlowFile if both of the Kafka Messages have identical keys.")
.required(false)
.allowableValues("true", "false")
.defaultValue("false")
.build();

static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder()
.name("header-name-regex")
.displayName("Headers to Add as Attributes (Regex)")
@@ -234,6 +244,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(MESSAGE_DEMARCATOR);
descriptors.add(SEPARATE_BY_KEY);
descriptors.add(MESSAGE_HEADER_ENCODING);
descriptors.add(HEADER_NAME_REGEX);
descriptors.add(MAX_POLL_RECORDS);
@@ -315,6 +326,8 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co
final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);

final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();

if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -323,11 +336,11 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co
}
}

return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
bootstrapServers, log, honorTransactions, charset, headerNamePattern);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
bootstrapServers, log, honorTransactions, charset, headerNamePattern);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
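
ConsumeKafka_2_0 gains the same Separate By Key switch (here it only takes effect together with the Message Demarcator), and it reuses its existing Key Attribute Encoding property to decide how the key ends up in the kafka.key attribute once records sharing a key are bundled together. The helper below is purely illustrative of what the UTF-8 and Hex choices mean for that attribute value; it is not the processor's own encoding code, and the Do Not Add Key as Attribute option referenced from KafkaProcessorUtils simply skips the attribute (see the populateAttributes change in the next file).

import java.nio.charset.StandardCharsets;

public class KeyEncodingSketch {

    // Illustrative hex encoding of a Kafka message key.
    static String toHex(final byte[] key) {
        final StringBuilder sb = new StringBuilder(key.length * 2);
        for (final byte b : key) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(final String[] args) {
        final byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);

        System.out.println("UTF-8 Encoded: " + new String(key, StandardCharsets.UTF_8)); // user-1
        System.out.println("Hex Encoded:   " + toHex(key));                              // 757365722d31
    }
}
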
@@ -49,6 +49,7 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -83,6 +84,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private final RecordReaderFactory readerFactory;
private final Charset headerCharacterSet;
private final Pattern headerNamePattern;
private final boolean separateByKey;
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can append
//to them on subsequent poll calls
@@ -103,7 +105,8 @@
final RecordSetWriterFactory writerFactory,
final ComponentLog logger,
final Charset headerCharacterSet,
final Pattern headerNamePattern) {
final Pattern headerNamePattern,
final boolean separateByKey) {
this.maxWaitMillis = maxWaitMillis;
this.kafkaConsumer = kafkaConsumer;
this.demarcatorBytes = demarcatorBytes;
Expand All @@ -115,6 +118,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
this.logger = logger;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
}

/**
@@ -164,7 +168,7 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
* flowfiles necessary or appends to existing ones if in demarcation mode.
*/
void poll() {
/**
/*
* Implementation note:
* Even if ConsumeKafka is not scheduled to poll due to downstream connection back-pressure is engaged,
* for longer than session.timeout.ms (defaults to 10 sec), Kafka consumer sends heartbeat from background thread.
@@ -202,7 +206,7 @@ boolean commit() {
return false;
}
try {
/**
/*
* Committing the nifi session then the offsets means we have an at
* least once guarantee here. If we reversed the order we'd have at
* most once.
@@ -412,7 +416,7 @@ private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte
private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
// Group the Records by their BundleInformation
final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map = records.stream()
.collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec))));
.collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec), separateByKey ? rec.key() : null)));

for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) {
final BundleInformation bundleInfo = entry.getKey();
@@ -538,7 +542,7 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
while ((record = reader.nextRecord()) != null) {
// Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);

BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
@@ -626,9 +630,16 @@ private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);

// If we have a kafka key, we will add it as an attribute only if
// the FlowFile contains a single Record, or if the Records have been separated by Key,
// because we then know that even though there are multiple Records, they all have the same key.
if (tracker.key != null && (tracker.totalRecords == 1 || separateByKey)) {
if (!keyEncoding.equalsIgnoreCase(KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE.getValue())) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
}

kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
if (tracker.totalRecords > 1) {
@@ -647,8 +658,8 @@ private void populateAttributes(final BundleTracker tracker) {
tracker.updateFlowFile(newFlowFile);
}

private static class BundleTracker {

private static class BundleTracker {
final long initialOffset;
final long initialTimestamp;
final int partition;
@@ -678,23 +689,24 @@ private void incrementRecordCount(final long count) {
private void updateFlowFile(final FlowFile flowFile) {
this.flowFile = flowFile;
}

}

private static class BundleInformation {
private final TopicPartition topicPartition;
private final RecordSchema schema;
private final Map<String, String> attributes;
private final byte[] messageKey;

public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes) {
public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes, final byte[] messageKey) {
this.topicPartition = topicPartition;
this.schema = schema;
this.attributes = attributes;
this.messageKey = messageKey;
}

@Override
public int hashCode() {
return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 * attributes.hashCode());
return 41 + Objects.hash(topicPartition, schema, attributes) + 37 * Arrays.hashCode(messageKey);
}

@Override
Expand All @@ -710,7 +722,8 @@ public boolean equals(final Object obj) {
}

final BundleInformation other = (BundleInformation) obj;
return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes);
return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes)
&& Arrays.equals(this.messageKey, other.messageKey);
}
}
}
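
The equals/hashCode changes at the end of ConsumerLease are what make grouping by a raw Kafka key work at all: the key arrives as a byte[], and BundleInformation now compares it with Arrays.equals and folds it into the hash with Arrays.hashCode. The standalone snippet below (not from the commit) shows why the default byte[] identity semantics would otherwise prevent records with the same key from ever landing in the same bundle.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ByteArrayKeySketch {

    public static void main(final String[] args) {
        final byte[] first = "user-1".getBytes(StandardCharsets.UTF_8);
        final byte[] second = "user-1".getBytes(StandardCharsets.UTF_8);

        // byte[] inherits identity-based equals/hashCode from Object, so two arrays
        // with identical contents are treated as two different map keys...
        final Map<byte[], Integer> naive = new HashMap<>();
        naive.merge(first, 1, Integer::sum);
        naive.merge(second, 1, Integer::sum);
        System.out.println(naive.size());                                        // 2: never bundled together

        // ...which is why the message key is compared with Arrays.equals and
        // hashed with Arrays.hashCode inside BundleInformation.
        System.out.println(Arrays.equals(first, second));                        // true
        System.out.println(Arrays.hashCode(first) == Arrays.hashCode(second));   // true
    }
}
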