Skip to content

Commit

Permalink
Revert "NIFI-5448 Added failure relationship to UpdateAttributes to h…
Browse files Browse the repository at this point in the history
…andle bad expression language logic."

This reverts commit 32ee552.
  • Loading branch information
joewitt committed Oct 12, 2018
1 parent 6b77e7d commit e25b26e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
Expand Down Expand Up @@ -108,21 +107,17 @@ protected boolean removeEldestEntry(final Map.Entry eldest) {
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("All successful FlowFiles are routed to this relationship").name("success").build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("All flowfiles that cannot be updated are routed to this relationship").name("failure").autoTerminateDefault(true).build();
public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder()
.description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here.").name("set state fail").build();

static {
Set<Relationship> tempStatelessSet = new HashSet<>();
tempStatelessSet.add(REL_SUCCESS);
tempStatelessSet.add(REL_FAILURE);

statelessRelationshipSet = Collections.unmodifiableSet(tempStatelessSet);

Set<Relationship> tempStatefulSet = new HashSet<>();
tempStatefulSet.add(REL_SUCCESS);
tempStatefulSet.add(REL_FAILURE);
tempStatefulSet.add(REL_FAILED_SET_STATE);

statefulRelationshipSet = Collections.unmodifiableSet(tempStatefulSet);
Expand Down Expand Up @@ -157,21 +152,6 @@ public ValidationResult validate(String subject, String input, ValidationContext
}
};

public static final AllowableValue FAIL_STOP = new AllowableValue("stop", "Penalize", "Penalize FlowFiles." +
"This is based on the original behavior of the processor to allow for a smooth transition.");
public static final AllowableValue FAIL_ROUTE = new AllowableValue("route", "Route to Failure Relationship",
"If chosen, failed FlowFiles will be routed to the failure relationship.");
public static final PropertyDescriptor FAILURE_BEHAVIOR = new PropertyDescriptor.Builder()
.name("update-attribute-failure-behavior")
.displayName("Failure Behavior")
.description("Control how to handle errors in Expression Language evaluation. The default behavior is to stop evaluation. It can be " +
"changed by the user to route to a failure relationship instead.")
.allowableValues(FAIL_STOP, FAIL_ROUTE)
.defaultValue(FAIL_STOP.getValue())
.required(true)
.build();


// static properties
public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression";
public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -225,7 +205,6 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
descriptors.add(DELETE_ATTRIBUTES);
descriptors.add(STORE_STATE);
descriptors.add(STATEFUL_VARIABLES_INIT_VALUE);
descriptors.add(FAILURE_BEHAVIOR);
return Collections.unmodifiableList(descriptors);
}

Expand Down Expand Up @@ -473,51 +452,39 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
Map<String, Action> defaultActions = this.defaultActions;
List<FlowFile> flowFilesToTransfer = new LinkedList<>();

boolean routeToFailure = context.getProperty(FAILURE_BEHAVIOR).getValue().equals(FAIL_ROUTE.getValue());
try {
// if there is update criteria specified, evaluate it
if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
// apply the actions for each rule and transfer the flowfile
for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) {
FlowFile match = entry.getKey();
final List<Rule> rules = entry.getValue();
boolean updateWorking = incomingFlowFile.equals(match);

// execute each matching rule(s)
match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes);

if (updateWorking) {
incomingFlowFile = match;
}
// if there is update criteria specified, evaluate it
if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
// apply the actions for each rule and transfer the flowfile
for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) {
FlowFile match = entry.getKey();
final List<Rule> rules = entry.getValue();
boolean updateWorking = incomingFlowFile.equals(match);

if (debugEnabled) {
logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
}
// execute each matching rule(s)
match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes);

// add the match to the list to transfer
flowFilesToTransfer.add(match);
if (updateWorking) {
incomingFlowFile = match;
}
} else {
// Either we're running without any rules or the FlowFile didn't match any
incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes);

if (debugEnabled) {
logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()});
logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
}

// add the flowfile to the list to transfer
flowFilesToTransfer.add(incomingFlowFile);
// add the match to the list to transfer
flowFilesToTransfer.add(match);
}
} catch (ProcessException pe) {
if (routeToFailure) {
session.transfer(incomingFlowFile, REL_FAILURE);
getLogger().error("Failed to update flowfile attribute(s).", pe);
return;
} else {
throw pe;
} else {
// Either we're running without any rules or the FlowFile didn't match any
incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes);

if (debugEnabled) {
logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()});
}
}

// add the flowfile to the list to transfer
flowFilesToTransfer.add(incomingFlowFile);
}

if (stateInitialAttributes != null) {
try {
Expand Down Expand Up @@ -779,8 +746,7 @@ private Map<String, Action> getDefaultActions(final Map<PropertyDescriptor, Stri
final Map<String, Action> defaultActions = new HashMap<>();

for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE
&& entry.getKey() != FAILURE_BEHAVIOR) {
if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) {
final Action action = new Action();
action.setAttribute(entry.getKey().getName());
action.setValue(entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,33 +1005,4 @@ public void testDataIsTooShort() {
}
}

@Test
public void testInvalidExpressionLanguage() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setVariable("test", "Squirrel!!1!");
runner.setProperty("bad_attr", "${test:toDate('yyyy-MM-dd')}");
runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_ROUTE);
runner.assertValid();

runner.enqueue("Test");
runner.run();

runner.assertTransferCount(UpdateAttribute.REL_SUCCESS, 0);
runner.assertTransferCount(UpdateAttribute.REL_FAILURE, 1);

runner.clearTransferState();

Throwable ex = null;
try {
runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_STOP);
runner.enqueue("Test");
runner.run();
} catch (Throwable t) {
ex = t;
} finally {
Assert.assertNotNull(ex);
Assert.assertTrue(ex.getCause() instanceof ProcessException);
runner.assertQueueNotEmpty();
}
}
}

0 comments on commit e25b26e

Please sign in to comment.