diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java index 56227f2b0d22..ecebb8e0919c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java @@ -17,17 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; @@ -43,6 +32,8 @@ import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.FragmentAttributes; @@ -64,6 +55,18 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + @SideEffectFree @TriggerWhenEmpty @@ -261,6 +264,34 @@ public final void resetState() { binManager.set(null); } + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + + final Integer minRecords = validationContext.getProperty(MIN_RECORDS).asInteger(); + final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).asInteger(); + if (minRecords != null && maxRecords != null && maxRecords < minRecords) { + results.add(new ValidationResult.Builder() + .subject("Max Records") + .input(String.valueOf(maxRecords)) + .valid(false) + .explanation("<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property") + .build()); + } + + final Double minSize = validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B); + final Double maxSize = validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B); + if (minSize != null && maxSize != null && maxSize < minSize) { + results.add(new ValidationResult.Builder() + .subject("Max Size") + .input(validationContext.getProperty(MAX_SIZE).getValue()) + .valid(false) + .explanation("<Maximum Bin Size> property cannot be smaller than <Minimum Bin Size> property") + .build()); + } + + return results; + } @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { @@ -304,13 +335,24 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory session.commit(); } + // If there is no more data queued up, complete any bin that meets our minimum threshold + int completedBins = 0; + if (flowFiles.isEmpty()) { + try { + completedBins += manager.completeFullEnoughBins(); + } catch (final Exception e) { + getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e); + } + } + + // Complete any bins that have reached their expiration date try { - manager.completeExpiredBins(); + completedBins += 
manager.completeExpiredBins(); } catch (final Exception e) { getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e); } - if (flowFiles.isEmpty()) { + if (completedBins == 0 && flowFiles.isEmpty()) { getLogger().debug("No FlowFiles to bin; will yield"); context.yield(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java index 6dc42474124a..23f5edffe451 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java @@ -17,6 +17,19 @@ package org.apache.nifi.processors.standard.merge; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processors.standard.MergeRecord; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.stream.io.ByteCountingOutputStream; + import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -32,21 +45,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ComponentLog; -import 
org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processors.standard.MergeRecord; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.RecordSetWriter; -import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.WriteResult; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.stream.io.ByteCountingOutputStream; - public class RecordBin { public static final String MERGE_COUNT_ATTRIBUTE = "merge.count"; public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age"; @@ -96,8 +94,7 @@ public boolean isComplete() { return complete; } - public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) - throws IOException, MalformedRecordException, SchemaNotFoundException { + public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) throws IOException { if (isComplete()) { logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java index 8496a4d5d303..312832e0aedd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java @@ -17,19 
+17,6 @@ package org.apache.nifi.processors.standard.merge; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.nifi.components.PropertyValue; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -43,6 +30,20 @@ import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; + public class RecordBinManager { private final ProcessContext context; @@ -237,8 +238,16 @@ public void completeOldestBin() throws IOException { } - public void completeExpiredBins() throws IOException { + public int completeExpiredBins() throws IOException { final long maxNanos = maxBinAgeNanos.get(); + return handleCompletedBins(bin -> bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS)); + } + + public int completeFullEnoughBins() throws IOException { + return handleCompletedBins(bin -> bin.isFullEnough()); + } + + private int handleCompletedBins(final Predicate<RecordBin> completionTest) throws IOException { final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>(); lock.lock(); @@ -248,7 +257,7 @@ public void completeExpiredBins() throws IOException { final 
List<RecordBin> bins = entry.getValue(); for (final RecordBin bin : bins) { - if (bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS)) { + if (completionTest.test(bin)) { final List<RecordBin> expiredBinsForKey = expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>()); expiredBinsForKey.add(bin); } @@ -258,6 +267,7 @@ lock.unlock(); } + int completed = 0; for (final Map.Entry<String, List<RecordBin>> entry : expiredBinMap.entrySet()) { final String key = entry.getKey(); final List<RecordBin> expiredBins = entry.getValue(); @@ -265,12 +275,16 @@ for (final RecordBin bin : expiredBins) { logger.debug("Completing Bin {} because it has expired"); bin.complete("Bin has reached Max Bin Age"); + completed++; } removeBins(key, expiredBins); } + + return completed; } + private void removeBins(final String key, final List<RecordBin> bins) { lock.lock(); try { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java index 261f981975de..4ba57afa1125 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java @@ -17,12 +17,6 @@ package org.apache.nifi.processors.standard; -import static org.junit.Assert.assertEquals; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.CommaSeparatedRecordReader; @@ -34,6 +28,12 @@ import org.junit.Ignore; import org.junit.Test; +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + public class TestMergeRecord { private TestRunner runner; private CommaSeparatedRecordReader readerService; @@ -202,11 +202,24 @@ public void testMinSize() { } @Test - public void testMinRecords() { + public void testValidation() { runner.setProperty(MergeRecord.MIN_RECORDS, "103"); runner.setProperty(MergeRecord.MAX_RECORDS, "2"); runner.setProperty(MergeRecord.MIN_SIZE, "500 B"); + runner.assertNotValid(); + + runner.setProperty(MergeRecord.MIN_RECORDS, "2"); + runner.setProperty(MergeRecord.MAX_RECORDS, "103"); + runner.assertValid(); + } + + @Test + public void testMinRecords() { + runner.setProperty(MergeRecord.MIN_RECORDS, "103"); + runner.setProperty(MergeRecord.MAX_RECORDS, "110"); + runner.setProperty(MergeRecord.MIN_SIZE, "500 B"); + runner.enqueue("Name, Age\nJohn, 35"); runner.enqueue("Name, Age\nJane, 34"); @@ -221,7 +234,7 @@ public void testMinRecords() { runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0); runner.enqueue("Name, Age\nJohn, 35"); - runner.run(); + runner.run(2); runner.assertTransferCount(MergeRecord.REL_MERGED, 1); runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4); } @@ -240,6 +253,8 @@ public void testMaxRecords() { runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 30); assertEquals(4, runner.getQueueSize().getObjectCount()); + + runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).stream().forEach(ff -> ff.assertAttributeEquals("record.count", "10")); } @Test