This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 2b63e754142b542194f4b10ed506840381c36e0a
Author: Antonin Stefanutti <anto...@stefanutti.fr>
AuthorDate: Thu Sep 30 16:38:20 2021 +0200

    feat: Consistent Integration scale status
---
 pkg/cmd/operator/operator.go                       |  16 +-
 .../integration/integration_controller.go          | 243 ++++++++-------------
 pkg/controller/integration/monitor.go              |  44 ++--
 3 files changed, 126 insertions(+), 177 deletions(-)

diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go
index d10bd66..3d046ac 100644
--- a/pkg/cmd/operator/operator.go
+++ b/pkg/cmd/operator/operator.go
@@ -21,7 +21,6 @@ import (
        "context"
        "flag"
        "fmt"
-       "k8s.io/klog/v2"
        "math/rand"
        "os"
        "runtime"
@@ -32,11 +31,13 @@ import (
        corev1 "k8s.io/api/core/v1"
        "k8s.io/client-go/tools/leaderelection/resourcelock"
        "k8s.io/client-go/tools/record"
+       "k8s.io/klog/v2"
 
-       ctrl "sigs.k8s.io/controller-runtime"
+       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/healthz"
        logf "sigs.k8s.io/controller-runtime/pkg/log"
        "sigs.k8s.io/controller-runtime/pkg/log/zap"
+       "sigs.k8s.io/controller-runtime/pkg/manager"
        "sigs.k8s.io/controller-runtime/pkg/manager/signals"
 
        "github.com/apache/camel-k/pkg/apis"
@@ -51,7 +52,6 @@ import (
 
 var log = logf.Log.WithName("cmd")
 
-// GitCommit --
 var GitCommit string
 
 func printVersion() {
@@ -124,7 +124,7 @@ func Run(healthPort, monitoringPort int32, leaderElection 
bool) {
                log.Info("Leader election is disabled!")
        }
 
-       mgr, err := ctrl.NewManager(c.GetConfig(), ctrl.Options{
+       mgr, err := manager.New(c.GetConfig(), manager.Options{
                Namespace:                     watchNamespace,
                EventBroadcaster:              broadcaster,
                LeaderElection:                leaderElection,
@@ -137,6 +137,14 @@ func Run(healthPort, monitoringPort int32, leaderElection 
bool) {
        })
        exitOnError(err, "")
 
+       exitOnError(
+               mgr.GetFieldIndexer().IndexField(context.Background(), 
&corev1.Pod{}, "status.phase",
+                       func(obj ctrl.Object) []string {
+                               return 
[]string{string(obj.(*corev1.Pod).Status.Phase)}
+                       }),
+               "unable to set up field indexer for status.phase: %v",
+       )
+
        log.Info("Configuring manager")
        exitOnError(mgr.AddHealthzCheck("health-probe", healthz.Ping), "Unable 
add liveness check")
        exitOnError(apis.AddToScheme(mgr.GetScheme()), "")
diff --git a/pkg/controller/integration/integration_controller.go 
b/pkg/controller/integration/integration_controller.go
index 187b6db..962a62c 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -20,16 +20,15 @@ package integration
 import (
        "context"
 
-       appsv1 "k8s.io/api/apps/v1"
-       "k8s.io/api/batch/v1beta1"
+       corev1 "k8s.io/api/core/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/tools/record"
 
+       "sigs.k8s.io/controller-runtime/pkg/builder"
        ctrl "sigs.k8s.io/controller-runtime/pkg/client"
-       "sigs.k8s.io/controller-runtime/pkg/controller"
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/manager"
@@ -51,7 +50,7 @@ func Add(mgr manager.Manager) error {
        if err != nil {
                return err
        }
-       return add(mgr, newReconciler(mgr, c), c)
+       return add(mgr, newReconciler(mgr, c))
 }
 
 func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler {
@@ -69,109 +68,59 @@ func newReconciler(mgr manager.Manager, c client.Client) 
reconcile.Reconciler {
        )
 }
 
-func add(mgr manager.Manager, r reconcile.Reconciler, cl client.Client) error {
-       // Create a new controller
-       c, err := controller.New("integration-controller", mgr, 
controller.Options{Reconciler: r})
-       if err != nil {
-               return err
-       }
-
-       // Watch for changes to primary resource Integration
-       err = c.Watch(&source.Kind{Type: &v1.Integration{}},
-               &handler.EnqueueRequestForObject{},
-               predicate.Funcs{
-                       UpdateFunc: func(e event.UpdateEvent) bool {
-                               oldIntegration := e.ObjectOld.(*v1.Integration)
-                               newIntegration := e.ObjectNew.(*v1.Integration)
-                               // Ignore updates to the integration status in 
which case metadata.Generation does not change,
-                               // or except when the integration phase changes 
as it's used to transition from one phase
-                               // to another.
-                               return oldIntegration.Generation != 
newIntegration.Generation ||
-                                       oldIntegration.Status.Phase != 
newIntegration.Status.Phase
-                       },
-                       DeleteFunc: func(e event.DeleteEvent) bool {
-                               // Evaluates to false if the object has been 
confirmed deleted
-                               return !e.DeleteStateUnknown
-                       },
-               },
-       )
-       if err != nil {
-               return err
-       }
-
-       // Watch for IntegrationKit phase transitioning to ready or error, and
-       // enqueue requests for any integration that matches the kit, in 
building
-       // or running phase.
-       err = c.Watch(&source.Kind{Type: &v1.IntegrationKit{}},
-               handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) 
[]reconcile.Request {
-                       kit := a.(*v1.IntegrationKit)
-                       var requests []reconcile.Request
-
-                       if kit.Status.Phase != v1.IntegrationKitPhaseReady && 
kit.Status.Phase != v1.IntegrationKitPhaseError {
-                               return requests
-                       }
-
-                       list := &v1.IntegrationList{}
-                       // Do global search in case of global operator (it may 
be using a global platform)
-                       var opts []ctrl.ListOption
-                       if !platform.IsCurrentOperatorGlobal() {
-                               opts = append(opts, 
ctrl.InNamespace(kit.Namespace))
-                       }
-                       if err := mgr.GetClient().List(context.Background(), 
list, opts...); err != nil {
-                               log.Error(err, "Failed to retrieve integration 
list")
-                               return requests
-                       }
-
-                       for _, integration := range list.Items {
-                               if match, err := 
integrationMatches(&integration, kit); err != nil {
-                                       log.Errorf(err, "Error matching 
integration %q with kit %q", integration.Name, kit.Name)
-                                       continue
-                               } else if !match {
-                                       continue
-                               }
-                               if integration.Status.Phase == 
v1.IntegrationPhaseBuildingKit ||
-                                       integration.Status.Phase == 
v1.IntegrationPhaseRunning {
-                                       log.Infof("Kit %s ready, notify 
integration: %s", kit.Name, integration.Name)
-                                       requests = append(requests, 
reconcile.Request{
-                                               NamespacedName: 
types.NamespacedName{
-                                                       Namespace: 
integration.Namespace,
-                                                       Name:      
integration.Name,
-                                               },
-                                       })
+func add(mgr manager.Manager, r reconcile.Reconciler) error {
+       return builder.ControllerManagedBy(mgr).
+               Named("integration-controller").
+               // Watch for changes to primary resource Integration
+               For(&v1.Integration{}, builder.WithPredicates(
+                       predicate.Funcs{
+                               UpdateFunc: func(e event.UpdateEvent) bool {
+                                       oldIntegration := 
e.ObjectOld.(*v1.Integration)
+                                       newIntegration := 
e.ObjectNew.(*v1.Integration)
+                                       // Ignore updates to the integration 
status in which case metadata.Generation does not change,
+                                       // or except when the integration phase 
changes as it's used to transition from one phase
+                                       // to another.
+                                       return oldIntegration.Generation != 
newIntegration.Generation ||
+                                               oldIntegration.Status.Phase != 
newIntegration.Status.Phase
+                               },
+                               DeleteFunc: func(e event.DeleteEvent) bool {
+                                       // Evaluates to false if the object has 
been confirmed deleted
+                                       return !e.DeleteStateUnknown
+                               },
+                       })).
+               // Watch for IntegrationKit phase transitioning to ready or 
error, and
+               // enqueue requests for any integration that matches the kit, 
in building
+               // or running phase.
+               Watches(&source.Kind{Type: &v1.IntegrationKit{}},
+                       handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) 
[]reconcile.Request {
+                               kit := a.(*v1.IntegrationKit)
+                               var requests []reconcile.Request
+
+                               if kit.Status.Phase != 
v1.IntegrationKitPhaseReady && kit.Status.Phase != v1.IntegrationKitPhaseError {
+                                       return requests
                                }
-                       }
-
-                       return requests
-               }),
-       )
-       if err != nil {
-               return err
-       }
-
-       // Watch for IntegrationPlatform phase transitioning to ready and 
enqueue
-       // requests for any integrations that are in phase waiting for platform
-       err = c.Watch(&source.Kind{Type: &v1.IntegrationPlatform{}},
-               handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) 
[]reconcile.Request {
-                       p := a.(*v1.IntegrationPlatform)
-                       var requests []reconcile.Request
 
-                       if p.Status.Phase == v1.IntegrationPlatformPhaseReady {
                                list := &v1.IntegrationList{}
-
                                // Do global search in case of global operator 
(it may be using a global platform)
                                var opts []ctrl.ListOption
                                if !platform.IsCurrentOperatorGlobal() {
-                                       opts = append(opts, 
ctrl.InNamespace(p.Namespace))
+                                       opts = append(opts, 
ctrl.InNamespace(kit.Namespace))
                                }
-
-                               if err := mgr.GetClient().List(context.TODO(), 
list, opts...); err != nil {
-                                       log.Error(err, "Failed to list 
integrations")
+                               if err := 
mgr.GetClient().List(context.Background(), list, opts...); err != nil {
+                                       log.Error(err, "Failed to retrieve 
integration list")
                                        return requests
                                }
 
                                for _, integration := range list.Items {
-                                       if integration.Status.Phase == 
v1.IntegrationPhaseWaitingForPlatform {
-                                               log.Infof("Platform %s ready, 
wake-up integration: %s", p.Name, integration.Name)
+                                       if match, err := 
integrationMatches(&integration, kit); err != nil {
+                                               log.Errorf(err, "Error matching 
integration %q with kit %q", integration.Name, kit.Name)
+                                               continue
+                                       } else if !match {
+                                               continue
+                                       }
+                                       if integration.Status.Phase == 
v1.IntegrationPhaseBuildingKit ||
+                                               integration.Status.Phase == 
v1.IntegrationPhaseRunning {
+                                               log.Infof("Kit %s ready, notify 
integration: %s", kit.Name, integration.Name)
                                                requests = append(requests, 
reconcile.Request{
                                                        NamespacedName: 
types.NamespacedName{
                                                                Namespace: 
integration.Namespace,
@@ -180,63 +129,59 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl 
client.Client) error {
                                                })
                                        }
                                }
-                       }
 
-                       return requests
-               }),
-       )
-       if err != nil {
-               return err
-       }
-
-       // Watch for ReplicaSet to reconcile replicas to the integration 
status. We cannot use
-       // the EnqueueRequestForOwner handler as the owner depends on the 
deployment strategy,
-       // either regular deployment or Knative service. In any case, the 
integration is not the
-       // direct owner of the ReplicaSet.
-       err = c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}},
-               handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) 
[]reconcile.Request {
-                       rs := a.(*appsv1.ReplicaSet)
-                       var requests []reconcile.Request
-
-                       labels := rs.GetLabels()
-                       integrationName, ok := labels[v1.IntegrationLabel]
-                       if ok {
-                               requests = append(requests, reconcile.Request{
-                                       NamespacedName: types.NamespacedName{
-                                               Namespace: rs.Namespace,
-                                               Name:      integrationName,
-                                       },
-                               })
-                       }
+                               return requests
+                       })).
+               // Watch for IntegrationPlatform phase transitioning to ready 
and enqueue
+               // requests for any integrations that are in phase waiting for 
platform
+               Watches(&source.Kind{Type: &v1.IntegrationPlatform{}},
+                       handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) 
[]reconcile.Request {
+                               p := a.(*v1.IntegrationPlatform)
+                               var requests []reconcile.Request
+
+                               if p.Status.Phase == 
v1.IntegrationPlatformPhaseReady {
+                                       list := &v1.IntegrationList{}
+
+                                       // Do global search in case of global 
operator (it may be using a global platform)
+                                       var opts []ctrl.ListOption
+                                       if !platform.IsCurrentOperatorGlobal() {
+                                               opts = append(opts, 
ctrl.InNamespace(p.Namespace))
+                                       }
 
-                       return requests
-               }),
-               predicate.Funcs{
-                       UpdateFunc: func(e event.UpdateEvent) bool {
-                               oldReplicaSet := 
e.ObjectOld.(*appsv1.ReplicaSet)
-                               newReplicaSet := 
e.ObjectNew.(*appsv1.ReplicaSet)
-                               // Ignore updates to the ReplicaSet other than 
the replicas ones,
-                               // that are used to reconcile the integration 
replicas.
-                               return oldReplicaSet.Status.Replicas != 
newReplicaSet.Status.Replicas ||
-                                       oldReplicaSet.Status.ReadyReplicas != 
newReplicaSet.Status.ReadyReplicas ||
-                                       oldReplicaSet.Status.AvailableReplicas 
!= newReplicaSet.Status.AvailableReplicas
-                       },
-               },
-       )
-       if err != nil {
-               return err
-       }
+                                       if err := 
mgr.GetClient().List(context.Background(), list, opts...); err != nil {
+                                               log.Error(err, "Failed to list 
integrations")
+                                               return requests
+                                       }
 
-       // Watch for CronJob to update the ready condition
-       err = c.Watch(&source.Kind{Type: &v1beta1.CronJob{}}, 
&handler.EnqueueRequestForOwner{
-               OwnerType:    &v1.Integration{},
-               IsController: false,
-       })
-       if err != nil {
-               return err
-       }
+                                       for _, integration := range list.Items {
+                                               if integration.Status.Phase == 
v1.IntegrationPhaseWaitingForPlatform {
+                                                       log.Infof("Platform %s 
ready, wake-up integration: %s", p.Name, integration.Name)
+                                                       requests = 
append(requests, reconcile.Request{
+                                                               NamespacedName: 
types.NamespacedName{
+                                                                       
Namespace: integration.Namespace,
+                                                                       Name:   
   integration.Name,
+                                                               },
+                                                       })
+                                               }
+                                       }
+                               }
 
-       return nil
+                               return requests
+                       })).
+               // Watch for the Integration Pods
+               Watches(&source.Kind{Type: &corev1.Pod{}},
+                       handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) 
[]reconcile.Request {
+                               pod := a.(*corev1.Pod)
+                               return []reconcile.Request{
+                                       {
+                                               NamespacedName: 
types.NamespacedName{
+                                                       Namespace: 
pod.GetNamespace(),
+                                                       Name:      
pod.Labels[v1.IntegrationLabel],
+                                               },
+                                       },
+                               }
+                       })).
+               Complete(r)
 }
 
 var _ reconcile.Reconciler = &reconcileIntegration{}
diff --git a/pkg/controller/integration/monitor.go 
b/pkg/controller/integration/monitor.go
index a475abe..33911ae 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -36,7 +36,6 @@ import (
        "github.com/apache/camel-k/pkg/util/kubernetes"
 )
 
-// NewMonitorAction creates a new monitoring action for an integration
 func NewMonitorAction() Action {
        return &monitorAction{}
 }
@@ -115,25 +114,32 @@ func (action *monitorAction) Handle(ctx context.Context, 
integration *v1.Integra
        // to list the pods owned by the integration.
        integration.Status.Selector = v1.IntegrationLabel + "=" + 
integration.Name
 
-       // Check replicas
-       replicaSets := &appsv1.ReplicaSetList{}
-       err = action.client.List(ctx, replicaSets,
+       // Update the replicas count
+       pendingPods := &corev1.PodList{}
+       err = action.client.List(ctx, pendingPods,
                ctrl.InNamespace(integration.Namespace),
-               ctrl.MatchingLabels{
-                       v1.IntegrationLabel: integration.Name,
-               })
+               ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
+               ctrl.MatchingFields{"status.phase": string(corev1.PodPending)})
        if err != nil {
                return nil, err
        }
-
-       // And update the scale status accordingly
-       if len(replicaSets.Items) > 0 {
-               replicaSet := findLatestReplicaSet(replicaSets)
-               replicas := replicaSet.Status.Replicas
-               if integration.Status.Replicas == nil || replicas != 
*integration.Status.Replicas {
-                       integration.Status.Replicas = &replicas
+       runningPods := &corev1.PodList{}
+       err = action.client.List(ctx, runningPods,
+               ctrl.InNamespace(integration.Namespace),
+               ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
+               ctrl.MatchingFields{"status.phase": string(corev1.PodRunning)})
+       if err != nil {
+               return nil, err
+       }
+       nonTerminatingPods := 0
+       for _, pod := range runningPods.Items {
+               if pod.DeletionTimestamp != nil {
+                       continue
                }
+               nonTerminatingPods++
        }
+       podCount := int32(len(pendingPods.Items) + nonTerminatingPods)
+       integration.Status.Replicas = &podCount
 
        // Reconcile Integration phase
        if integration.Status.Phase == v1.IntegrationPhaseDeploying {
@@ -218,16 +224,6 @@ func (action *monitorAction) Handle(ctx context.Context, 
integration *v1.Integra
        return integration, nil
 }
 
-func findLatestReplicaSet(list *appsv1.ReplicaSetList) *appsv1.ReplicaSet {
-       latest := list.Items[0]
-       for i, rs := range list.Items[1:] {
-               if latest.CreationTimestamp.Before(&rs.CreationTimestamp) {
-                       latest = list.Items[i+1]
-               }
-       }
-       return &latest
-}
-
 func findHighestPriorityReadyKit(kits []v1.IntegrationKit) 
(*v1.IntegrationKit, error) {
        if len(kits) == 0 {
                return nil, nil

Reply via email to