Skip to content

Commit

Permalink
NIFI-8261: When gathering the states of affected components, make sur…
Browse files Browse the repository at this point in the history
…e that we properly obtain the state of Input Ports and Output Ports

Signed-off-by: Pierre Villard <[email protected]>

This closes apache#4844.
  • Loading branch information
markap14 authored and pvillard31 committed Feb 25, 2021
1 parent 68d38dd commit 82eedea
Showing 1 changed file with 33 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
Expand Down Expand Up @@ -4769,12 +4770,22 @@ public Set<AffectedComponentEntity> getComponentsAffectedByFlowUpdate(final Stri
state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name();
break;
case REMOTE_INPUT_PORT:
final InstantiatedVersionedRemoteGroupPort inputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
state = remoteProcessGroupDAO.getRemoteProcessGroup(inputPort.getInstanceGroupId()).getInputPort(inputPort.getInstanceId()).getScheduledState().name();
final InstantiatedVersionedRemoteGroupPort remoteInputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
state = remoteProcessGroupDAO.getRemoteProcessGroup(remoteInputPort.getInstanceGroupId()).getInputPort(remoteInputPort.getInstanceId()).getScheduledState().name();
break;
case REMOTE_OUTPUT_PORT:
final InstantiatedVersionedRemoteGroupPort outputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
state = remoteProcessGroupDAO.getRemoteProcessGroup(outputPort.getInstanceGroupId()).getOutputPort(outputPort.getInstanceId()).getScheduledState().name();
final InstantiatedVersionedRemoteGroupPort remoteOutputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
state = remoteProcessGroupDAO.getRemoteProcessGroup(remoteOutputPort.getInstanceGroupId()).getOutputPort(remoteOutputPort.getInstanceId()).getScheduledState().name();
break;
case INPUT_PORT:
final InstantiatedVersionedPort versionedInputPort = (InstantiatedVersionedPort) localComponent;
final Port inputPort = getInputPort(versionedInputPort);
state = inputPort == null ? null : inputPort.getScheduledState().name();
break;
case OUTPUT_PORT:
final InstantiatedVersionedPort versionedOutputPort = (InstantiatedVersionedPort) localComponent;
final Port outputPort = getOutputPort(versionedOutputPort);
state = outputPort == null ? null : outputPort.getScheduledState().name();
break;
default:
state = null;
Expand Down Expand Up @@ -4908,6 +4919,24 @@ public Set<AffectedComponentEntity> getComponentsAffectedByFlowUpdate(final Stri
return affectedComponents;
}

private Port getInputPort(final InstantiatedVersionedPort port) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(port.getInstanceGroupId());
if (processGroup == null) {
return null;
}

return processGroup.getInputPort(port.getInstanceId());
}

private Port getOutputPort(final InstantiatedVersionedPort port) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(port.getInstanceGroupId());
if (processGroup == null) {
return null;
}

return processGroup.getOutputPort(port.getInstanceId());
}

private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) {
for (final Connectable connectable : connectables) {
final Optional<String> versionedIdOption = connectable.getVersionedComponentId();
Expand Down

0 comments on commit 82eedea

Please sign in to comment.