kevdoran commented on code in PR #11079:
URL: https://github.com/apache/nifi/pull/11079#discussion_r3025758999
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java:
##########
@@ -1042,36 +1042,54 @@ private Set<String> getAssetIds(final Parameter
parameter) {
}
private void inheritConnectors(final FlowController flowController, final
VersionedDataflow dataflow) {
- // TODO: We need to delete any Connectors that are no longer part of
the flow.
- // This means we need to drain the Connector first, then stop
it, then delete it. If unable to drain, we must fail...
- // perhaps we need a DRAINING state? Or do we just delete
it and drop the data?
final ConnectorRepository connectorRepository =
flowController.getConnectorRepository();
final Set<String> proposedConnectorIds = new HashSet<>();
if (dataflow.getConnectors() != null) {
for (final VersionedConnector versionedConnector :
dataflow.getConnectors()) {
proposedConnectorIds.add(versionedConnector.getInstanceIdentifier());
- final ConnectorNode existingConnector =
connectorRepository.getConnector(versionedConnector.getInstanceIdentifier());
- if (existingConnector == null) {
- logger.info("Connector {} of type {} with name {} is not
in the current flow. Will add Connector.",
- versionedConnector.getInstanceIdentifier(),
versionedConnector.getType(), versionedConnector.getName());
+ // Ensure the connector exists locally before any provider
interaction so that
+ // if verifySyncable or any subsequent step fails, we have a
local node to mark invalid.
+ ConnectorNode connectorNode =
connectorRepository.getConnector(versionedConnector.getInstanceIdentifier());
+ final boolean isNewConnector = connectorNode == null;
+ if (isNewConnector) {
+ final BundleCoordinate coordinate =
createBundleCoordinate(extensionManager, versionedConnector.getBundle(),
versionedConnector.getType());
+ connectorNode =
flowController.getFlowManager().createConnector(
+ versionedConnector.getType(),
versionedConnector.getInstanceIdentifier(), coordinate, false, true);
+ connectorRepository.restoreConnector(connectorNode);
+ }
- addConnector(versionedConnector, connectorRepository,
flowController.getFlowManager());
- } else if (isConnectorConfigurationUpdated(existingConnector,
versionedConnector)) {
- logger.info("{} configuration has changed, updating
configuration", existingConnector);
- updateConnector(versionedConnector, connectorRepository);
- } else {
- logger.debug("{} configuration is up to date, no update
necessary", existingConnector);
+ try {
+
connectorRepository.verifySyncable(versionedConnector.getInstanceIdentifier());
Review Comment:
OBE (overtaken by events)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]