This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 4f48c99cad20d682ee5c5640b74fbe83cd449d66
Author: Christoph Deppisch <cdeppi...@redhat.com>
AuthorDate: Fri Jun 3 21:01:49 2022 +0200

    Fix #2177: Make sure to properly reinitialize integration
    
    - Reinitialize integration when operator id annotation has changed
    - Remove IntegrationKit reference for integration that has a new operator id annotation
    - Add operator id annotation to relevant fields that determine the integration digest hash
    - Add common utility method to get the operator id annotation from a resource
    - Allow default operator (id="camel-k") to handle legacy resources that are missing proper operator id annotations
    - Allow operators that use a proper id to reconcile resources on any namespace (regardless of operator mode global vs. local and regardless of existing namespace lease)
---
 pkg/apis/camel/v1/common_types_support.go          | 15 ++++
 pkg/controller/build/build_controller.go           |  2 +-
 .../integration/integration_controller.go          | 14 ++--
 pkg/controller/integration/monitor.go              | 18 +++--
 .../integrationkit/integrationkit_controller.go    |  2 +-
 .../integrationplatform_controller.go              |  2 +-
 pkg/controller/kamelet/kamelet_controller.go       |  2 +-
 .../kameletbinding/kamelet_binding_controller.go   |  2 +-
 pkg/platform/operator.go                           | 93 ++++++++++++++++++----
 pkg/trait/platform.go                              |  4 +-
 pkg/util/digest/digest.go                          |  6 ++
 11 files changed, 125 insertions(+), 35 deletions(-)

diff --git a/pkg/apis/camel/v1/common_types_support.go 
b/pkg/apis/camel/v1/common_types_support.go
index aec43a183..bb93121b8 100644
--- a/pkg/apis/camel/v1/common_types_support.go
+++ b/pkg/apis/camel/v1/common_types_support.go
@@ -21,6 +21,8 @@ import (
        "encoding/json"
        "errors"
        "fmt"
+
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 func (in *Artifact) String() string {
@@ -67,5 +69,18 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error {
        return nil
 }
 
+// GetOperatorIDAnnotation to safely get the operator id annotation value
+func GetOperatorIDAnnotation(obj metav1.Object) string {
+       if obj == nil || obj.GetAnnotations() == nil {
+               return ""
+       }
+
+       if operatorId, ok := obj.GetAnnotations()[OperatorIDAnnotation]; ok {
+               return operatorId
+       }
+
+       return ""
+}
+
 var _ json.Marshaler = (*RawMessage)(nil)
 var _ json.Unmarshaler = (*RawMessage)(nil)
diff --git a/pkg/controller/build/build_controller.go 
b/pkg/controller/build/build_controller.go
index 51269b2c8..117ccaa1e 100644
--- a/pkg/controller/build/build_controller.go
+++ b/pkg/controller/build/build_controller.go
@@ -136,7 +136,7 @@ func (r *reconcileBuild) Reconcile(ctx context.Context, 
request reconcile.Reques
        }
 
        // Only process resources assigned to the operator
-       if !platform.IsOperatorHandler(&instance) {
+       if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, 
request.Namespace, &instance) {
                rlog.Info("Ignoring request because resource is not assigned to 
current operator")
                return reconcile.Result{}, nil
        }
diff --git a/pkg/controller/integration/integration_controller.go 
b/pkg/controller/integration/integration_controller.go
index 11eb81641..c17a1babf 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -288,7 +288,7 @@ func (r *reconcileIntegration) Reconcile(ctx 
context.Context, request reconcile.
        }
 
        // Only process resources assigned to the operator
-       if !platform.IsOperatorHandler(&instance) {
+       if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, 
request.Namespace, &instance) {
                rlog.Info("Ignoring request because resource is not assigned to 
current operator")
                return reconcile.Result{}, nil
        }
@@ -317,9 +317,9 @@ func (r *reconcileIntegration) Reconcile(ctx 
context.Context, request reconcile.
                        }
 
                        if newTarget != nil {
-                               if res, err := r.update(ctx, &instance, 
newTarget); err != nil {
+                               if err := r.update(ctx, &instance, newTarget); 
err != nil {
                                        camelevent.NotifyIntegrationError(ctx, 
r.client, r.recorder, &instance, newTarget, err)
-                                       return res, err
+                                       return reconcile.Result{}, err
                                }
 
                                if newTarget.Status.Phase != 
instance.Status.Phase {
@@ -341,16 +341,14 @@ func (r *reconcileIntegration) Reconcile(ctx 
context.Context, request reconcile.
        return reconcile.Result{}, nil
 }
 
-func (r *reconcileIntegration) update(ctx context.Context, base 
*v1.Integration, target *v1.Integration) (reconcile.Result, error) {
+func (r *reconcileIntegration) update(ctx context.Context, base 
*v1.Integration, target *v1.Integration) error {
        d, err := digest.ComputeForIntegration(target)
        if err != nil {
-               return reconcile.Result{}, err
+               return err
        }
 
        target.Status.Digest = d
        target.Status.ObservedGeneration = base.Generation
 
-       err = r.client.Status().Patch(ctx, target, ctrl.MergeFrom(base))
-
-       return reconcile.Result{}, err
+       return r.client.Status().Patch(ctx, target, ctrl.MergeFrom(base))
 }
diff --git a/pkg/controller/integration/monitor.go 
b/pkg/controller/integration/monitor.go
index 3e68a3a24..412c07cef 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -69,6 +69,11 @@ func (action *monitorAction) Handle(ctx context.Context, 
integration *v1.Integra
                return nil, fmt.Errorf("no kit set on integration %s", 
integration.Name)
        }
 
+       kit, err := kubernetes.GetIntegrationKit(ctx, action.client, 
integration.Status.IntegrationKit.Name, 
integration.Status.IntegrationKit.Namespace)
+       if err != nil {
+               return nil, fmt.Errorf("unable to find integration kit %s/%s: 
%w", integration.Status.IntegrationKit.Namespace, 
integration.Status.IntegrationKit.Name, err)
+       }
+
        // Check if the Integration requires a rebuild
        hash, err := digest.ComputeForIntegration(integration)
        if err != nil {
@@ -76,7 +81,13 @@ func (action *monitorAction) Handle(ctx context.Context, 
integration *v1.Integra
        }
 
        if hash != integration.Status.Digest {
-               action.L.Info("Integration needs a rebuild")
+               action.L.Info("Monitor: Integration needs a rebuild")
+
+               if v1.GetOperatorIDAnnotation(integration) != "" &&
+                       (v1.GetOperatorIDAnnotation(integration) != 
v1.GetOperatorIDAnnotation(kit)) {
+                       // Operator to reconcile the integration has changed. 
Reset integration kit so new operator can handle the kit reference
+                       integration.SetIntegrationKit(nil)
+               }
 
                integration.Initialize()
                integration.Status.Digest = hash
@@ -84,11 +95,6 @@ func (action *monitorAction) Handle(ctx context.Context, 
integration *v1.Integra
                return integration, nil
        }
 
-       kit, err := kubernetes.GetIntegrationKit(ctx, action.client, 
integration.Status.IntegrationKit.Name, 
integration.Status.IntegrationKit.Namespace)
-       if err != nil {
-               return nil, fmt.Errorf("unable to find integration kit %s/%s: 
%w", integration.Status.IntegrationKit.Namespace, 
integration.Status.IntegrationKit.Name, err)
-       }
-
        // Check if an IntegrationKit with higher priority is ready
        priority, ok := kit.Labels[v1.IntegrationKitPriorityLabel]
        if !ok {
diff --git a/pkg/controller/integrationkit/integrationkit_controller.go 
b/pkg/controller/integrationkit/integrationkit_controller.go
index bc4225cf1..c1f839636 100644
--- a/pkg/controller/integrationkit/integrationkit_controller.go
+++ b/pkg/controller/integrationkit/integrationkit_controller.go
@@ -219,7 +219,7 @@ func (r *reconcileIntegrationKit) Reconcile(ctx 
context.Context, request reconci
        }
 
        // Only process resources assigned to the operator
-       if !platform.IsOperatorHandler(&instance) {
+       if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, 
request.Namespace, &instance) {
                rlog.Info("Ignoring request because resource is not assigned to 
current operator")
                return reconcile.Result{}, nil
        }
diff --git 
a/pkg/controller/integrationplatform/integrationplatform_controller.go 
b/pkg/controller/integrationplatform/integrationplatform_controller.go
index 9768be424..44257e0a3 100644
--- a/pkg/controller/integrationplatform/integrationplatform_controller.go
+++ b/pkg/controller/integrationplatform/integrationplatform_controller.go
@@ -155,7 +155,7 @@ func (r *reconcileIntegrationPlatform) Reconcile(ctx 
context.Context, request re
        }
 
        // Only process resources assigned to the operator
-       if !platform.IsOperatorHandler(&instance) {
+       if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, 
request.Namespace, &instance) {
                rlog.Info("Ignoring request because resource is not assigned to 
current operator")
                return reconcile.Result{}, nil
        }
diff --git a/pkg/controller/kamelet/kamelet_controller.go 
b/pkg/controller/kamelet/kamelet_controller.go
index 19a1375c0..5abee9855 100644
--- a/pkg/controller/kamelet/kamelet_controller.go
+++ b/pkg/controller/kamelet/kamelet_controller.go
@@ -143,7 +143,7 @@ func (r *reconcileKamelet) Reconcile(ctx context.Context, 
request reconcile.Requ
        }
 
        // Only process resources assigned to the operator
-       if !platform.IsOperatorHandler(&instance) {
+       if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, 
request.Namespace, &instance) {
                rlog.Info("Ignoring request because resource is not assigned to 
current operator")
                return reconcile.Result{}, nil
        }
diff --git a/pkg/controller/kameletbinding/kamelet_binding_controller.go 
b/pkg/controller/kameletbinding/kamelet_binding_controller.go
index 2ace3da23..8cd8bebb1 100644
--- a/pkg/controller/kameletbinding/kamelet_binding_controller.go
+++ b/pkg/controller/kameletbinding/kamelet_binding_controller.go
@@ -160,7 +160,7 @@ func (r *ReconcileKameletBinding) Reconcile(ctx 
context.Context, request reconci
        }
 
        // Only process resources assigned to the operator
-       if !platform.IsOperatorHandler(&instance) {
+       if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, 
request.Namespace, &instance) {
                rlog.Info("Ignoring request because resource is not assigned to 
current operator")
                return reconcile.Result{}, nil
        }
diff --git a/pkg/platform/operator.go b/pkg/platform/operator.go
index 602a99105..95e3bad68 100644
--- a/pkg/platform/operator.go
+++ b/pkg/platform/operator.go
@@ -81,27 +81,41 @@ func IsNamespaceLocked(ctx context.Context, c ctrl.Reader, 
namespace string) (bo
                return false, nil
        }
 
-       var operatorLockName string
-       if defaults.OperatorID() != "" {
-               operatorLockName = fmt.Sprintf("%s-lock", defaults.OperatorID())
-       } else {
-               operatorLockName = OperatorLockName
+       platforms, err := ListPrimaryPlatforms(ctx, c, namespace)
+       if err != nil {
+               return true, err
        }
 
-       lease := coordination.Lease{}
-       if err := c.Get(ctx, ctrl.ObjectKey{Namespace: namespace, Name: 
operatorLockName}, &lease); err != nil && k8serrors.IsNotFound(err) {
-               return false, nil
-       } else if err != nil {
-               return true, err
+       for _, platform := range platforms.Items {
+               lease := coordination.Lease{}
+
+               var operatorLockName string
+               if platform.Name != "" {
+                       operatorLockName = fmt.Sprintf("%s-lock", platform.Name)
+               } else {
+                       operatorLockName = OperatorLockName
+               }
+
+               if err := c.Get(ctx, ctrl.ObjectKey{Namespace: namespace, Name: 
operatorLockName}, &lease); err == nil || !k8serrors.IsNotFound(err) {
+                       return true, err
+               }
        }
-       return true, nil
+
+       return false, nil
 }
 
 // IsOperatorAllowedOnNamespace returns true if the current operator is 
allowed to react on changes in the given namespace.
 func IsOperatorAllowedOnNamespace(ctx context.Context, c ctrl.Reader, 
namespace string) (bool, error) {
+       // allow all local operators
        if !IsCurrentOperatorGlobal() {
                return true, nil
        }
+
+       // allow global operators that use a proper operator id
+       if defaults.OperatorID() != "" {
+               return true, nil
+       }
+
        operatorNamespace := GetOperatorNamespace()
        if operatorNamespace == namespace {
                // Global operator is allowed on its own namespace
@@ -114,13 +128,64 @@ func IsOperatorAllowedOnNamespace(ctx context.Context, c 
ctrl.Reader, namespace
        return !alreadyOwned, nil
 }
 
+// IsOperatorHandler checks on resource operator id annotation and this 
operator instance id.
+// Operators matching the annotation operator id are allowed to reconcile.
+// For legacy resources that are missing a proper operator id annotation the 
default global operator or the local
+// operator in this namespace are candidates for reconciliation.
 func IsOperatorHandler(object ctrl.Object) bool {
        if object == nil {
                return true
        }
-       resourceID := object.GetAnnotations()[camelv1.OperatorIDAnnotation]
+       resourceID := camelv1.GetOperatorIDAnnotation(object)
        operatorID := defaults.OperatorID()
-       return resourceID == operatorID
+
+       // allow operator with matching id to handle the resource
+       if resourceID == operatorID {
+               return true
+       }
+
+       // check if we are dealing with resource that is missing a proper 
operator id annotation
+       if resourceID == "" {
+               // allow default global operator to handle legacy resources 
(missing proper operator id annotations)
+               if operatorID == DefaultPlatformName {
+                       return true
+               }
+
+               // allow local operators to handle legacy resources (missing 
proper operator id annotations)
+               if !IsCurrentOperatorGlobal() {
+                       return true
+               }
+       }
+
+       return false
+}
+
+// IsOperatorHandlerConsideringLock uses normal IsOperatorHandler checks and 
adds additional check for legacy resources
+// that are missing a proper operator id annotation. In general two kind of 
operators race for reconcile these legacy resources.
+// The local operator for this namespace and the default global operator 
instance. Based on the existence of a namespace
+// lock the current local operator has precedence. When no lock exists the 
default global operator should reconcile.
+func IsOperatorHandlerConsideringLock(ctx context.Context, c ctrl.Reader, 
namespace string, object ctrl.Object) bool {
+       isHandler := IsOperatorHandler(object)
+       if !isHandler {
+               return false
+       }
+
+       resourceID := camelv1.GetOperatorIDAnnotation(object)
+       // add additional check on resources missing an operator id
+       if resourceID == "" {
+               operatorNamespace := GetOperatorNamespace()
+               if operatorNamespace == namespace {
+                       // Global operator is allowed on its own namespace
+                       return true
+               }
+
+               if locked, err := IsNamespaceLocked(ctx, c, namespace); err != 
nil || locked {
+                       // namespace is locked so local operators do have 
precedence
+                       return !IsCurrentOperatorGlobal()
+               }
+       }
+
+       return true
 }
 
 // FilteringFuncs do preliminary checks to determine if certain events should 
be handled by the controller
@@ -165,7 +230,7 @@ func (f FilteringFuncs) Update(e event.UpdateEvent) bool {
                return false
        }
        if e.ObjectOld != nil && e.ObjectNew != nil &&
-               e.ObjectOld.GetAnnotations()[camelv1.OperatorIDAnnotation] != 
e.ObjectNew.GetAnnotations()[camelv1.OperatorIDAnnotation] {
+               camelv1.GetOperatorIDAnnotation(e.ObjectOld) != 
camelv1.GetOperatorIDAnnotation(e.ObjectNew) {
                // Always force reconciliation when the object becomes managed 
by the current operator
                return true
        }
diff --git a/pkg/trait/platform.go b/pkg/trait/platform.go
index 4cf7d5b65..263b76add 100644
--- a/pkg/trait/platform.go
+++ b/pkg/trait/platform.go
@@ -111,11 +111,11 @@ func (t *platformTrait) getOrCreatePlatform(e 
*Environment) (*v1.IntegrationPlat
                        }
                        
defaultPlatform.Labels["camel.apache.org/platform.generated"] = True
                        // Cascade the operator id in charge to reconcile the 
Integration
-                       if e.Integration.Annotations != nil && 
e.Integration.Annotations[v1.OperatorIDAnnotation] != "" {
+                       if v1.GetOperatorIDAnnotation(e.Integration) != "" {
                                if defaultPlatform.Annotations == nil {
                                        defaultPlatform.Annotations = 
make(map[string]string)
                                }
-                               
defaultPlatform.Annotations[v1.OperatorIDAnnotation] = 
e.Integration.Annotations[v1.OperatorIDAnnotation]
+                               
defaultPlatform.SetOperatorID(v1.GetOperatorIDAnnotation(e.Integration))
                        }
                        pl = &defaultPlatform
                        e.Resources.Add(pl)
diff --git a/pkg/util/digest/digest.go b/pkg/util/digest/digest.go
index 1ddda2059..3653b6ed9 100644
--- a/pkg/util/digest/digest.go
+++ b/pkg/util/digest/digest.go
@@ -46,6 +46,12 @@ func ComputeForIntegration(integration *v1.Integration) 
(string, error) {
        if _, err := hash.Write([]byte(integration.Status.Version)); err != nil 
{
                return "", err
        }
+
+       // Integration operator id is relevant
+       if _, err := 
hash.Write([]byte(v1.GetOperatorIDAnnotation(integration))); err != nil {
+               return "", err
+       }
+
        // Integration Kit is relevant
        if integration.Spec.IntegrationKit != nil {
                if _, err := hash.Write([]byte(fmt.Sprintf("%s/%s", 
integration.Spec.IntegrationKit.Namespace, 
integration.Spec.IntegrationKit.Name))); err != nil {

Reply via email to