This is an automated email from the ASF dual-hosted git repository. astefanutti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 4f48c99cad20d682ee5c5640b74fbe83cd449d66 Author: Christoph Deppisch <cdeppi...@redhat.com> AuthorDate: Fri Jun 3 21:01:49 2022 +0200 Fix #2177: Make sure to properly reinitialize integration - Reinitialize integration when operator id annotation has changed - Remove IntegrationKit reference for integration that has a new operator id annotation - Add operator id annotation to relevant fields that determine the integration digest hash - Add common utility method to get the operator id annotation from a resource - Allow default operator (id="camel-k") to handle legacy resources that are missing proper operator id annotations - Allow operators that use a proper id to reconcile resources on any namespace (regardless of operator mode global vs. local and regardless of existing namespace lease) --- pkg/apis/camel/v1/common_types_support.go | 15 ++++ pkg/controller/build/build_controller.go | 2 +- .../integration/integration_controller.go | 14 ++-- pkg/controller/integration/monitor.go | 18 +++-- .../integrationkit/integrationkit_controller.go | 2 +- .../integrationplatform_controller.go | 2 +- pkg/controller/kamelet/kamelet_controller.go | 2 +- .../kameletbinding/kamelet_binding_controller.go | 2 +- pkg/platform/operator.go | 93 ++++++++++++++++++---- pkg/trait/platform.go | 4 +- pkg/util/digest/digest.go | 6 ++ 11 files changed, 125 insertions(+), 35 deletions(-) diff --git a/pkg/apis/camel/v1/common_types_support.go b/pkg/apis/camel/v1/common_types_support.go index aec43a183..bb93121b8 100644 --- a/pkg/apis/camel/v1/common_types_support.go +++ b/pkg/apis/camel/v1/common_types_support.go @@ -21,6 +21,8 @@ import ( "encoding/json" "errors" "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func (in *Artifact) String() string { @@ -67,5 +69,18 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error { return nil } +// GetOperatorIDAnnotation to safely get the operator id annotation value +func GetOperatorIDAnnotation(obj metav1.Object) string { + if obj == nil || obj.GetAnnotations() == nil { + return "" + } + + if operatorId, ok := obj.GetAnnotations()[OperatorIDAnnotation]; ok { + return operatorId + } + + return "" +} + var _ json.Marshaler = (*RawMessage)(nil) var _ json.Unmarshaler = (*RawMessage)(nil) diff --git a/pkg/controller/build/build_controller.go b/pkg/controller/build/build_controller.go index 51269b2c8..117ccaa1e 100644 --- a/pkg/controller/build/build_controller.go +++ b/pkg/controller/build/build_controller.go @@ -136,7 +136,7 @@ func (r *reconcileBuild) Reconcile(ctx context.Context, request reconcile.Reques } // Only process resources assigned to the operator - if !platform.IsOperatorHandler(&instance) { + if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, request.Namespace, &instance) { rlog.Info("Ignoring request because resource is not assigned to current operator") return reconcile.Result{}, nil } diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index 11eb81641..c17a1babf 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -288,7 +288,7 @@ func (r *reconcileIntegration) Reconcile(ctx context.Context, request reconcile. } // Only process resources assigned to the operator - if !platform.IsOperatorHandler(&instance) { + if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, request.Namespace, &instance) { rlog.Info("Ignoring request because resource is not assigned to current operator") return reconcile.Result{}, nil } @@ -317,9 +317,9 @@ func (r *reconcileIntegration) Reconcile(ctx context.Context, request reconcile. } if newTarget != nil { - if res, err := r.update(ctx, &instance, newTarget); err != nil { + if err := r.update(ctx, &instance, newTarget); err != nil { camelevent.NotifyIntegrationError(ctx, r.client, r.recorder, &instance, newTarget, err) - return res, err + return reconcile.Result{}, err } if newTarget.Status.Phase != instance.Status.Phase { @@ -341,16 +341,14 @@ func (r *reconcileIntegration) Reconcile(ctx context.Context, request reconcile. return reconcile.Result{}, nil } -func (r *reconcileIntegration) update(ctx context.Context, base *v1.Integration, target *v1.Integration) (reconcile.Result, error) { +func (r *reconcileIntegration) update(ctx context.Context, base *v1.Integration, target *v1.Integration) error { d, err := digest.ComputeForIntegration(target) if err != nil { - return reconcile.Result{}, err + return err } target.Status.Digest = d target.Status.ObservedGeneration = base.Generation - err = r.client.Status().Patch(ctx, target, ctrl.MergeFrom(base)) - - return reconcile.Result{}, err + return r.client.Status().Patch(ctx, target, ctrl.MergeFrom(base)) } diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go index 3e68a3a24..412c07cef 100644 --- a/pkg/controller/integration/monitor.go +++ b/pkg/controller/integration/monitor.go @@ -69,6 +69,11 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra return nil, fmt.Errorf("no kit set on integration %s", integration.Name) } + kit, err := kubernetes.GetIntegrationKit(ctx, action.client, integration.Status.IntegrationKit.Name, integration.Status.IntegrationKit.Namespace) + if err != nil { + return nil, fmt.Errorf("unable to find integration kit %s/%s: %w", integration.Status.IntegrationKit.Namespace, integration.Status.IntegrationKit.Name, err) + } + // Check if the Integration requires a rebuild hash, err := digest.ComputeForIntegration(integration) if err != nil { @@ -76,7 +81,13 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra } if hash != integration.Status.Digest { - action.L.Info("Integration needs a rebuild") + action.L.Info("Monitor: Integration needs a rebuild") + + if v1.GetOperatorIDAnnotation(integration) != "" && + (v1.GetOperatorIDAnnotation(integration) != v1.GetOperatorIDAnnotation(kit)) { + // Operator to reconcile the integration has changed. Reset integration kit so new operator can handle the kit reference + integration.SetIntegrationKit(nil) + } integration.Initialize() integration.Status.Digest = hash @@ -84,11 +95,6 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra return integration, nil } - kit, err := kubernetes.GetIntegrationKit(ctx, action.client, integration.Status.IntegrationKit.Name, integration.Status.IntegrationKit.Namespace) - if err != nil { - return nil, fmt.Errorf("unable to find integration kit %s/%s: %w", integration.Status.IntegrationKit.Namespace, integration.Status.IntegrationKit.Name, err) - } - // Check if an IntegrationKit with higher priority is ready priority, ok := kit.Labels[v1.IntegrationKitPriorityLabel] if !ok { diff --git a/pkg/controller/integrationkit/integrationkit_controller.go b/pkg/controller/integrationkit/integrationkit_controller.go index bc4225cf1..c1f839636 100644 --- a/pkg/controller/integrationkit/integrationkit_controller.go +++ b/pkg/controller/integrationkit/integrationkit_controller.go @@ -219,7 +219,7 @@ func (r *reconcileIntegrationKit) Reconcile(ctx context.Context, request reconci } // Only process resources assigned to the operator - if !platform.IsOperatorHandler(&instance) { + if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, request.Namespace, &instance) { rlog.Info("Ignoring request because resource is not assigned to current operator") return reconcile.Result{}, nil } diff --git a/pkg/controller/integrationplatform/integrationplatform_controller.go b/pkg/controller/integrationplatform/integrationplatform_controller.go index 9768be424..44257e0a3 100644 --- a/pkg/controller/integrationplatform/integrationplatform_controller.go +++ b/pkg/controller/integrationplatform/integrationplatform_controller.go @@ -155,7 +155,7 @@ func (r *reconcileIntegrationPlatform) Reconcile(ctx context.Context, request re } // Only process resources assigned to the operator - if !platform.IsOperatorHandler(&instance) { + if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, request.Namespace, &instance) { rlog.Info("Ignoring request because resource is not assigned to current operator") return reconcile.Result{}, nil } diff --git a/pkg/controller/kamelet/kamelet_controller.go b/pkg/controller/kamelet/kamelet_controller.go index 19a1375c0..5abee9855 100644 --- a/pkg/controller/kamelet/kamelet_controller.go +++ b/pkg/controller/kamelet/kamelet_controller.go @@ -143,7 +143,7 @@ func (r *reconcileKamelet) Reconcile(ctx context.Context, request reconcile.Requ } // Only process resources assigned to the operator - if !platform.IsOperatorHandler(&instance) { + if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, request.Namespace, &instance) { rlog.Info("Ignoring request because resource is not assigned to current operator") return reconcile.Result{}, nil } diff --git a/pkg/controller/kameletbinding/kamelet_binding_controller.go b/pkg/controller/kameletbinding/kamelet_binding_controller.go index 2ace3da23..8cd8bebb1 100644 --- a/pkg/controller/kameletbinding/kamelet_binding_controller.go +++ b/pkg/controller/kameletbinding/kamelet_binding_controller.go @@ -160,7 +160,7 @@ func (r *ReconcileKameletBinding) Reconcile(ctx context.Context, request reconci } // Only process resources assigned to the operator - if !platform.IsOperatorHandler(&instance) { + if !platform.IsOperatorHandlerConsideringLock(ctx, r.client, request.Namespace, &instance) { rlog.Info("Ignoring request because resource is not assigned to current operator") return reconcile.Result{}, nil } diff --git a/pkg/platform/operator.go b/pkg/platform/operator.go index 602a99105..95e3bad68 100644 --- a/pkg/platform/operator.go +++ b/pkg/platform/operator.go @@ -81,27 +81,41 @@ func IsNamespaceLocked(ctx context.Context, c ctrl.Reader, namespace string) (bo return false, nil } - var operatorLockName string - if defaults.OperatorID() != "" { - operatorLockName = fmt.Sprintf("%s-lock", defaults.OperatorID()) - } else { - operatorLockName = OperatorLockName + platforms, err := ListPrimaryPlatforms(ctx, c, namespace) + if err != nil { + return true, err } - lease := coordination.Lease{} - if err := c.Get(ctx, ctrl.ObjectKey{Namespace: namespace, Name: operatorLockName}, &lease); err != nil && k8serrors.IsNotFound(err) { - return false, nil - } else if err != nil { - return true, err + for _, platform := range platforms.Items { + lease := coordination.Lease{} + + var operatorLockName string + if platform.Name != "" { + operatorLockName = fmt.Sprintf("%s-lock", platform.Name) + } else { + operatorLockName = OperatorLockName + } + + if err := c.Get(ctx, ctrl.ObjectKey{Namespace: namespace, Name: operatorLockName}, &lease); err == nil || !k8serrors.IsNotFound(err) { + return true, err + } } - return true, nil + + return false, nil } // IsOperatorAllowedOnNamespace returns true if the current operator is allowed to react on changes in the given namespace. func IsOperatorAllowedOnNamespace(ctx context.Context, c ctrl.Reader, namespace string) (bool, error) { + // allow all local operators if !IsCurrentOperatorGlobal() { return true, nil } + + // allow global operators that use a proper operator id + if defaults.OperatorID() != "" { + return true, nil + } + operatorNamespace := GetOperatorNamespace() if operatorNamespace == namespace { // Global operator is allowed on its own namespace @@ -114,13 +128,64 @@ func IsOperatorAllowedOnNamespace(ctx context.Context, c ctrl.Reader, namespace return !alreadyOwned, nil } +// IsOperatorHandler checks on resource operator id annotation and this operator instance id. +// Operators matching the annotation operator id are allowed to reconcile. +// For legacy resources that are missing a proper operator id annotation the default global operator or the local +// operator in this namespace are candidates for reconciliation. func IsOperatorHandler(object ctrl.Object) bool { if object == nil { return true } - resourceID := object.GetAnnotations()[camelv1.OperatorIDAnnotation] + resourceID := camelv1.GetOperatorIDAnnotation(object) operatorID := defaults.OperatorID() - return resourceID == operatorID + + // allow operator with matching id to handle the resource + if resourceID == operatorID { + return true + } + + // check if we are dealing with resource that is missing a proper operator id annotation + if resourceID == "" { + // allow default global operator to handle legacy resources (missing proper operator id annotations) + if operatorID == DefaultPlatformName { + return true + } + + // allow local operators to handle legacy resources (missing proper operator id annotations) + if !IsCurrentOperatorGlobal() { + return true + } + } + + return false +} + +// IsOperatorHandlerConsideringLock uses normal IsOperatorHandler checks and adds additional check for legacy resources +// that are missing a proper operator id annotation. In general two kind of operators race for reconcile these legacy resources. +// The local operator for this namespace and the default global operator instance. Based on the existence of a namespace +// lock the current local operator has precedence. When no lock exists the default global operator should reconcile. +func IsOperatorHandlerConsideringLock(ctx context.Context, c ctrl.Reader, namespace string, object ctrl.Object) bool { + isHandler := IsOperatorHandler(object) + if !isHandler { + return false + } + + resourceID := camelv1.GetOperatorIDAnnotation(object) + // add additional check on resources missing an operator id + if resourceID == "" { + operatorNamespace := GetOperatorNamespace() + if operatorNamespace == namespace { + // Global operator is allowed on its own namespace + return true + } + + if locked, err := IsNamespaceLocked(ctx, c, namespace); err != nil || locked { + // namespace is locked so local operators do have precedence + return !IsCurrentOperatorGlobal() + } + } + + return true } // FilteringFuncs do preliminary checks to determine if certain events should be handled by the controller @@ -165,7 +230,7 @@ func (f FilteringFuncs) Update(e event.UpdateEvent) bool { return false } if e.ObjectOld != nil && e.ObjectNew != nil && - e.ObjectOld.GetAnnotations()[camelv1.OperatorIDAnnotation] != e.ObjectNew.GetAnnotations()[camelv1.OperatorIDAnnotation] { + camelv1.GetOperatorIDAnnotation(e.ObjectOld) != camelv1.GetOperatorIDAnnotation(e.ObjectNew) { // Always force reconciliation when the object becomes managed by the current operator return true } diff --git a/pkg/trait/platform.go b/pkg/trait/platform.go index 4cf7d5b65..263b76add 100644 --- a/pkg/trait/platform.go +++ b/pkg/trait/platform.go @@ -111,11 +111,11 @@ func (t *platformTrait) getOrCreatePlatform(e *Environment) (*v1.IntegrationPlat } defaultPlatform.Labels["camel.apache.org/platform.generated"] = True // Cascade the operator id in charge to reconcile the Integration - if e.Integration.Annotations != nil && e.Integration.Annotations[v1.OperatorIDAnnotation] != "" { + if v1.GetOperatorIDAnnotation(e.Integration) != "" { if defaultPlatform.Annotations == nil { defaultPlatform.Annotations = make(map[string]string) } - defaultPlatform.Annotations[v1.OperatorIDAnnotation] = e.Integration.Annotations[v1.OperatorIDAnnotation] + defaultPlatform.SetOperatorID(v1.GetOperatorIDAnnotation(e.Integration)) } pl = &defaultPlatform e.Resources.Add(pl) diff --git a/pkg/util/digest/digest.go b/pkg/util/digest/digest.go index 1ddda2059..3653b6ed9 100644 --- a/pkg/util/digest/digest.go +++ b/pkg/util/digest/digest.go @@ -46,6 +46,12 @@ func ComputeForIntegration(integration *v1.Integration) (string, error) { if _, err := hash.Write([]byte(integration.Status.Version)); err != nil { return "", err } + + // Integration operator id is relevant + if _, err := hash.Write([]byte(v1.GetOperatorIDAnnotation(integration))); err != nil { + return "", err + } + // Integration Kit is relevant if integration.Spec.IntegrationKit != nil { if _, err := hash.Write([]byte(fmt.Sprintf("%s/%s", integration.Spec.IntegrationKit.Namespace, integration.Spec.IntegrationKit.Name))); err != nil {