pvillard31 commented on code in PR #11079:
URL: https://github.com/apache/nifi/pull/11079#discussion_r3012234375
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -101,6 +129,7 @@ public void removeConnector(final String connectorId) {
connectorNode.verifyCanDelete();
if (configurationProvider != null) {
+ configurationProvider.verifyDelete(connectorId);
Review Comment:
`verifyDelete` is already called by `StandardConnectorDAO.verifyDelete()`
before this method is invoked. Is it necessary to call it here as well?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -451,29 +480,35 @@ private void syncFromProvider(final ConnectorNode connector) {
}
final String connectorId = connector.getIdentifier();
+
+ final Optional<ConnectorMetadata> metadata = configurationProvider.loadMetadata(connectorId);
+ metadata.ifPresent(m -> {
+ if (m.getName() != null) {
+ connector.setName(m.getName());
+ }
+ });
Review Comment:
Metadata and configuration are now loaded in two separate calls. The
previous single `load()` was atomic. If the external store changes between
these two calls, the connector could get metadata from one version and
configuration from another. Is that acceptable?
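One way to keep the read atomic would be for the provider to return metadata and configuration together from a single call. The sketch below is only an illustration of that idea: `ConnectorSnapshot`, `SnapshotProvider`, `loadSnapshot`, and `SnapshotSync` are hypothetical names, not part of this PR or the NiFi API.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical value type: metadata and configuration captured in one read.
final class ConnectorSnapshot {
    private final String name;
    private final Map<String, String> configuration;

    ConnectorSnapshot(final String name, final Map<String, String> configuration) {
        this.name = name;
        this.configuration = Map.copyOf(configuration);
    }

    String getName() { return name; }
    Map<String, String> getConfiguration() { return configuration; }
}

// Hypothetical provider contract: a single call, so both parts come from the same store version.
interface SnapshotProvider {
    Optional<ConnectorSnapshot> loadSnapshot(String connectorId);
}

// Hypothetical stand-in for the connector being synchronized.
final class SnapshotSync {
    private String name = "unnamed";
    private Map<String, String> configuration = Map.of();

    void syncFromProvider(final SnapshotProvider provider, final String connectorId) {
        provider.loadSnapshot(connectorId).ifPresent(snapshot -> {
            // Same null guard as in the diff: keep the current name if the store has none.
            if (snapshot.getName() != null) {
                name = snapshot.getName();
            }
            configuration = snapshot.getConfiguration();
        });
    }

    String getName() { return name; }
    Map<String, String> getConfiguration() { return configuration; }
}
```

With this shape a concurrent change in the external store cannot produce a name from one version and a configuration from another, at the cost of always loading both.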
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java:
##########
@@ -1042,36 +1042,54 @@ private Set<String> getAssetIds(final Parameter parameter) {
}
private void inheritConnectors(final FlowController flowController, final VersionedDataflow dataflow) {
- // TODO: We need to delete any Connectors that are no longer part of the flow.
- // This means we need to drain the Connector first, then stop it, then delete it. If unable to drain, we must fail...
- // perhaps we need a DRAINING state? Or do we just delete it and drop the data?
final ConnectorRepository connectorRepository = flowController.getConnectorRepository();
final Set<String> proposedConnectorIds = new HashSet<>();
if (dataflow.getConnectors() != null) {
for (final VersionedConnector versionedConnector : dataflow.getConnectors()) {
proposedConnectorIds.add(versionedConnector.getInstanceIdentifier());
- final ConnectorNode existingConnector = connectorRepository.getConnector(versionedConnector.getInstanceIdentifier());
- if (existingConnector == null) {
- logger.info("Connector {} of type {} with name {} is not in the current flow. Will add Connector.",
- versionedConnector.getInstanceIdentifier(), versionedConnector.getType(), versionedConnector.getName());
+ // Ensure the connector exists locally before any provider interaction so that
+ // if verifySyncable or any subsequent step fails, we have a local node to mark invalid.
+ ConnectorNode connectorNode = connectorRepository.getConnector(versionedConnector.getInstanceIdentifier());
+ final boolean isNewConnector = connectorNode == null;
+ if (isNewConnector) {
+ final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, versionedConnector.getBundle(), versionedConnector.getType());
+ connectorNode = flowController.getFlowManager().createConnector(
+ versionedConnector.getType(), versionedConnector.getInstanceIdentifier(), coordinate, false, true);
+ connectorRepository.restoreConnector(connectorNode);
+ }
- addConnector(versionedConnector, connectorRepository, flowController.getFlowManager());
- } else if (isConnectorConfigurationUpdated(existingConnector, versionedConnector)) {
- logger.info("{} configuration has changed, updating configuration", existingConnector);
- updateConnector(versionedConnector, connectorRepository);
- } else {
- logger.debug("{} configuration is up to date, no update necessary", existingConnector);
+ try {
+ connectorRepository.verifySyncable(versionedConnector.getInstanceIdentifier());
Review Comment:
Shouldn't this be called before the `connectorRepository.getConnector()` call a
few lines above?
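To make the trade-off concrete, here is a rough sketch of the two orderings. All names in it (`OrderingSketch`, `Node`, `Verifier`, `createThenVerify`, `verifyThenCreate`) are hypothetical and heavily simplified; it only assumes, as the code comment in the diff suggests, that a failed verification should leave a local node that can be marked invalid.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the two orderings; not NiFi code.
final class OrderingSketch {

    static final class Node {
        final String id;
        boolean invalid;

        Node(final String id) { this.id = id; }
    }

    interface Verifier {
        // Assumed to throw an unchecked exception when the connector cannot be synced.
        void verifySyncable(String connectorId);
    }

    final Map<String, Node> nodes = new HashMap<>();

    // Ordering in the diff: ensure the local node exists, then verify.
    // A verification failure leaves a node behind that is flagged invalid.
    Node createThenVerify(final String id, final Verifier verifier) {
        final Node node = nodes.computeIfAbsent(id, Node::new);
        try {
            verifier.verifySyncable(id);
        } catch (final RuntimeException e) {
            node.invalid = true;
        }
        return node;
    }

    // Alternative ordering: verify first.
    // A verification failure propagates and no local node is created.
    Node verifyThenCreate(final String id, final Verifier verifier) {
        verifier.verifySyncable(id);
        return nodes.computeIfAbsent(id, Node::new);
    }
}
```

If verification runs first, a failure is cheaper but leaves nothing local to surface the problem on, which may be the reason for the current order.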
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java:
##########
@@ -1162,18 +1180,15 @@ private boolean equals(final VersionedConnectorValueReference versionedReference
};
}
- private void addConnector(final VersionedConnector versionedConnector, final ConnectorRepository connectorRepository, final FlowManager flowManager) {
- final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, versionedConnector.getBundle(), versionedConnector.getType());
- final ConnectorNode connectorNode = flowManager.createConnector(versionedConnector.getType(), versionedConnector.getInstanceIdentifier(), coordinate, false, true);
- connectorRepository.restoreConnector(connectorNode);
- updateConnector(versionedConnector, connectorRepository);
- }
-
-
- private void updateConnector(final VersionedConnector versionedConnector, final ConnectorRepository connectorRepository) {
- final ConnectorNode connectorNode = connectorRepository.getConnector(versionedConnector.getInstanceIdentifier());
-
- connectorRepository.updateConnector(connectorNode, versionedConnector.getName());
+ private void syncConnector(final VersionedConnector versionedConnector, final ConnectorNode connectorNode, final ConnectorRepository connectorRepository) {
+ try {
+ connectorRepository.updateConnector(connectorNode, versionedConnector.getName());
Review Comment:
Should we call `verifyUpdate` here, or is the earlier `verifySyncable` call enough?