markap14 commented on code in PR #11079:
URL: https://github.com/apache/nifi/pull/11079#discussion_r3023935120


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java:
##########
@@ -47,4 +49,14 @@ default ConnectorConfigurationProvider 
getConnectorConfigurationProvider() {
         return null;
     }
 
+    /**
+     * Returns the maximum time to wait for a connector in a transient state 
(STARTING, STOPPING, PURGING)
+     * to reach a stable state during flow synchronization.
+     *
+     * @return the sync timeout duration, defaults to 15 minutes
+     */
+    default Duration getConnectorSyncTimeout() {
+        return Duration.ofMinutes(15);

Review Comment:
   This feels like a rather excessive default, no?



##########
nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorSyncDirective.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.flow.ScheduledState;
+
+/**
+ * Directive returned by {@link 
ConnectorConfigurationProvider#verifySyncable(String, ScheduledState)}
+ * indicating how the connector repository should handle synchronization for a 
connector during
+ * flow inheritance.
+ */
+public class ConnectorSyncDirective {
+
+    /**
+     * The action the connector repository should take for this connector 
during flow sync.
+     */
+    public enum Action {
+        /**
+         * Proceed with synchronization. The directive may optionally include a
+         * {@link ScheduledState} override and/or a {@link 
ConnectorWorkingConfiguration}
+         * containing the provider's working config and name.
+         */
+        ALLOW,
+
+        /**
+         * Do not synchronize this connector. The connector should be created 
locally
+         * (if not already present) and marked invalid so that a background 
repair
+         * process can attempt synchronization later when conditions improve.
+         */
+        REJECT,
+
+        /**
+         * This connector should not exist on this node. If it exists locally, 
remove
+         * it from the repository. If it does not exist, do not create it. 
This is used
+         * when the external system indicates the connector is being deleted 
or has been deleted.
+         */
+        REMOVE
+    }
+
+    private static final ConnectorSyncDirective ALLOW_DEFAULT = new 
ConnectorSyncDirective(Action.ALLOW, null, null);
+    private static final ConnectorSyncDirective REJECT_DIRECTIVE = new 
ConnectorSyncDirective(Action.REJECT, null, null);
+    private static final ConnectorSyncDirective REMOVE_DIRECTIVE = new 
ConnectorSyncDirective(Action.REMOVE, null, null);
+
+    private final Action action;
+    private final ScheduledState scheduledStateOverride;
+    private final ConnectorWorkingConfiguration workingConfiguration;
+
+    private ConnectorSyncDirective(final Action action, final ScheduledState 
scheduledStateOverride,
+                                   final ConnectorWorkingConfiguration 
workingConfiguration) {
+        this.action = action;
+        this.scheduledStateOverride = scheduledStateOverride;
+        this.workingConfiguration = workingConfiguration;
+    }
+
+    /**
+     * Returns an ALLOW directive with no overrides. The connector repository 
will use the
+     * versioned flow's name, working config, and ScheduledState as-is. This 
is the default
+     * behavior when no {@link ConnectorConfigurationProvider} is configured 
(Apache NiFi).
+     */
+    public static ConnectorSyncDirective allow() {
+        return ALLOW_DEFAULT;
+    }
+
+    /**
+     * Returns an ALLOW directive with the provider's working configuration 
(name + working
+     * config steps) and no ScheduledState override.
+     *
+     * @param workingConfiguration the provider's working configuration 
including name
+     */
+    public static ConnectorSyncDirective allow(final 
ConnectorWorkingConfiguration workingConfiguration) {
+        return new ConnectorSyncDirective(Action.ALLOW, null, 
workingConfiguration);
+    }
+
+    /**
+     * Returns an ALLOW directive with the provider's working configuration 
and a
+     * ScheduledState override. The override replaces the versioned flow's 
ScheduledState,
+     * which may be stale due to in-flight DPS tasks.
+     *
+     * @param workingConfiguration the provider's working configuration 
including name
+     * @param scheduledStateOverride the ScheduledState to use instead of the 
versioned flow's value
+     */
+    public static ConnectorSyncDirective allow(final 
ConnectorWorkingConfiguration workingConfiguration,
+                                               final ScheduledState 
scheduledStateOverride) {
+        return new ConnectorSyncDirective(Action.ALLOW, 
scheduledStateOverride, workingConfiguration);
+    }
+
+    /**
+     * Returns a REJECT directive. The connector will be created locally (if 
not already
+     * present) and marked invalid for background repair.
+     */
+    public static ConnectorSyncDirective reject() {
+        return REJECT_DIRECTIVE;
+    }
+
+    /**
+     * Returns a REMOVE directive. The connector should not exist on this node.
+     */
+    public static ConnectorSyncDirective remove() {
+        return REMOVE_DIRECTIVE;
+    }
+
+    public Action getAction() {
+        return action;
+    }
+
+    /**
+     * Returns the ScheduledState override, or {@code null} if the versioned 
flow's
+     * ScheduledState should be used.
+     */
+    public ScheduledState getScheduledStateOverride() {
+        return scheduledStateOverride;
+    }
+
+    /**
+     * Returns the provider's working configuration (name + working config 
steps),
+     * or {@code null} if the versioned flow's values should be used.
+     */
+    public ConnectorWorkingConfiguration getWorkingConfiguration() {
+        return workingConfiguration;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new 
StringBuilder("ConnectorSyncDirective[action=").append(action);
+        if (scheduledStateOverride != null) {
+            sb.append(", 
scheduledStateOverride=").append(scheduledStateOverride);
+        }
+        if (workingConfiguration != null) {
+            sb.append(", hasWorkingConfig=true");
+        }
+        sb.append(']');
+        return sb.toString();

Review Comment:
   No need for a StringBuilder here



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -91,6 +107,288 @@ public void restoreConnector(final ConnectorNode 
connector) {
         logger.debug("Successfully restored {}", connector);
     }
 
+    @Override
+    public ConnectorSyncResult syncConnector(final VersionedConnector 
versionedConnector) {
+        final String connectorId = versionedConnector.getInstanceIdentifier();
+        final ScheduledState proposedScheduledState = 
versionedConnector.getScheduledState();
+        logger.debug("syncConnector called for connector [{}]", connectorId);
+
+        // Consult the provider for external state checks and working config
+        final ConnectorSyncDirective directive;
+        if (configurationProvider != null) {
+            try {
+                directive = configurationProvider.verifySyncable(connectorId, 
proposedScheduledState);
+            } catch (final Exception e) {
+                logger.error("Configuration provider threw exception during 
verifySyncable for connector [{}]: {}", connectorId, e.getMessage(), e);
+                final ConnectorNode existingNode = 
ensureConnectorNodeExists(versionedConnector);
+                existingNode.markInvalid("Flow Synchronization Failure",
+                        "Configuration provider error during synchronization: 
" + e.getMessage());
+                return ConnectorSyncResult.FAILED;
+            }
+        } else {
+            directive = ConnectorSyncDirective.allow();
+        }
+
+        logger.debug("Connector [{}] sync directive: {}", connectorId, 
directive);
+
+        // Handle REMOVE: connector should not exist on this node
+        if (directive.getAction() == ConnectorSyncDirective.Action.REMOVE) {
+            final ConnectorNode existingNode = connectors.remove(connectorId);

Review Comment:
   I don't think the removal logic here is sufficient. We need to ensure that 
the Connector gets stopped and then purge its data, cleanup Class Loaders, call 
`@OnRemoved` if necessary, etc.



##########
nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java:
##########
@@ -99,6 +101,35 @@ public interface ConnectorConfigurationProvider {
      */
     void verifyCreate(String connectorId);
 
+    /**
+     * Determines how the connector repository should handle synchronization 
for the given
+     * connector during flow inheritance (cluster join). The provider examines 
the external
+     * state of the connector and returns a {@link ConnectorSyncDirective} 
indicating whether
+     * to allow, reject, or remove the connector.
+     *
+     * <p>When the directive action is {@link 
ConnectorSyncDirective.Action#ALLOW}, the
+     * directive may optionally include:</p>
+     * <ul>
+     *   <li>A {@link ConnectorWorkingConfiguration} with the provider's 
working config and name
+     *       (overriding the potentially stale values from the versioned 
flow)</li>
+     *   <li>A {@link ScheduledState} override (correcting stale run intent 
from the versioned flow)</li>
+     * </ul>
+     *
+     * <p>This method combines the verify and load operations into a single 
call to avoid
+     * redundant round-trips to the external store.</p>
+     *
+     * <p>The default implementation returns {@link 
ConnectorSyncDirective#allow()} with no
+     * overrides, meaning the versioned flow's values are used for everything. 
This is the
+     * behavior for Apache NiFi when no provider is configured.</p>
+     *
+     * @param connectorId the identifier of the connector to check
+     * @param proposedScheduledState the ScheduledState from the versioned flow
+     * @return a directive indicating how to handle this connector during sync
+     */
+    default ConnectorSyncDirective verifySyncable(final String connectorId, 
final ScheduledState proposedScheduledState) {

Review Comment:
   Not a huge deal, but `verify*` methods in the framework generally are named 
`verifyCan*`. E.g., `verifyCanSync` - and also typically are `void` method that 
throw an `IllegalStateException` if not allowed. Perhaps consider another 
naming convention, such as `determineSyncStatus`, `determineSyncEligibility`, 
`getSyncDirective`?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -39,34 +45,44 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class StandardConnectorRepository implements ConnectorRepository {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardConnectorRepository.class);
+    private static final Duration DEFAULT_SYNC_TIMEOUT = 
Duration.ofMinutes(15);
+    private static final Duration SYNC_POLL_INTERVAL = Duration.ofSeconds(2);
 
     private final Map<String, ConnectorNode> connectors = new 
ConcurrentHashMap<>();
     private final FlowEngine lifecycleExecutor = new FlowEngine(8, "NiFi 
Connector Lifecycle");
 
+    private volatile FlowManager flowManager;
     private volatile ExtensionManager extensionManager;
     private volatile ConnectorRequestReplicator requestReplicator;
     private volatile SecretsManager secretsManager;
     private volatile AssetManager assetManager;
     private volatile ConnectorConfigurationProvider configurationProvider;
+    private volatile Duration syncTimeout = DEFAULT_SYNC_TIMEOUT;

Review Comment:
   Seems unnecessary to initialize this to a default value (and to even define 
the default value) if the value will always be set in the initialization, no?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorSyncResult.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+/**
+ * Result of a connector synchronization attempt during flow inheritance.
+ */
+public enum ConnectorSyncResult {
+
+    /**
+     * Configuration was applied and the connector's run state was updated
+     * to match the proposed ScheduledState.
+     */
+    SYNCED,
+
+    /**
+     * Configuration was already up to date; the connector's run state was
+     * updated if it differed from the proposed ScheduledState.
+     */
+    SYNCED_NO_CHANGES,

Review Comment:
   Seems weird to refer to this as `SYNCED_NO_CHANGES` if the Connector's Run 
State is updated. Perhaps `SYNCED_CONFIG_UNCHANGED` makes more sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to