From 82eedea659a08581f21d31c6dc4516d615b25e37 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 25 Feb 2021 11:26:24 -0500 Subject: [PATCH] NIFI-8261: When gathering the states of affected components, make sure that we properly obtain the state of Input Ports and Output Ports Signed-off-by: Pierre Villard This closes #4844. --- .../nifi/web/StandardNiFiServiceFacade.java | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 828c800bcab3..e7e13f7e1ca4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -142,6 +142,7 @@ import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort; @@ -4769,12 +4770,22 @@ public Set getComponentsAffectedByFlowUpdate(final Stri state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name(); break; case REMOTE_INPUT_PORT: - final InstantiatedVersionedRemoteGroupPort inputPort = (InstantiatedVersionedRemoteGroupPort) localComponent; - state = remoteProcessGroupDAO.getRemoteProcessGroup(inputPort.getInstanceGroupId()).getInputPort(inputPort.getInstanceId()).getScheduledState().name(); + final InstantiatedVersionedRemoteGroupPort remoteInputPort = (InstantiatedVersionedRemoteGroupPort) localComponent; + state = remoteProcessGroupDAO.getRemoteProcessGroup(remoteInputPort.getInstanceGroupId()).getInputPort(remoteInputPort.getInstanceId()).getScheduledState().name(); break; case REMOTE_OUTPUT_PORT: - final InstantiatedVersionedRemoteGroupPort outputPort = (InstantiatedVersionedRemoteGroupPort) localComponent; - state = remoteProcessGroupDAO.getRemoteProcessGroup(outputPort.getInstanceGroupId()).getOutputPort(outputPort.getInstanceId()).getScheduledState().name(); + final InstantiatedVersionedRemoteGroupPort remoteOutputPort = (InstantiatedVersionedRemoteGroupPort) localComponent; + state = remoteProcessGroupDAO.getRemoteProcessGroup(remoteOutputPort.getInstanceGroupId()).getOutputPort(remoteOutputPort.getInstanceId()).getScheduledState().name(); + break; + case INPUT_PORT: + final InstantiatedVersionedPort versionedInputPort = (InstantiatedVersionedPort) localComponent; + final Port inputPort = getInputPort(versionedInputPort); + state = inputPort == null ? null : inputPort.getScheduledState().name(); + break; + case OUTPUT_PORT: + final InstantiatedVersionedPort versionedOutputPort = (InstantiatedVersionedPort) localComponent; + final Port outputPort = getOutputPort(versionedOutputPort); + state = outputPort == null ? null : outputPort.getScheduledState().name(); break; default: state = null; @@ -4908,6 +4919,24 @@ public Set getComponentsAffectedByFlowUpdate(final Stri return affectedComponents; } + private Port getInputPort(final InstantiatedVersionedPort port) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(port.getInstanceGroupId()); + if (processGroup == null) { + return null; + } + + return processGroup.getInputPort(port.getInstanceId()); + } + + private Port getOutputPort(final InstantiatedVersionedPort port) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(port.getInstanceGroupId()); + if (processGroup == null) { + return null; + } + + return processGroup.getOutputPort(port.getInstanceId()); + } + private void mapToConnectableId(final Collection connectables, final Map> destination) { for (final Connectable connectable : connectables) { final Optional versionedIdOption = connectable.getVersionedComponentId();