Skip to content

Commit

Permalink
NIFI-5771: Ensure that we only increment claimant count for content c…
Browse files Browse the repository at this point in the history
…laim if we have a FlowFile that references it

This closes apache#3118.

Signed-off-by: Bryan Bende <[email protected]>
  • Loading branch information
markap14 authored and bbende committed Oct 31, 2018
1 parent 1f2cf4b commit 4c10b47
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,17 +285,14 @@ protected void receiveFlowFiles(final InputStream in, final OutputStream out, fi
if (contentClaim == null) {
contentClaim = contentRepository.create(false);
contentClaimOut = contentRepository.write(contentClaim);
} else {
contentRepository.incrementClaimaintCount(contentClaim);
}

final RemoteFlowFileRecord flowFile;
try {
flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression);
} catch (final Exception e) {
contentRepository.decrementClaimantCount(contentClaim);
throw e;
}
final RemoteFlowFileRecord flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression);

// The FlowFile's Content Claim will either be null or equal to the provided Content Claim.
// Incrementing the FlowFile's content claim will increment the count for the provided Content Claim, if it was
// assigned to the FlowFIle, or call incrementClaimantCount with an argument of null, which will do nothing.
contentRepository.incrementClaimaintCount(flowFile.getFlowFile().getContentClaim());

flowFilesReceived.add(flowFile);

Expand All @@ -307,15 +304,26 @@ protected void receiveFlowFiles(final InputStream in, final OutputStream out, fi
}
}

// When the Content Claim is created initially, it has a Claimaint Count of 1. We then increment the Claimant Count for each FlowFile that we add to the Content Claim,
// which means that the claimant count is currently 1 larger than it needs to be. So we will decrement the claimant count now. If that results in a count of 0, then
// we can go ahead and remove the Content Claim, since we know it's not being referenced.
final int count = contentRepository.decrementClaimantCount(contentClaim);

verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size());
completeTransaction(in, out, peerDescription, flowFilesReceived, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue);

if (count == 0) {
contentRepository.remove(contentClaim);
}
} catch (final Exception e) {
// If any Exception occurs, we need to decrement the claimant counts for the Content Claims that we wrote to because
// they are no longer needed.
for (final RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) {
contentRepository.decrementClaimantCount(remoteFlowFile.getFlowFile().getContentClaim());
}

contentRepository.remove(contentClaim);

throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ public void testDestinationNodeQueueFull() throws IOException, InterruptedExcept
localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
nodeIdentifiers.add(localNodeId);

when(serverQueue.isFull()).thenReturn(true);
when(serverQueue.isLocalPartitionFull()).thenReturn(true);

// Create the server
final int timeoutMillis = 30000;
Expand Down Expand Up @@ -1266,7 +1266,7 @@ public void testDestinationNodeQueueFull() throws IOException, InterruptedExcept
assertEquals(2, flowFileQueue.size().getObjectCount());

// Enable data to be transferred
when(serverQueue.isFull()).thenReturn(false);
when(serverQueue.isLocalPartitionFull()).thenReturn(false);

while (clientRepoRecords.size() != 1) {
Thread.sleep(10L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,9 @@ public void testBadChecksum() throws IOException {
Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection());
Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next());
Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next());
Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next());
}

@Test
Expand Down Expand Up @@ -509,7 +511,9 @@ public void testEofWritingContent() throws IOException {
Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection());
Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
Mockito.verify(contentRepo, times(0)).incrementClaimaintCount(claimContents.keySet().iterator().next());
Mockito.verify(contentRepo, times(0)).decrementClaimantCount(claimContents.keySet().iterator().next());
Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next());
}

@Test
Expand Down Expand Up @@ -559,7 +563,9 @@ public void testAbortAfterChecksumConfirmation() throws IOException {
Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection());
Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next());
Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next());
Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next());
}

@Test
Expand Down

0 comments on commit 4c10b47

Please sign in to comment.