Skip to content

Commit

Permalink
NIFI-7959 Handling node disconnection in MonitorActivity processor
Browse files Browse the repository at this point in the history
- Make reporting in clustered scope to dependent of expected cluster state in order to prevent unexpected flow file emission

This closes apache#4642.

Signed-off-by: Mark Payne <[email protected]>
  • Loading branch information
simonbence authored and markap14 committed Nov 4, 2020
1 parent 0805670 commit 59e00c4
Show file tree
Hide file tree
Showing 20 changed files with 421 additions and 57 deletions.
30 changes: 30 additions & 0 deletions nifi-api/src/main/java/org/apache/nifi/context/ClusterContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.context;

/**
* A context for retrieving information about the state of the cluster.
*/
public interface ClusterContext {

/**
* Retrieves the current state of the cluster connection of this node.
*
* @return True if this node is connected to the cluster. False otherwise.
*/
boolean isConnectedToCluster();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,26 @@
public interface NodeTypeProvider {

/**
* @return true if this instance is clustered, false otherwise.
* @return true if this instance is clustered, false otherwise.MockProcessContext
* Clustered means that a node is either connected or trying to connect to the cluster.
*/
boolean isClustered();

/**
* @return true if the expected state of clustering is true, false otherwise. Contrary to {{@link #isClustered()}}
* this does not dynamically change with the state of this node.
*/
default boolean isConfiguredForClustering() {
return false;
}

/**
* @return true if this instances is clustered and connected to the cluster.
*/
default boolean isConnected() {
return false;
}

/**
* @return true if this instance is the primary node in the cluster; false otherwise
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.context.ClusterContext;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.scheduling.ExecutionNode;
Expand All @@ -35,7 +36,7 @@
* thread-safe.
* </p>
*/
public interface ProcessContext extends PropertyContext {
public interface ProcessContext extends PropertyContext, ClusterContext {

/**
* Retrieves the current value set for the given descriptor, if a value is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
private volatile Set<Relationship> unavailableRelationships = new HashSet<>();

private volatile boolean isClustered;
private volatile boolean isConfiguredForClustering;
private volatile boolean isPrimaryNode;
private volatile boolean isConnected = true;

public MockProcessContext(final ConfigurableComponent component) {
this(component, null);
Expand Down Expand Up @@ -524,6 +526,11 @@ public boolean isClustered() {
return isClustered;
}

@Override
public boolean isConfiguredForClustering() {
return isConfiguredForClustering;
}

@Override
public boolean isPrimary() {
return isPrimaryNode;
Expand All @@ -533,9 +540,13 @@ public void setClustered(boolean clustered) {
isClustered = clustered;
}

public void setIsConfiguredForClustering(final boolean isConfiguredForClustering) {
this.isConfiguredForClustering = isConfiguredForClustering;
}

public void setPrimaryNode(boolean primaryNode) {
if (!isClustered && primaryNode) {
throw new IllegalArgumentException("Primary node is only available in cluster. Use setClustered(true) first.");
if (!isConfiguredForClustering && primaryNode) {
throw new IllegalArgumentException("Primary node is only available in cluster. Use setIsConfiguredForClustering(true) first.");
}
isPrimaryNode = primaryNode;
}
Expand All @@ -544,4 +555,13 @@ public void setPrimaryNode(boolean primaryNode) {
public InputRequirement getInputRequirement() {
return inputRequirement;
}

public void setConnected(boolean connected) {
isConnected = connected;
}

@Override
public boolean isConnectedToCluster() {
return isConnected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -928,11 +928,21 @@ public void setClustered(boolean clustered) {
context.setClustered(clustered);
}

@Override
public void setIsConfiguredForClustering(final boolean isConfiguredForClustering) {
context.setIsConfiguredForClustering(isConfiguredForClustering);
}

@Override
public void setPrimaryNode(boolean primaryNode) {
context.setPrimaryNode(primaryNode);
}

@Override
public void setConnected(final boolean isConnected) {
context.setConnected(isConnected);
}

@Override
public String getVariableValue(final String name) {
Objects.requireNonNull(name);
Expand Down
11 changes: 11 additions & 0 deletions nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -933,11 +933,22 @@ public interface TestRunner {
*/
void setClustered(boolean clustered);

/**
* @param isConfiguredForClustering Specify if this test emulates running in an environment where the expected
* cluster state equals with the argument.
*/
void setIsConfiguredForClustering(boolean isConfiguredForClustering);

/**
* @param primaryNode Specify if this test emulates running as a primary node
*/
void setPrimaryNode(boolean primaryNode);

/**
* @param isConnected Specify if this test emulates ongoing cluster connection
*/
void setConnected(boolean isConnected);

/**
* Sets the value of the variable with the given name to be the given value. This exposes the variable
* for use by the Expression Language.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,8 @@ private FlowController(
controllerServiceProvider = new StandardControllerServiceProvider(this, processScheduler, bulletinRepository);

eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider,
eventDrivenWorkerQueue, repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager);
eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue,
repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager, this);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);

final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor);
Expand Down Expand Up @@ -2266,6 +2266,7 @@ public Optional<String> getCurrentNode() {
}
}

@Override
public boolean isConfiguredForClustering() {
return configuredForClustering;
}
Expand Down Expand Up @@ -2815,6 +2816,7 @@ public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}

@Override
public boolean isConnected() {
rwLock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void reload(final ProcessorNode existingNode, final String newType, final
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(id);
final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(),
flowController.getEncryptor(), stateManager, () -> false);
flowController.getEncryptor(), stateManager, () -> false, flowController);

ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,9 @@ public StateManager getStateManager() {
public String getName() {
return connectable.getName();
}

@Override
public boolean isConnectedToCluster() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.EventBasedWorker;
import org.apache.nifi.controller.EventDrivenWorkerQueue;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.lifecycle.TaskTerminationAwareStateManager;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final StringEncryptor encryptor;
private final ExtensionManager extensionManager;
private final NodeTypeProvider nodeTypeProvider;

private volatile String adminYieldDuration = "1 sec";

Expand All @@ -75,7 +77,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {

public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider,
final EventDrivenWorkerQueue workerQueue, final RepositoryContextFactory contextFactory, final int maxThreadCount,
final StringEncryptor encryptor, final ExtensionManager extensionManager) {
final StringEncryptor encryptor, final ExtensionManager extensionManager, final NodeTypeProvider nodeTypeProvider) {
super(flowEngine);
this.serviceProvider = serviceProvider;
this.stateManagerProvider = stateManagerProvider;
Expand All @@ -84,6 +86,7 @@ public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerS
this.maxThreadCount = new AtomicInteger(maxThreadCount);
this.encryptor = encryptor;
this.extensionManager = extensionManager;
this.nodeTypeProvider = nodeTypeProvider;

for (int i = 0; i < maxThreadCount; i++) {
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
Expand Down Expand Up @@ -205,7 +208,8 @@ public void run() {
if (connectable instanceof ProcessorNode) {
final ProcessorNode procNode = (ProcessorNode) connectable;
final StateManager stateManager = new TaskTerminationAwareStateManager(getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, stateManager, scheduleState::isTerminated);
final StandardProcessContext standardProcessContext = new StandardProcessContext(
procNode, serviceProvider, encryptor, stateManager, scheduleState::isTerminated, nodeTypeProvider);

final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
final ProcessSessionFactory sessionFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode p
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);

final Supplier<ProcessContext> processContextFactory = () -> new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);

final CompletableFuture<Void> future = new CompletableFuture<>();
final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
Expand Down Expand Up @@ -344,7 +344,7 @@ public synchronized CompletableFuture<Void> stopProcessor(final ProcessorNode pr
final LifecycleState lifecycleState = getLifecycleState(procNode, false);

StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated, flowController);

LOG.info("Stopping {}", procNode);
return procNode.stop(this, this.componentLifeCycleThreadPool, processContext, getSchedulingAgent(procNode), lifecycleState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable

final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
if (connectable instanceof ProcessorNode) {
processContext = new StandardProcessContext((ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated);
processContext = new StandardProcessContext(
(ProcessorNode) connectable, flowController.getControllerServiceProvider(), encryptor, stateManager, scheduleState::isTerminated, flowController);
} else {
processContext = new ConnectableProcessContext(connectable, encryptor, stateManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ private StateManager getStateManager(final String componentId) {
private void shutdown(final ProcessGroup procGroup) {
for (final ProcessorNode node : procGroup.getProcessors()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), node.getProcessor().getClass(), node.getIdentifier())) {
final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), () -> false);
final StandardProcessContext processContext = new StandardProcessContext(
node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), () -> false, flowController);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
}
}
Expand Down Expand Up @@ -993,7 +994,8 @@ public void removeProcessor(final ProcessorNode processor) {
}

try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), processor.getProcessor().getClass(), processor.getIdentifier())) {
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), () -> false);
final StandardProcessContext processContext = new StandardProcessContext(
processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), () -> false, flowController);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of processor with id " + processor.getIdentifier(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.lifecycle.TaskTermination;
import org.apache.nifi.controller.service.ControllerServiceProvider;
Expand All @@ -51,15 +52,17 @@ public class StandardProcessContext implements ProcessContext, ControllerService
private final StringEncryptor encryptor;
private final StateManager stateManager;
private final TaskTermination taskTermination;
private final NodeTypeProvider nodeTypeProvider;
private final Map<PropertyDescriptor, String> properties;

public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager,
final TaskTermination taskTermination) {
public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor,
final StateManager stateManager, final TaskTermination taskTermination, final NodeTypeProvider nodeTypeProvider) {
this.procNode = processorNode;
this.controllerServiceProvider = controllerServiceProvider;
this.encryptor = encryptor;
this.stateManager = stateManager;
this.taskTermination = taskTermination;
this.nodeTypeProvider = nodeTypeProvider;

properties = Collections.unmodifiableMap(processorNode.getEffectivePropertyValues());

Expand Down Expand Up @@ -290,4 +293,8 @@ public String getName() {
return procNode.getName();
}

@Override
public boolean isConnectedToCluster() {
return nodeTypeProvider.isConnected();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void testStart() throws InterruptedException {
new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger());
final ScheduledExecutorService taskScheduler = new FlowEngine(1, "TestClasspathResources", true);

final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false);
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false, null);
final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() {
@Override
public void onTaskComplete() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void testSensitivePropertyReferenceParameterSupportsEL() {
usernamePassword.setProperties(properties);

final ProcessContext processContext = new StandardProcessContext(usernamePassword, getFlowController().getControllerServiceProvider(), getFlowController().getEncryptor(),
getFlowController().getStateManagerProvider().getStateManager(usernamePassword.getIdentifier()), () -> false);
getFlowController().getStateManagerProvider().getStateManager(usernamePassword.getIdentifier()), () -> false, getFlowController());
final PropertyDescriptor descriptor = usernamePassword.getPropertyDescriptor("password");
final PropertyValue propertyValue = processContext.getProperty(descriptor);
final PropertyValue evaluatedPropertyValue = propertyValue.evaluateAttributeExpressions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,9 @@ public StateManager getStateManager() {
public String getName() {
return null;
}

@Override
public boolean isConnectedToCluster() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -530,4 +530,9 @@ public File getKerberosServiceKeytab() {
public File getKerberosConfigurationFile() {
return null; //this needs to be wired in.
}

@Override
public boolean isConnectedToCluster() {
return false;
}
}
Loading

0 comments on commit 59e00c4

Please sign in to comment.