This is an automated email from the ASF dual-hosted git repository. astefanutti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 2b63e754142b542194f4b10ed506840381c36e0a Author: Antonin Stefanutti <anto...@stefanutti.fr> AuthorDate: Thu Sep 30 16:38:20 2021 +0200 feat: Consistent Integration scale status --- pkg/cmd/operator/operator.go | 16 +- .../integration/integration_controller.go | 243 ++++++++------------- pkg/controller/integration/monitor.go | 44 ++-- 3 files changed, 126 insertions(+), 177 deletions(-) diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index d10bd66..3d046ac 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -21,7 +21,6 @@ import ( "context" "flag" "fmt" - "k8s.io/klog/v2" "math/rand" "os" "runtime" @@ -32,11 +31,13 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/manager/signals" "github.com/apache/camel-k/pkg/apis" @@ -51,7 +52,6 @@ import ( var log = logf.Log.WithName("cmd") -// GitCommit -- var GitCommit string func printVersion() { @@ -124,7 +124,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) { log.Info("Leader election is disabled!") } - mgr, err := ctrl.NewManager(c.GetConfig(), ctrl.Options{ + mgr, err := manager.New(c.GetConfig(), manager.Options{ Namespace: watchNamespace, EventBroadcaster: broadcaster, LeaderElection: leaderElection, @@ -137,6 +137,14 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) { }) exitOnError(err, "") + exitOnError( + mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, "status.phase", + func(obj ctrl.Object) []string { + return []string{string(obj.(*corev1.Pod).Status.Phase)} + }), + "unable to set up field indexer for status.phase: %v", + ) + log.Info("Configuring manager") exitOnError(mgr.AddHealthzCheck("health-probe", healthz.Ping), "Unable add liveness check") exitOnError(apis.AddToScheme(mgr.GetScheme()), "") diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index 187b6db..962a62c 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -20,16 +20,15 @@ package integration import ( "context" - appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" + corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/builder" ctrl "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -51,7 +50,7 @@ func Add(mgr manager.Manager) error { if err != nil { return err } - return add(mgr, newReconciler(mgr, c), c) + return add(mgr, newReconciler(mgr, c)) } func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler { @@ -69,109 +68,59 @@ func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler { ) } -func add(mgr manager.Manager, r reconcile.Reconciler, cl client.Client) error { - // Create a new controller - c, err := controller.New("integration-controller", mgr, controller.Options{Reconciler: r}) - if err != nil { - return err - } - - // Watch for changes to primary resource Integration - err = c.Watch(&source.Kind{Type: &v1.Integration{}}, - &handler.EnqueueRequestForObject{}, - predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - oldIntegration := e.ObjectOld.(*v1.Integration) - newIntegration := e.ObjectNew.(*v1.Integration) - // Ignore updates to the integration status in which case metadata.Generation does not change, - // or except when the integration phase changes as it's used to transition from one phase - // to another. - return oldIntegration.Generation != newIntegration.Generation || - oldIntegration.Status.Phase != newIntegration.Status.Phase - }, - DeleteFunc: func(e event.DeleteEvent) bool { - // Evaluates to false if the object has been confirmed deleted - return !e.DeleteStateUnknown - }, - }, - ) - if err != nil { - return err - } - - // Watch for IntegrationKit phase transitioning to ready or error, and - // enqueue requests for any integration that matches the kit, in building - // or running phase. - err = c.Watch(&source.Kind{Type: &v1.IntegrationKit{}}, - handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { - kit := a.(*v1.IntegrationKit) - var requests []reconcile.Request - - if kit.Status.Phase != v1.IntegrationKitPhaseReady && kit.Status.Phase != v1.IntegrationKitPhaseError { - return requests - } - - list := &v1.IntegrationList{} - // Do global search in case of global operator (it may be using a global platform) - var opts []ctrl.ListOption - if !platform.IsCurrentOperatorGlobal() { - opts = append(opts, ctrl.InNamespace(kit.Namespace)) - } - if err := mgr.GetClient().List(context.Background(), list, opts...); err != nil { - log.Error(err, "Failed to retrieve integration list") - return requests - } - - for _, integration := range list.Items { - if match, err := integrationMatches(&integration, kit); err != nil { - log.Errorf(err, "Error matching integration %q with kit %q", integration.Name, kit.Name) - continue - } else if !match { - continue - } - if integration.Status.Phase == v1.IntegrationPhaseBuildingKit || - integration.Status.Phase == v1.IntegrationPhaseRunning { - log.Infof("Kit %s ready, notify integration: %s", kit.Name, integration.Name) - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: integration.Namespace, - Name: integration.Name, - }, - }) +func add(mgr manager.Manager, r reconcile.Reconciler) error { + return builder.ControllerManagedBy(mgr). + Named("integration-controller"). + // Watch for changes to primary resource Integration + For(&v1.Integration{}, builder.WithPredicates( + predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + oldIntegration := e.ObjectOld.(*v1.Integration) + newIntegration := e.ObjectNew.(*v1.Integration) + // Ignore updates to the integration status in which case metadata.Generation does not change, + // or except when the integration phase changes as it's used to transition from one phase + // to another. + return oldIntegration.Generation != newIntegration.Generation || + oldIntegration.Status.Phase != newIntegration.Status.Phase + }, + DeleteFunc: func(e event.DeleteEvent) bool { + // Evaluates to false if the object has been confirmed deleted + return !e.DeleteStateUnknown + }, + })). + // Watch for IntegrationKit phase transitioning to ready or error, and + // enqueue requests for any integration that matches the kit, in building + // or running phase. + Watches(&source.Kind{Type: &v1.IntegrationKit{}}, + handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { + kit := a.(*v1.IntegrationKit) + var requests []reconcile.Request + + if kit.Status.Phase != v1.IntegrationKitPhaseReady && kit.Status.Phase != v1.IntegrationKitPhaseError { + return requests } - } - - return requests - }), - ) - if err != nil { - return err - } - - // Watch for IntegrationPlatform phase transitioning to ready and enqueue - // requests for any integrations that are in phase waiting for platform - err = c.Watch(&source.Kind{Type: &v1.IntegrationPlatform{}}, - handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { - p := a.(*v1.IntegrationPlatform) - var requests []reconcile.Request - if p.Status.Phase == v1.IntegrationPlatformPhaseReady { list := &v1.IntegrationList{} - // Do global search in case of global operator (it may be using a global platform) var opts []ctrl.ListOption if !platform.IsCurrentOperatorGlobal() { - opts = append(opts, ctrl.InNamespace(p.Namespace)) + opts = append(opts, ctrl.InNamespace(kit.Namespace)) } - - if err := mgr.GetClient().List(context.TODO(), list, opts...); err != nil { - log.Error(err, "Failed to list integrations") + if err := mgr.GetClient().List(context.Background(), list, opts...); err != nil { + log.Error(err, "Failed to retrieve integration list") return requests } for _, integration := range list.Items { - if integration.Status.Phase == v1.IntegrationPhaseWaitingForPlatform { - log.Infof("Platform %s ready, wake-up integration: %s", p.Name, integration.Name) + if match, err := integrationMatches(&integration, kit); err != nil { + log.Errorf(err, "Error matching integration %q with kit %q", integration.Name, kit.Name) + continue + } else if !match { + continue + } + if integration.Status.Phase == v1.IntegrationPhaseBuildingKit || + integration.Status.Phase == v1.IntegrationPhaseRunning { + log.Infof("Kit %s ready, notify integration: %s", kit.Name, integration.Name) requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: integration.Namespace, @@ -180,63 +129,59 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl client.Client) error { }) } } - } - return requests - }), - ) - if err != nil { - return err - } - - // Watch for ReplicaSet to reconcile replicas to the integration status. We cannot use - // the EnqueueRequestForOwner handler as the owner depends on the deployment strategy, - // either regular deployment or Knative service. In any case, the integration is not the - // direct owner of the ReplicaSet. - err = c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, - handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { - rs := a.(*appsv1.ReplicaSet) - var requests []reconcile.Request - - labels := rs.GetLabels() - integrationName, ok := labels[v1.IntegrationLabel] - if ok { - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: rs.Namespace, - Name: integrationName, - }, - }) - } + return requests + })). + // Watch for IntegrationPlatform phase transitioning to ready and enqueue + // requests for any integrations that are in phase waiting for platform + Watches(&source.Kind{Type: &v1.IntegrationPlatform{}}, + handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { + p := a.(*v1.IntegrationPlatform) + var requests []reconcile.Request + + if p.Status.Phase == v1.IntegrationPlatformPhaseReady { + list := &v1.IntegrationList{} + + // Do global search in case of global operator (it may be using a global platform) + var opts []ctrl.ListOption + if !platform.IsCurrentOperatorGlobal() { + opts = append(opts, ctrl.InNamespace(p.Namespace)) + } - return requests - }), - predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - oldReplicaSet := e.ObjectOld.(*appsv1.ReplicaSet) - newReplicaSet := e.ObjectNew.(*appsv1.ReplicaSet) - // Ignore updates to the ReplicaSet other than the replicas ones, - // that are used to reconcile the integration replicas. - return oldReplicaSet.Status.Replicas != newReplicaSet.Status.Replicas || - oldReplicaSet.Status.ReadyReplicas != newReplicaSet.Status.ReadyReplicas || - oldReplicaSet.Status.AvailableReplicas != newReplicaSet.Status.AvailableReplicas - }, - }, - ) - if err != nil { - return err - } + if err := mgr.GetClient().List(context.Background(), list, opts...); err != nil { + log.Error(err, "Failed to list integrations") + return requests + } - // Watch for CronJob to update the ready condition - err = c.Watch(&source.Kind{Type: &v1beta1.CronJob{}}, &handler.EnqueueRequestForOwner{ - OwnerType: &v1.Integration{}, - IsController: false, - }) - if err != nil { - return err - } + for _, integration := range list.Items { + if integration.Status.Phase == v1.IntegrationPhaseWaitingForPlatform { + log.Infof("Platform %s ready, wake-up integration: %s", p.Name, integration.Name) + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: integration.Namespace, + Name: integration.Name, + }, + }) + } + } + } - return nil + return requests + })). + // Watch for the Integration Pods + Watches(&source.Kind{Type: &corev1.Pod{}}, + handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { + pod := a.(*corev1.Pod) + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: pod.GetNamespace(), + Name: pod.Labels[v1.IntegrationLabel], + }, + }, + } + })). + Complete(r) } var _ reconcile.Reconciler = &reconcileIntegration{} diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go index a475abe..33911ae 100644 --- a/pkg/controller/integration/monitor.go +++ b/pkg/controller/integration/monitor.go @@ -36,7 +36,6 @@ import ( "github.com/apache/camel-k/pkg/util/kubernetes" ) -// NewMonitorAction creates a new monitoring action for an integration func NewMonitorAction() Action { return &monitorAction{} } @@ -115,25 +114,32 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra // to list the pods owned by the integration. integration.Status.Selector = v1.IntegrationLabel + "=" + integration.Name - // Check replicas - replicaSets := &appsv1.ReplicaSetList{} - err = action.client.List(ctx, replicaSets, + // Update the replicas count + pendingPods := &corev1.PodList{} + err = action.client.List(ctx, pendingPods, ctrl.InNamespace(integration.Namespace), - ctrl.MatchingLabels{ - v1.IntegrationLabel: integration.Name, - }) + ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name}, + ctrl.MatchingFields{"status.phase": string(corev1.PodPending)}) if err != nil { return nil, err } - - // And update the scale status accordingly - if len(replicaSets.Items) > 0 { - replicaSet := findLatestReplicaSet(replicaSets) - replicas := replicaSet.Status.Replicas - if integration.Status.Replicas == nil || replicas != *integration.Status.Replicas { - integration.Status.Replicas = &replicas + runningPods := &corev1.PodList{} + err = action.client.List(ctx, runningPods, + ctrl.InNamespace(integration.Namespace), + ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name}, + ctrl.MatchingFields{"status.phase": string(corev1.PodRunning)}) + if err != nil { + return nil, err + } + nonTerminatingPods := 0 + for _, pod := range runningPods.Items { + if pod.DeletionTimestamp != nil { + continue } + nonTerminatingPods++ } + podCount := int32(len(pendingPods.Items) + nonTerminatingPods) + integration.Status.Replicas = &podCount // Reconcile Integration phase if integration.Status.Phase == v1.IntegrationPhaseDeploying { @@ -218,16 +224,6 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra return integration, nil } -func findLatestReplicaSet(list *appsv1.ReplicaSetList) *appsv1.ReplicaSet { - latest := list.Items[0] - for i, rs := range list.Items[1:] { - if latest.CreationTimestamp.Before(&rs.CreationTimestamp) { - latest = list.Items[i+1] - } - } - return &latest -} - func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit, error) { if len(kits) == 0 { return nil, nil