This is an automated email from the ASF dual-hosted git repository.

tsato pushed a commit to branch release-1.9.x
in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 204af03e7bd04c6d5a733fa974d6138f9e4b5a91
Author: Tadayoshi Sato <sato.tadayo...@gmail.com>
AuthorDate: Fri May 27 18:02:39 2022 +0900

    chore(controller): refactor integration monitor
---
 pkg/controller/integration/monitor.go            | 194 +++++++++----------
 pkg/controller/integration/monitor_cronjob.go    | 104 ++++++++++++
 pkg/controller/integration/monitor_deployment.go |  77 +++++++++
 pkg/controller/integration/monitor_knative.go    |  60 +++++++
 4 files changed, 312 insertions(+), 123 deletions(-)

diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index e0877e734..0dfb695b3 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -26,7 +26,6 @@ import (
 	"strconv"
 
 	appsv1 "k8s.io/api/apps/v1"
-	batchv1 "k8s.io/api/batch/v1"
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
@@ -164,85 +163,87 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
 	return integration, nil
 }
 
-func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
-	var controller ctrl.Object
-	var lastCompletedJob *batchv1.Job
-	var podSpec corev1.PodSpec
+type controller interface {
+	checkReadyCondition() (bool, error)
+	getPodSpec() corev1.PodSpec
+	updateReadyCondition(readyPods []corev1.Pod) bool
+}
 
+func (action *monitorAction) newController(ctx context.Context, env *trait.Environment, integration *v1.Integration) (controller, error) {
+	var controller controller
+	var obj ctrl.Object
 	switch {
 	case isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable):
-		controller = &appsv1.Deployment{}
+		obj = getUpdatedController(env, &appsv1.Deployment{})
+		controller = &deploymentController{
+			obj:         obj.(*appsv1.Deployment),
+			integration: integration,
+		}
 	case isConditionTrue(integration, v1.IntegrationConditionKnativeServiceAvailable):
-		controller = &servingv1.Service{}
+		obj = getUpdatedController(env, &servingv1.Service{})
+		controller = &knativeServiceController{
+			obj:         obj.(*servingv1.Service),
+			integration: integration,
+		}
 	case isConditionTrue(integration, v1.IntegrationConditionCronJobAvailable):
-		controller = &batchv1beta1.CronJob{}
+		obj = getUpdatedController(env, &batchv1beta1.CronJob{})
+		controller = &cronJobController{
+			obj:         obj.(*batchv1beta1.CronJob),
+			integration: integration,
+			client:      action.client,
+			context:     ctx,
+		}
 	default:
-		return fmt.Errorf("unsupported controller for integration %s", integration.Name)
+		return nil, fmt.Errorf("unsupported controller for integration %s", integration.Name)
 	}
 
-	// Retrieve the controller updated from the deployer trait execution
-	controller = environment.Resources.GetController(func(object ctrl.Object) bool {
-		return reflect.TypeOf(controller) == reflect.TypeOf(object)
+	if obj == nil {
+		return nil, fmt.Errorf("unable to retrieve controller for integration %s", integration.Name)
+	}
+
+	return controller, nil
+}
+
+// getUpdatedController retrieves the controller updated from the deployer trait execution.
+func getUpdatedController(env *trait.Environment, obj ctrl.Object) ctrl.Object {
+	return env.Resources.GetController(func(object ctrl.Object) bool {
+		return reflect.TypeOf(obj) == reflect.TypeOf(object)
 	})
-	if controller == nil {
-		return fmt.Errorf("unable to retrieve controller for integration %s", integration.Name)
+}
+
+func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
+	controller, err := action.newController(ctx, environment, integration)
+	if err != nil {
+		return err
 	}
 
-	switch c := controller.(type) {
-	case *appsv1.Deployment:
-		// Check the Deployment progression
-		if progressing := kubernetes.GetDeploymentCondition(*c, appsv1.DeploymentProgressing); progressing != nil && progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" {
-			integration.Status.Phase = v1.IntegrationPhaseError
-			setReadyConditionError(integration, progressing.Message)
-			return nil
-		}
-		podSpec = c.Spec.Template.Spec
+	if done, err := controller.checkReadyCondition(); done || err != nil {
+		return err
+	}
+	if done := checkPodStatuses(integration, pendingPods, runningPods); done {
+		return nil
+	}
+	integration.Status.Phase = v1.IntegrationPhaseRunning
 
-	case *servingv1.Service:
-		// Check the KnativeService conditions
-		if ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady); ready.IsFalse() && ready.GetReason() == "RevisionFailed" {
-			integration.Status.Phase = v1.IntegrationPhaseError
-			setReadyConditionError(integration, ready.Message)
-			return nil
-		}
-		podSpec = c.Spec.Template.Spec.PodSpec
-
-	case *batchv1beta1.CronJob:
-		// Check latest job result
-		if lastScheduleTime := c.Status.LastScheduleTime; lastScheduleTime != nil && len(c.Status.Active) == 0 {
-			jobs := batchv1.JobList{}
-			if err := action.client.List(ctx, &jobs,
-				ctrl.InNamespace(integration.Namespace),
-				ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
-			); err != nil {
-				return err
-			}
-			t := lastScheduleTime.Time
-			for i, job := range jobs.Items {
-				if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) {
-					continue
-				}
-				lastCompletedJob = &jobs.Items[i]
-				t = lastCompletedJob.CreationTimestamp.Time
-			}
-			if lastCompletedJob != nil {
-				if failed := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue {
-					setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", lastCompletedJob.Name, failed.Message))
-					integration.Status.Phase = v1.IntegrationPhaseError
-					return nil
-				}
-			}
-		}
-		podSpec = c.Spec.JobTemplate.Spec.Template.Spec
+	readyPods, unreadyPods := filterPodsByReadyStatus(runningPods, controller.getPodSpec())
+	if done := controller.updateReadyCondition(readyPods); done {
+		return nil
+	}
+	if err := action.probeReadiness(ctx, environment, integration, unreadyPods); err != nil {
+		return err
 	}
 
+	return nil
+}
+
+func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) bool {
 	// Check Pods statuses
 	for _, pod := range pendingPods {
 		// Check the scheduled condition
 		if scheduled := kubernetes.GetPodCondition(pod, corev1.PodScheduled); scheduled != nil && scheduled.Status == corev1.ConditionFalse && scheduled.Reason == "Unschedulable" {
 			integration.Status.Phase = v1.IntegrationPhaseError
 			setReadyConditionError(integration, scheduled.Message)
-			return nil
+			return true
 		}
 	}
 	// Check pending container statuses
@@ -255,7 +256,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "ImagePullBackOff" {
 				integration.Status.Phase = v1.IntegrationPhaseError
 				setReadyConditionError(integration, waiting.Message)
-				return nil
+				return true
 			}
 		}
 	}
@@ -272,18 +273,20 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "CrashLoopBackOff" {
 				integration.Status.Phase = v1.IntegrationPhaseError
 				setReadyConditionError(integration, waiting.Message)
-				return nil
+				return true
 			}
 			if terminated := container.State.Terminated; terminated != nil && terminated.Reason == "Error" {
 				integration.Status.Phase = v1.IntegrationPhaseError
 				setReadyConditionError(integration, terminated.Message)
-				return nil
+				return true
 			}
 		}
 	}
 
-	integration.Status.Phase = v1.IntegrationPhaseRunning
+	return false
+}
 
+func filterPodsByReadyStatus(runningPods []corev1.Pod, podSpec corev1.PodSpec) ([]corev1.Pod, []corev1.Pod) {
 	var readyPods []corev1.Pod
 	var unreadyPods []corev1.Pod
 	for _, pod := range runningPods {
@@ -308,66 +311,11 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 		}
 	}
 
-	switch c := controller.(type) {
-	case *appsv1.Deployment:
-		replicas := int32(1)
-		if r := integration.Spec.Replicas; r != nil {
-			replicas = *r
-		}
-		// The Deployment status reports updated and ready replicas separately,
-		// so that the number of ready replicas also accounts for older versions.
-		readyReplicas := int32(len(readyPods))
-		switch {
-		case readyReplicas >= replicas:
-			// The Integration is considered ready when the number of replicas
-			// reported to be ready is larger than or equal to the specified number
-			// of replicas. This avoids reporting a falsy readiness condition
-			// when the Integration is being down-scaled.
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
-			return nil
-
-		case c.Status.UpdatedReplicas < replicas:
-			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas))
-
-		default:
-			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
-		}
-
-	case *servingv1.Service:
-		ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady)
-		if ready.IsTrue() {
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceReadyReason, "")
-			return nil
-		}
-		setReadyCondition(integration, corev1.ConditionFalse, ready.GetReason(), ready.GetMessage())
-
-	case *batchv1beta1.CronJob:
-		switch {
-		case c.Status.LastScheduleTime == nil:
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
-			return nil
-
-		case len(c.Status.Active) > 0:
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
-			return nil
-
-		case c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0:
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
-			return nil
-
-		case lastCompletedJob != nil:
-			if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
-				setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name))
-				return nil
-			}
-
-		default:
-			integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "")
-		}
-	}
+	return readyPods, unreadyPods
+}
 
-	// Finally, call the readiness probes of the non-ready Pods directly,
-	// to retrieve insights from the Camel runtime.
+// probeReadiness calls the readiness probes of the non-ready Pods directly to retrieve insights from the Camel runtime.
+func (action *monitorAction) probeReadiness(ctx context.Context, environment *trait.Environment, integration *v1.Integration, unreadyPods []corev1.Pod) error {
 	var runtimeNotReadyMessages []string
 	for i := range unreadyPods {
 		pod := &unreadyPods[i]
diff --git a/pkg/controller/integration/monitor_cronjob.go b/pkg/controller/integration/monitor_cronjob.go
new file mode 100644
index 000000000..8a024df16
--- /dev/null
+++ b/pkg/controller/integration/monitor_cronjob.go
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"context"
+	"fmt"
+
+	batchv1 "k8s.io/api/batch/v1"
+	batchv1beta1 "k8s.io/api/batch/v1beta1"
+	corev1 "k8s.io/api/core/v1"
+
+	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/client"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+type cronJobController struct {
+	obj              *batchv1beta1.CronJob
+	integration      *v1.Integration
+	client           client.Client
+	context          context.Context
+	lastCompletedJob *batchv1.Job
+}
+
+var _ controller = &cronJobController{}
+
+func (c *cronJobController) checkReadyCondition() (bool, error) {
+	// Check latest job result
+	if lastScheduleTime := c.obj.Status.LastScheduleTime; lastScheduleTime != nil && len(c.obj.Status.Active) == 0 {
+		jobs := batchv1.JobList{}
+		if err := c.client.List(c.context, &jobs,
+			ctrl.InNamespace(c.integration.Namespace),
+			ctrl.MatchingLabels{v1.IntegrationLabel: c.integration.Name},
+		); err != nil {
+			return true, err
+		}
+		t := lastScheduleTime.Time
+		for i, job := range jobs.Items {
+			if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) {
+				continue
+			}
+			c.lastCompletedJob = &jobs.Items[i]
+			t = c.lastCompletedJob.CreationTimestamp.Time
+		}
+		if c.lastCompletedJob != nil {
+			if failed := kubernetes.GetJobCondition(*c.lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue {
+				setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", c.lastCompletedJob.Name, failed.Message))
+				c.integration.Status.Phase = v1.IntegrationPhaseError
+				return true, nil
+			}
+		}
+	}
+
+	return false, nil
+}
+
+func (c *cronJobController) getPodSpec() corev1.PodSpec {
+	return c.obj.Spec.JobTemplate.Spec.Template.Spec
+}
+
+func (c *cronJobController) updateReadyCondition(readyPods []corev1.Pod) bool {
+	switch {
+	case c.obj.Status.LastScheduleTime == nil:
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
+		return true
+
+	case len(c.obj.Status.Active) > 0:
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
+		return true
+
+	case c.obj.Spec.SuccessfulJobsHistoryLimit != nil && *c.obj.Spec.SuccessfulJobsHistoryLimit == 0 && c.obj.Spec.FailedJobsHistoryLimit != nil && *c.obj.Spec.FailedJobsHistoryLimit == 0:
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
+		return true
+
+	case c.lastCompletedJob != nil:
+		if complete := kubernetes.GetJobCondition(*c.lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
+			setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", c.lastCompletedJob.Name))
+			return true
+		}
+
+	default:
+		setReadyCondition(c.integration, corev1.ConditionUnknown, "", "")
+	}
+
+	return false
+}
diff --git a/pkg/controller/integration/monitor_deployment.go b/pkg/controller/integration/monitor_deployment.go
new file mode 100644
index 000000000..9cf748ff6
--- /dev/null
+++ b/pkg/controller/integration/monitor_deployment.go
@@ -0,0 +1,77 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"fmt"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+type deploymentController struct {
+	obj         *appsv1.Deployment
+	integration *v1.Integration
+}
+
+var _ controller = &deploymentController{}
+
+func (c *deploymentController) checkReadyCondition() (bool, error) {
+	// Check the Deployment progression
+	if progressing := kubernetes.GetDeploymentCondition(*c.obj, appsv1.DeploymentProgressing); progressing != nil && progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" {
+		c.integration.Status.Phase = v1.IntegrationPhaseError
+		setReadyConditionError(c.integration, progressing.Message)
+		return true, nil
+	}
+
+	return false, nil
+}
+
+func (c *deploymentController) getPodSpec() corev1.PodSpec {
+	return c.obj.Spec.Template.Spec
+}
+
+func (c *deploymentController) updateReadyCondition(readyPods []corev1.Pod) bool {
+	replicas := int32(1)
+	if r := c.integration.Spec.Replicas; r != nil {
+		replicas = *r
+	}
+	// The Deployment status reports updated and ready replicas separately,
+	// so that the number of ready replicas also accounts for older versions.
+	readyReplicas := int32(len(readyPods))
+	switch {
+	case readyReplicas >= replicas:
+		// The Integration is considered ready when the number of replicas
+		// reported to be ready is larger than or equal to the specified number
+		// of replicas. This avoids reporting a falsy readiness condition
+		// when the Integration is being down-scaled.
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
+		return true
+
+	case c.obj.Status.UpdatedReplicas < replicas:
+		setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.obj.Status.UpdatedReplicas, replicas))
+
+	default:
+		setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
+	}
+
+	return false
+}
diff --git a/pkg/controller/integration/monitor_knative.go b/pkg/controller/integration/monitor_knative.go
new file mode 100644
index 000000000..cf8d09860
--- /dev/null
+++ b/pkg/controller/integration/monitor_knative.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	corev1 "k8s.io/api/core/v1"
+
+	servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+type knativeServiceController struct {
+	obj         *servingv1.Service
+	integration *v1.Integration
+}
+
+var _ controller = &knativeServiceController{}
+
+func (c *knativeServiceController) checkReadyCondition() (bool, error) {
+	// Check the KnativeService conditions
+	if ready := kubernetes.GetKnativeServiceCondition(*c.obj, servingv1.ServiceConditionReady); ready.IsFalse() && ready.GetReason() == "RevisionFailed" {
+		c.integration.Status.Phase = v1.IntegrationPhaseError
+		setReadyConditionError(c.integration, ready.Message)
+		return true, nil
+	}
+
+	return false, nil
+}
+
+func (c *knativeServiceController) getPodSpec() corev1.PodSpec {
+	return c.obj.Spec.Template.Spec.PodSpec
+}
+
+func (c *knativeServiceController) updateReadyCondition(readyPods []corev1.Pod) bool {
+	ready := kubernetes.GetKnativeServiceCondition(*c.obj, servingv1.ServiceConditionReady)
+	if ready.IsTrue() {
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceReadyReason, "")
+		return true
+	}
+	setReadyCondition(c.integration, corev1.ConditionFalse, ready.GetReason(), ready.GetMessage())
+
+	return false
+}
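[Editor's note, not part of the commit above.] The refactoring replaces the type switch in updateIntegrationPhaseAndReadyCondition with the small controller interface, so each workload kind (Deployment, Knative Service, CronJob) implements checkReadyCondition/getPodSpec/updateReadyCondition and newController wires up the matching implementation. A minimal sketch of what a hypothetical additional implementation could look like; the type name exampleController and its no-op logic are invented for illustration, while the controller interface and the setReadyCondition helper come from the diff, so this would only compile inside the same package:

    package integration

    import (
    	corev1 "k8s.io/api/core/v1"

    	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
    )

    // exampleController is a hypothetical extra implementation of the
    // controller interface defined in monitor.go; the real ones are
    // deploymentController, knativeServiceController and cronJobController.
    type exampleController struct {
    	integration *v1.Integration
    }

    var _ controller = &exampleController{}

    // checkReadyCondition reports whether the readiness evaluation is already
    // decided (for instance because an error condition was detected).
    func (c *exampleController) checkReadyCondition() (bool, error) {
    	return false, nil
    }

    // getPodSpec returns the PodSpec the monitor compares against the
    // running Pods when filtering them by readiness.
    func (c *exampleController) getPodSpec() corev1.PodSpec {
    	return corev1.PodSpec{}
    }

    // updateReadyCondition sets the Integration ready condition from the ready
    // Pods and reports whether the evaluation is complete.
    func (c *exampleController) updateReadyCondition(readyPods []corev1.Pod) bool {
    	setReadyCondition(c.integration, corev1.ConditionUnknown, "", "")
    	return false
    }

A new case in newController would then construct this type, mirroring how the three existing controllers are instantiated.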