NIFI-8205: Documentation improvements for the Wait processor
Signed-off-by: Pierre Villard <[email protected]>

This closes apache#4808.
pgyori authored and pvillard31 committed Feb 23, 2021
1 parent 528fce2 commit d049bca
Showing 2 changed files with 127 additions and 22 deletions.
@@ -72,9 +72,15 @@
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Routes incoming FlowFiles to the 'wait' relationship until a matching release signal "
+ "is stored in the distributed cache from a corresponding Notify processor. "
+ "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship, "
+ "with attributes copied from the FlowFile that produced the release signal from the Notify processor. "
+ "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+ "When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship. "
+ "The release signal entry is then removed from the cache. "
+ "The attributes of the FlowFile that produced the release signal are copied to the waiting FlowFile if the Attribute Cache Regex "
+ "property of the corresponding Notify processor is set properly. "
+ "If there are multiple release signals in the cache identified by the Release Signal Identifier, "
+ "and the Notify processor is configured to copy the FlowFile attributes to the cache, "
+ "then the FlowFile passing the Wait processor receives the union of the attributes of the FlowFiles "
+ "that produced the release signals in the cache (identified by Release Signal Identifier). "
+ "Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "

+ "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
+ "This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. "
@@ -88,8 +94,9 @@
@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+ "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile. "
+ "This attribute is not written when the FlowFile is transferred to failure, expired or success"),
@WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, "
+ "each count value in the signal is copied.")
@WritesAttribute(attribute = "wait.counter.<counterName>", description = "The name of each counter for which at least one signal "
+ "has been present in the cache since the last time the cache was empty "
+ "gets copied to the current FlowFile as an attribute.")
})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.Notify"})
@@ -110,8 +117,9 @@ public class Wait extends AbstractProcessor {
public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder()
.name("release-signal-id")
.displayName("Release Signal Identifier")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the release signal cache key")
.description("A value that specifies the key to a specific release signal cache. "
+ "To decide whether the FlowFile that is being processed by the Wait processor should be sent to the 'success' "
+ "or the 'wait' relationship, the processor checks the signals in the cache specified by this key.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -120,11 +128,12 @@ public class Wait extends AbstractProcessor {
public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new PropertyDescriptor.Builder()
.name("target-signal-count")
.displayName("Target Signal Count")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the target signal count. " +
"This processor checks whether the signal count has reached this number. " +
"If Signal Counter Name is specified, this processor checks a particular counter, " +
"otherwise checks against total count in a signal.")
.description("The number of signals that need to be in the cache (specified by the Release Signal Identifier) "
+ "in order for the FlowFile processed by the Wait processor to be sent to the 'success' relationship. "
+ "If the number of signals in the cache has reached this number, the FlowFile is routed to the "
+ "'success' relationship and the number of signals in the cache is decreased by this value. "
+ "If Signal Counter Name is specified, this processor checks a particular counter, "
+ "otherwise checks against the total number of signals in the cache.")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -134,9 +143,9 @@ public class Wait extends AbstractProcessor {
public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
.name("signal-counter-name")
.displayName("Signal Counter Name")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the signal counter name. " +
"If not specified, this processor checks the total count in a signal.")
.description("Within the cache (specified by the Release Signal Identifier) the signals may belong to different counters. "
+ "If this property is specified, the processor checks the number of signals in the cache that belong to this particular counter. "
+ "If not specified, the processor checks the total number of signals in the cache.")
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -15,9 +15,9 @@
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ValidateCsv</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<meta charset="utf-8"/>
<title>Wait</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
<style>
table td:first-child {text-align: center;}
</style>
@@ -32,10 +32,10 @@ <h2>Best practices to handle multiple signal ids at a Wait processor</h2>

Also, you will need a high-level understanding of how the Wait processor works:
<ul>
<li>Wait processor only process a single signal id at a time</li>
<li>How frequent Wait processor runs is defined at 'Run Schedule'</li>
<li>Which FlowFile is processed is determined by Prioritizer</li>
<li>Not limited to Wait processor, but for all processors, the order of queued FlowFiles in a connection is undefined if no Prioritizer is set</li>
<li>The Wait processor only processes a single signal id at a time</li>
<li>How frequently the Wait processor runs is defined in the 'Run Schedule'</li>
<li>Which FlowFile is processed is determined by a Prioritizer</li>
<li>Not limited to the Wait processor, but for all processors, the order of queued FlowFiles in a connection is undefined if no Prioritizer is set</li>
</ul>


@@ -273,5 +273,101 @@ <h4 id="run-duration">The importance of 'Run Duration' when 'Wait Penalty Durati
To mitigate this limitation, increasing 'Run Duration' is recommended. With a longer 'Run Duration', the Wait processor can keep being scheduled for that duration. For example, with a 'Run Duration' of 500 ms, Wait should be able to loop through all 5 queued FlowFiles in a single run.
</p>

<h2>Using counters</h2>
<p>
A counter is basically a label to differentiate signals within the cache.
(A cache in this context is a "container" that contains signals that have the same signal identifier.)
</p>
<p>
Suppose the cache contains the following signals.
(Note that these are not FlowFiles on the incoming (or wait) connection of the Wait processor, as
in the examples above, but release signals stored in the cache.)
</p>
<table>
<tr>
<th>Signal ID</th>
<th>Signal Counter Name</th>
</tr>
<tr>
<td>A</td>
<td>counter_1</td>
</tr>
<tr>
<td>A</td>
<td>counter_1</td>
</tr>
<tr>
<td>A</td>
<td>counter_2</td>
</tr>
</table>
<p>
In this state, the following FlowFile is processed by the Wait processor.
(The FlowFile has a signal_counter_name attribute, and the Wait processor is configured
to use the value of this attribute as the value of the Signal Counter Name property.)
</p>
<table>
<tr>
<th>FlowFile UUID</th>
<th>Signal ID</th>
<th>signal_counter_name</th>
</tr>
<tr>
<td>a-1</td>
<td>A</td>
<td>counter_3</td>
</tr>
</table>
<p>
Although the cache identified by Signal ID "A" contains signals,
the FlowFile above will be sent to the 'wait' relationship, since there is no signal
in the cache that belongs to the counter named "counter_3".
</p>
<p>
Suppose that the state of the cache is the same as above and the following
FlowFile is processed by the Wait processor:
</p>
<table>
<tr>
<th>FlowFile UUID</th>
<th>Signal ID</th>
<th>signal_counter_name</th>
</tr>
<tr>
<td>a-2</td>
<td>A</td>
<td>counter_1</td>
</tr>
</table>
<p>
The FlowFile is routed to the 'success' relationship, since cache "A" contains
signals and some of them belong to "counter_1". The outgoing FlowFile
will have the following attributes and values added to it:
</p>
<ul>
<li>wait.counter.counter_1 : 2</li>
<li>wait.counter.counter_2 : 1</li>
<li>wait.counter.total : 3</li>
</ul>
<p>
The key point here is that counters can be used to differentiate between signals within the
cache. If counters are used, a new attribute will be appended to the FlowFile
passing the Wait processor for each counter. If a large number of counters are used
within a cache, the FlowFile passing the Wait processor will have a large number of
attributes appended to it. To avoid that, it is recommended to use multiple caches with
a few counters in each, instead of one cache with many counters.
</p>
<p>
For example:
</p>
<ul>
<li>Cache identified by Release Signal ID "A" has counters: "counter_1" and "counter_2"</li>
<li>Cache identified by Release Signal ID "B" has counters: "counter_3" and "counter_4"</li>
<li>Cache identified by Release Signal ID "C" has counters: "counter_5" and "counter_6"</li>
</ul>
<p>
(Counter names do not need to be unique between caches; the counter name(s) used in cache "A"
could be reused in caches "B" and "C" as well.)
</p>
</body>
</html>
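
The worked example in the "Using counters" section above can be sketched in plain Java. This is only an in-memory model of the behavior the documentation describes, not the Wait processor's implementation and not NiFi API: the class `WaitCounterSketch` and its methods `cacheA`, `route`, and `counterAttributes` are hypothetical names, and a Target Signal Count of 1 is an assumption, since the example does not state the configured value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of the "Using counters" example; not NiFi code.
class WaitCounterSketch {

    // The cache entry for Release Signal Identifier "A": counter name -> signal count.
    static Map<String, Long> cacheA() {
        Map<String, Long> counters = new LinkedHashMap<>();
        counters.put("counter_1", 2L);
        counters.put("counter_2", 1L);
        return counters;
    }

    // Picks the relationship as the property descriptions describe it:
    // check one counter if a name is given, otherwise the total over all counters.
    static String route(Map<String, Long> counters, String counterName, long targetSignalCount) {
        long count = (counterName == null)
                ? counters.values().stream().mapToLong(Long::longValue).sum()
                : counters.getOrDefault(counterName, 0L);
        return count >= targetSignalCount ? "success" : "wait";
    }

    // Builds the wait.counter.* attributes listed in the example, including the total.
    static Map<String, String> counterAttributes(Map<String, Long> counters) {
        Map<String, String> attributes = new LinkedHashMap<>();
        long total = 0;
        for (Map.Entry<String, Long> entry : counters.entrySet()) {
            attributes.put("wait.counter." + entry.getKey(), String.valueOf(entry.getValue()));
            total += entry.getValue();
        }
        attributes.put("wait.counter.total", String.valueOf(total));
        return attributes;
    }

    public static void main(String[] args) {
        Map<String, Long> cache = cacheA();
        long target = 1; // assumed Target Signal Count
        System.out.println(route(cache, "counter_3", target)); // FlowFile a-1: wait
        System.out.println(route(cache, "counter_1", target)); // FlowFile a-2: success
        System.out.println(counterAttributes(cache));
    }
}
```

Each counter in the cache adds one attribute to the outgoing FlowFile, which is why the documentation recommends several caches with a few counters each over one cache with many counters.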
