NIFI-4841 Fixing NPE when reverting local changes involving remote group ports. This...
[nifi.git] / nifi-nar-bundles / nifi-framework-bundle / nifi-framework / nifi-framework-core / src / main / java / org / apache / nifi / groups / StandardProcessGroup.java
index 15f2b5f..8b7dcd2 100644 (file)
@@ -110,6 +110,7 @@ import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FlowDifferenceFilters;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.util.SnippetUtils;
@@ -1649,6 +1650,18 @@ public final class StandardProcessGroup implements ProcessGroup {
             return funnel;
         }
 
+        for (final RemoteProcessGroup remoteProcessGroup : group.getRemoteProcessGroups()) {
+            final RemoteGroupPort remoteInputPort = remoteProcessGroup.getInputPort(identifier);
+            if (remoteInputPort != null) {
+                return remoteInputPort;
+            }
+
+            final RemoteGroupPort remoteOutputPort = remoteProcessGroup.getOutputPort(identifier);
+            if (remoteOutputPort != null) {
+                return remoteOutputPort;
+            }
+        }
+
         for (final ProcessGroup childGroup : group.getProcessGroups()) {
             final Connectable childGroupConnectable = findLocalConnectable(identifier, childGroup);
             if (childGroupConnectable != null) {
@@ -3257,6 +3270,11 @@ public final class StandardProcessGroup implements ProcessGroup {
                     continue;
                 }
 
+                // Ignore differences for adding a remote port
+                if (FlowDifferenceFilters.isAddedRemotePort(diff)) {
+                    continue;
+                }
+
                 // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
                 // and if so compare our VersionedControllerService to the existing service.
                 if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
@@ -3901,7 +3919,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 final String rpgId = connectableComponent.getGroupId();
                 final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream()
                     .filter(component -> component.getVersionedComponentId().isPresent())
-                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
                     .findAny();
 
                 if (!rpgOption.isPresent()) {
@@ -4197,8 +4215,9 @@ public final class StandardProcessGroup implements ProcessGroup {
         final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor());
         final FlowComparison comparison = flowComparator.compare();
         final Set<FlowDifference> differences = comparison.getDifferences().stream()
-            .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
-            .collect(Collectors.toCollection(HashSet::new));
+                .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
+                .filter(FlowDifferenceFilters.FILTER_ADDED_REMOTE_PORTS)
+                .collect(Collectors.toCollection(HashSet::new));
 
         LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
         return differences;