NIFI-5514: Fixed bugs in MergeRecord around minimum thresholds not being honored and validation not being performed to ensure that minimum threshold is smaller than max threshold (would previously allow min records = 100, max records = 2 as a valid configuration)

NIFI-5514: Do not rely on ProcessSession.getQueueSize() to return a queue size of 0 objects, because if the processor is holding onto data in bins, the reported queue size will never reach 0.

Signed-off-by: Pierre Villard <[email protected]>

This closes apache#2954.
markap14 authored and pvillard31 committed Sep 24, 2018
1 parent f5e9ea6 commit 2a96468
Showing 4 changed files with 121 additions and 53 deletions.
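
Editor's note: before the diff, a minimal sketch of the new validation behavior described in the commit message, written against the nifi-mock TestRunner harness and the same mock reader/writer services this project's own tests use. The class and test names here are illustrative, not part of the commit.

```java
import org.apache.nifi.processors.standard.MergeRecord;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class MergeRecordValidationSketch {

    @Test
    public void minAboveMaxIsRejected() throws InitializationException {
        final TestRunner runner = TestRunners.newTestRunner(new MergeRecord());

        // MergeRecord requires a record reader and writer, so register the
        // same mock services the project's tests use.
        final CommaSeparatedRecordReader reader = new CommaSeparatedRecordReader();
        final MockRecordWriter writer = new MockRecordWriter("header", false);
        runner.addControllerService("reader", reader);
        runner.enableControllerService(reader);
        runner.addControllerService("writer", writer);
        runner.enableControllerService(writer);
        runner.setProperty(MergeRecord.RECORD_READER, "reader");
        runner.setProperty(MergeRecord.RECORD_WRITER, "writer");

        // The configuration called out in the commit message: min above max.
        // Before this commit it was accepted; customValidate() now rejects it.
        runner.setProperty(MergeRecord.MIN_RECORDS, "100");
        runner.setProperty(MergeRecord.MAX_RECORDS, "2");
        runner.assertNotValid();

        // Swapping the two values yields a sane, valid configuration.
        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
        runner.setProperty(MergeRecord.MAX_RECORDS, "100");
        runner.assertValid();
    }
}
```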
MergeRecord.java
@@ -17,17 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -43,6 +32,8 @@
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
@@ -64,6 +55,18 @@
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
 
 @SideEffectFree
 @TriggerWhenEmpty
@@ -261,6 +264,34 @@ public final void resetState() {
         binManager.set(null);
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final Integer minRecords = validationContext.getProperty(MIN_RECORDS).asInteger();
+        final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).asInteger();
+        if (minRecords != null && maxRecords != null && maxRecords < minRecords) {
+            results.add(new ValidationResult.Builder()
+                .subject("Max Records")
+                .input(String.valueOf(maxRecords))
+                .valid(false)
+                .explanation("<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property")
+                .build());
+        }
+
+        final Double minSize = validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B);
+        final Double maxSize = validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+        if (minSize != null && maxSize != null && maxSize < minSize) {
+            results.add(new ValidationResult.Builder()
+                .subject("Max Size")
+                .input(validationContext.getProperty(MAX_SIZE).getValue())
+                .valid(false)
+                .explanation("<Maximum Bin Size> property cannot be smaller than <Minimum Bin Size> property")
+                .build());
+        }
+
+        return results;
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
@@ -304,13 +335,24 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             session.commit();
         }
 
+        // If there is no more data queued up, complete any bin that meets our minimum threshold
+        int completedBins = 0;
+        if (flowFiles.isEmpty()) {
+            try {
+                completedBins += manager.completeFullEnoughBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+            }
+        }
+
         // Complete any bins that have reached their expiration date
         try {
-            manager.completeExpiredBins();
+            completedBins += manager.completeExpiredBins();
         } catch (final Exception e) {
             getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
         }
 
-        if (flowFiles.isEmpty()) {
+        if (completedBins == 0 && flowFiles.isEmpty()) {
             getLogger().debug("No FlowFiles to bin; will yield");
             context.yield();
         }
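
Editor's note: the second half of the commit message plays out in the onTrigger hunk above. The fixed flow treats an empty incoming queue as the signal to flush bins that already satisfy their minimum thresholds, and it consults the count of completed bins, not ProcessSession.getQueueSize(), when deciding whether to yield. A distilled view, condensed from the diff above (not a drop-in replacement; error handling elided):

```java
// Condensed control flow of the fixed onTrigger.
int completedBins = 0;

if (flowFiles.isEmpty()) {
    // The queue looked empty on this trigger: complete any bin that already
    // holds the configured minimum number of records/bytes.
    completedBins += manager.completeFullEnoughBins();
}

// Bins past their Max Bin Age are always completed.
completedBins += manager.completeExpiredBins();

// Yield only when nothing was binned and nothing was completed. Checking
// session.getQueueSize() == 0 here would misfire: FlowFiles held in open
// bins keep the reported queue size above zero even though there is no
// new work to pull.
if (completedBins == 0 && flowFiles.isEmpty()) {
    context.yield();
}
```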
RecordBin.java
@@ -17,6 +17,19 @@
 
 package org.apache.nifi.processors.standard.merge;
 
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -32,21 +45,6 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.standard.MergeRecord;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-
 public class RecordBin {
     public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
     public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
@@ -96,8 +94,7 @@ public boolean isComplete() {
         return complete;
     }
 
-    public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block)
-        throws IOException, MalformedRecordException, SchemaNotFoundException {
+    public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) throws IOException {
 
         if (isComplete()) {
             logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
RecordBinManager.java
@@ -17,19 +17,6 @@
 
 package org.apache.nifi.processors.standard.merge;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -43,6 +30,20 @@
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
 public class RecordBinManager {
 
     private final ProcessContext context;
@@ -237,8 +238,16 @@ public void completeOldestBin() throws IOException {
     }
 
 
-    public void completeExpiredBins() throws IOException {
+    public int completeExpiredBins() throws IOException {
         final long maxNanos = maxBinAgeNanos.get();
+        return handleCompletedBins(bin -> bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS));
+    }
+
+    public int completeFullEnoughBins() throws IOException {
+        return handleCompletedBins(bin -> bin.isFullEnough());
+    }
+
+    private int handleCompletedBins(final Predicate<RecordBin> completionTest) throws IOException {
         final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>();
 
         lock.lock();
@@ -248,7 +257,7 @@ public void completeExpiredBins() throws IOException {
             final List<RecordBin> bins = entry.getValue();
 
             for (final RecordBin bin : bins) {
-                if (bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS)) {
+                if (completionTest.test(bin)) {
                     final List<RecordBin> expiredBinsForKey = expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
                     expiredBinsForKey.add(bin);
                 }
@@ -258,19 +267,24 @@
             lock.unlock();
         }
 
+        int completed = 0;
         for (final Map.Entry<String, List<RecordBin>> entry : expiredBinMap.entrySet()) {
             final String key = entry.getKey();
             final List<RecordBin> expiredBins = entry.getValue();
 
             for (final RecordBin bin : expiredBins) {
                 logger.debug("Completing Bin {} because it has expired", new Object[] {bin});
                 bin.complete("Bin has reached Max Bin Age");
+                completed++;
             }
 
             removeBins(key, expiredBins);
         }
+
+        return completed;
     }
 
 
     private void removeBins(final String key, final List<RecordBin> bins) {
         lock.lock();
         try {
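
Editor's note: the RecordBinManager change above is an extract-method refactor — both sweeps now delegate to one helper parameterized by a Predicate<RecordBin>, and the helper reports how many bins it completed so onTrigger can tell "made progress" apart from "nothing to do, yield". A self-contained sketch of the pattern, with a simplified single-list bin store and a hypothetical Bin type (the real class also groups bins per merge key and guards the list with a lock, as shown above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class Bin {
    final int records;
    final long ageNanos;

    Bin(int records, long ageNanos) {
        this.records = records;
        this.ageNanos = ageNanos;
    }

    boolean isFullEnough() { return records >= 100; } // threshold is illustrative
    boolean isOlderThan(long nanos) { return ageNanos > nanos; }
}

class BinSweeper {
    private final List<Bin> bins = new ArrayList<>();

    void add(Bin bin) { bins.add(bin); }

    // Single generic sweep: complete (here: remove) every bin that matches
    // the completion test, and report how many were completed.
    int completeMatching(Predicate<Bin> completionTest) {
        final List<Bin> matched = new ArrayList<>();
        for (Bin bin : bins) {
            if (completionTest.test(bin)) {
                matched.add(bin);
            }
        }
        bins.removeAll(matched);
        return matched.size();
    }

    int completeExpiredBins(long maxNanos) {
        return completeMatching(bin -> bin.isOlderThan(maxNanos));
    }

    int completeFullEnoughBins() {
        return completeMatching(Bin::isFullEnough);
    }
}
```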
TestMergeRecord.java
@@ -17,12 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
@@ -34,6 +28,12 @@
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
 public class TestMergeRecord {
     private TestRunner runner;
     private CommaSeparatedRecordReader readerService;
@@ -202,11 +202,24 @@ public void testMinSize() {
     }
 
     @Test
-    public void testMinRecords() {
+    public void testValidation() {
         runner.setProperty(MergeRecord.MIN_RECORDS, "103");
         runner.setProperty(MergeRecord.MAX_RECORDS, "2");
         runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
 
+        runner.assertNotValid();
+
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "103");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testMinRecords() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "103");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "110");
+        runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
+
         runner.enqueue("Name, Age\nJohn, 35");
         runner.enqueue("Name, Age\nJane, 34");
 
@@ -221,7 +234,7 @@ public void testMinRecords() {
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
 
         runner.enqueue("Name, Age\nJohn, 35");
-        runner.run();
+        runner.run(2);
         runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
     }
@@ -240,6 +253,8 @@ public void testMaxRecords() {
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 30);
 
         assertEquals(4, runner.getQueueSize().getObjectCount());
+
+        runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).stream().forEach(ff -> ff.assertAttributeEquals("record.count", "10"));
     }
 
     @Test
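
Editor's note: one subtlety in the updated testMinRecords above is the switch from runner.run() to runner.run(2). Under the new logic, a full-enough bin is only flushed on a trigger that finds the incoming queue empty, so a single iteration that bins the 103rd record emits nothing yet. A hedged reading of the two iterations (comments are interpretation, not part of the commit):

```java
runner.enqueue("Name, Age\nJohn, 35");

// Iteration 1: the FlowFile is pulled and added to a bin. The bin now meets
// the 103-record minimum, but flowFiles was non-empty on this trigger, so
// the "complete full-enough bins" pass is skipped.
//
// Iteration 2: nothing is queued, so onTrigger calls
// manager.completeFullEnoughBins() and the bin is merged and transferred.
runner.run(2);

runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
```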
