This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit c55cad5c7179b6efc36a54c5b6e3de6c5f11cabd
Author: Antonin Stefanutti <anto...@stefanutti.fr>
AuthorDate: Tue Oct 26 14:51:56 2021 +0200

    feat: Report runtime health checks into Integration readiness condition
---
 config/rbac/operator-role.yaml            |   6 ++
 go.sum                                    |   1 -
 helm/camel-k/templates/operator-role.yaml |   6 ++
 pkg/apis/camel/v1/integration_types.go    |   2 +
 pkg/controller/integration/health.go      |  59 ++++++++++++++++
 pkg/controller/integration/monitor.go     | 110 ++++++++++++++++++++++++++----
 pkg/resources/resources.go                |   4 +-
 pkg/trait/trait_types.go                  |   6 +-
 8 files changed, 175 insertions(+), 19 deletions(-)

diff --git a/config/rbac/operator-role.yaml b/config/rbac/operator-role.yaml
index e619bbf..cc7fd7b 100644
--- a/config/rbac/operator-role.yaml
+++ b/config/rbac/operator-role.yaml
@@ -54,6 +54,12 @@ rules:
   verbs:
   - create
 - apiGroups:
+  - ""
+  resources:
+  - pods/proxy
+  verbs:
+  - get
+- apiGroups:
   - policy
   resources:
   - poddisruptionbudgets
diff --git a/go.sum b/go.sum
index d98d9d4..ea84466 100644
--- a/go.sum
+++ b/go.sum
@@ -1750,7 +1750,6 @@ k8s.io/code-generator v0.18.2/go.mod h1:+UHX5rSbxmR8kzS+FAv7um6dtYrZokQvjHpDSYRV
 k8s.io/code-generator v0.18.6/go.mod h1:TgNEVx9hCyPGpdtCWA34olQYLkh3ok9ar7XfSsr8b6c=
 k8s.io/code-generator v0.19.2/go.mod h1:moqLn7w0t9cMs4+5CQyxnfA/HV8MF6aAVENF+WZZhgk=
 k8s.io/code-generator v0.21.1/go.mod h1:hUlps5+9QaTrKx+jiM4rmq7YmH8wPOIko64uZCHDh6Q=
-k8s.io/code-generator v0.21.4 h1:vO8jVuEGV4UF+/2s/88Qg05MokE/1QUFi/Q2YDgz++A=
 k8s.io/code-generator v0.21.4/go.mod h1:K3y0Bv9Cz2cOW2vXUrNZlFbflhuPvuadW6JdnN6gGKo=
 k8s.io/component-base v0.17.4/go.mod h1:5BRqHMbbQPm2kKu35v3G+CpVq4K0RJKC7TRioF0I9lE=
 k8s.io/component-base v0.18.2/go.mod h1:kqLlMuhJNHQ9lz8Z7V5bxUUtjFZnrypArGl58gmDfUM=
diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml
index 1ec9512..3afbe47 100644
--- a/helm/camel-k/templates/operator-role.yaml
+++ b/helm/camel-k/templates/operator-role.yaml
@@ -55,6 +55,12 @@ rules:
   verbs:
   - create
 - apiGroups:
+  - ""
+  resources:
+  - pods/proxy
+  verbs:
+  - get
+- apiGroups:
   - policy
   resources:
   - poddisruptionbudgets
diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go
index 0729413..cafaa57 100644
--- a/pkg/apis/camel/v1/integration_types.go
+++ b/pkg/apis/camel/v1/integration_types.go
@@ -197,6 +197,8 @@ const (
 	IntegrationConditionLastJobSucceededReason string = "LastJobSucceeded"
 	// IntegrationConditionLastJobFailedReason --
 	IntegrationConditionLastJobFailedReason string = "LastJobFailed"
+	// IntegrationConditionRuntimeNotReadyReason --
+	IntegrationConditionRuntimeNotReadyReason string = "RuntimeNotReady"
 	// IntegrationConditionErrorReason --
 	IntegrationConditionErrorReason string = "Error"
 
diff --git a/pkg/controller/integration/health.go b/pkg/controller/integration/health.go
new file mode 100644
index 0000000..e14d241
--- /dev/null
+++ b/pkg/controller/integration/health.go
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+type HealthCheckState string
+
+const (
+	HealthCheckStateDown HealthCheckState = "DOWN"
+	HealthCheckStateUp   HealthCheckState = "UP"
+)
+
+type HealthCheck struct {
+	Status HealthCheckState      `json:"state,omitempty"`
+	Checks []HealthCheckResponse `json:"checks,omitempty"`
+}
+
+type HealthCheckResponse struct {
+	Name   string                 `json:"name,omitempty"`
+	Status HealthCheckState       `json:"state,omitempty"`
+	Data   map[string]interface{} `json:"data,omitempty"`
+}
+
+func proxyGetHTTPProbe(ctx context.Context, c kubernetes.Interface, p *corev1.Probe, pod *corev1.Pod) ([]byte, error) {
+	if p.HTTPGet == nil {
+		return nil, fmt.Errorf("missing probe handler for %s/%s", pod.Namespace, pod.Name)
+	}
+
+	probeCtx, cancel := context.WithTimeout(ctx, time.Duration(p.TimeoutSeconds)*time.Second)
+	defer cancel()
+	params := make(map[string]string)
+
+	return c.CoreV1().Pods(pod.Namespace).
+		ProxyGet(strings.ToLower(string(p.HTTPGet.Scheme)), pod.Name, p.HTTPGet.Port.String(), p.HTTPGet.Path, params).
+		DoRaw(probeCtx)
+}
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 2b01a7a..d4bd5ab 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -19,6 +19,7 @@ package integration
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"reflect"
 	"strconv"
@@ -28,6 +29,7 @@ import (
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/selection"
 
@@ -41,6 +43,9 @@ import (
 	"github.com/apache/camel-k/pkg/util/kubernetes"
 )
 
+// The key used for propagating error details from Camel health to MicroProfile Health (See CAMEL-17138)
+const runtimeHealthCheckErrorMessage = "error.message"
+
 func NewMonitorAction() Action {
 	return &monitorAction{}
 }
@@ -161,6 +166,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
 func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
 	var controller ctrl.Object
 	var lastCompletedJob *batchv1.Job
+	var podSpec corev1.PodSpec
 
 	switch {
 	case isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable):
@@ -189,6 +195,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			setReadyConditionError(integration, progressing.Message)
 			return nil
 		}
+		podSpec = c.Spec.Template.Spec
 
 	case *servingv1.Service:
 		// Check the KnativeService conditions
@@ -197,6 +204,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			setReadyConditionError(integration, ready.Message)
 			return nil
 		}
+		podSpec = c.Spec.Template.Spec.PodSpec
 
 	case *batchv1beta1.CronJob:
 		// Check latest job result
@@ -224,6 +232,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 				}
 			}
 		}
+		podSpec = c.Spec.JobTemplate.Spec.Template.Spec
 	}
 
 	// Check Pods statuses
@@ -274,6 +283,26 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 
 	integration.Status.Phase = v1.IntegrationPhaseRunning
 
+	var readyPods []corev1.Pod
+	var unreadyPods []corev1.Pod
+	for _, pod := range runningPods {
+		// We compare the Integration PodSpec to that of the Pod in order to make
+		// sure we account for up-to-date version.
+		if !equality.Semantic.DeepDerivative(podSpec, pod.Spec) {
+			continue
+		}
+		ready := kubernetes.GetPodCondition(pod, corev1.PodReady)
+		if ready == nil {
+			continue
+		}
+		switch ready.Status {
+		case corev1.ConditionTrue:
+			readyPods = append(readyPods, pod)
+		case corev1.ConditionFalse:
+			unreadyPods = append(unreadyPods, pod)
+		}
+	}
+
 	switch c := controller.(type) {
 	case *appsv1.Deployment:
 		replicas := int32(1)
@@ -282,27 +311,18 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 		}
 		// The Deployment status reports updated and ready replicas separately,
 		// so that the number of ready replicas also accounts for older versions.
-		// We compare the Integration PodSpec to that of the Pod in order to make
-		// sure we account for up-to-date version.
-		var readyPods []corev1.Pod
-		for _, pod := range runningPods {
-			if ready := kubernetes.GetPodCondition(pod, corev1.PodReady); ready == nil || ready.Status != corev1.ConditionTrue {
-				continue
-			}
-			if equality.Semantic.DeepDerivative(c.Spec.Template.Spec, pod.Spec) {
-				readyPods = append(readyPods, pod)
-			}
-		}
 		readyReplicas := int32(len(readyPods))
-		// The Integration is considered ready when the number of replicas
-		// reported to be ready is larger than or equal to the specified number
-		// of replicas. This avoids reporting a falsy readiness condition
-		// when the Integration is being down-scaled.
 		switch {
 		case readyReplicas >= replicas:
+			// The Integration is considered ready when the number of replicas
+			// reported to be ready is larger than or equal to the specified number
+			// of replicas. This avoids reporting a falsy readiness condition
+			// when the Integration is being down-scaled.
 			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", c.Status.ReadyReplicas, replicas))
+
 		case c.Status.UpdatedReplicas < replicas:
 			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas))
+
 		default:
 			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
 		}
@@ -319,19 +339,69 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 
 		switch {
 		case c.Status.LastScheduleTime == nil:
 			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
+
 		case len(c.Status.Active) > 0:
 			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
+
 		case c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0:
 			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
+
 		case lastCompletedJob != nil:
 			if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
 				setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name))
 			}
+
 		default:
 			integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "")
 		}
 	}
 
+	// Finally, call the readiness probes of the non-ready Pods directly,
+	// to retrieve insights from the Camel runtime.
+	var runtimeNotReadyMessages []string
+	for _, pod := range unreadyPods {
+		if ready := kubernetes.GetPodCondition(pod, corev1.PodReady); ready.Reason != "ContainersNotReady" {
+			continue
+		}
+		container := getIntegrationContainer(environment, &pod)
+		if container == nil {
+			return fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
+		}
+		if probe := container.ReadinessProbe; probe != nil && probe.HTTPGet != nil {
+			body, err := proxyGetHTTPProbe(ctx, action.client, probe, &pod)
+			if err == nil {
+				continue
+			}
+			if !k8serrors.IsServiceUnavailable(err) {
+				return err
+			}
+			health := HealthCheck{}
+			err = json.Unmarshal(body, &health)
+			if err != nil {
+				return err
+			}
+			for _, check := range health.Checks {
+				if check.Name != "camel-readiness-checks" {
+					continue
+				}
+				if check.Status == HealthCheckStateUp {
+					continue
+				}
+				if _, ok := check.Data[runtimeHealthCheckErrorMessage]; ok {
+					integration.Status.Phase = v1.IntegrationPhaseError
+				}
+				runtimeNotReadyMessages = append(runtimeNotReadyMessages, fmt.Sprintf("Pod %s runtime is not ready: %s", pod.Name, check.Data))
+			}
+		}
+	}
+	if len(runtimeNotReadyMessages) > 0 {
+		reason := v1.IntegrationConditionRuntimeNotReadyReason
+		if integration.Status.Phase == v1.IntegrationPhaseError {
+			reason = v1.IntegrationConditionErrorReason
+		}
+		setReadyCondition(integration, corev1.ConditionFalse, reason, fmt.Sprintf("%s", runtimeNotReadyMessages))
+	}
+
 	return nil
 }
@@ -357,6 +427,16 @@ func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit,
 	return kit, nil
 }
 
+func getIntegrationContainer(environment *trait.Environment, pod *corev1.Pod) *corev1.Container {
+	name := environment.GetIntegrationContainerName()
+	for i, container := range pod.Spec.Containers {
+		if container.Name == name {
+			return &pod.Spec.Containers[i]
+		}
+	}
+	return nil
+}
+
 func isConditionTrue(integration *v1.Integration, conditionType v1.IntegrationConditionType) bool {
 	cond := integration.Status.GetCondition(conditionType)
 	if cond == nil {
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index 9189dc6..f486ed7 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -371,9 +371,9 @@ var assets = func() http.FileSystem {
 		"/rbac/operator-role.yaml": &vfsgen۰CompressedFileInfo{
 			name:             "operator-role.yaml",
 			modTime:          time.Time{},
-			uncompressedSize: 2311,
+			uncompressedSize: 2376,
 
-			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x95\x41\x8f\xdb\x36\x10\x85\xef\xfa\x15\x0f\xd6\x25\x29\xd6\x76\xdb\x53\xe1\x9e\xdc\x64\xb7\x35\x1a\xd8\xc0\xca\x69\x90\x23\x45\x8e\xe5\xe9\x52\x1c\x96\xa4\xec\x75\x7f\x7d\x41\xda\x6e\xbc\xf1\x2e\x90\x43\xd0\x54\x17\x0f\xa9\xd1\x9b\xef\x71\xc6\x52\x8d\xf1\xd7\xbb\xaa\x1a\xef\x58\x93\x8b\x64\x90\x04\x69\x4b\x98\x7b\xa5\xb7\x84\x46\x36\x69\xaf\x02\xe1\x4e\x06\x67\x54\x62\x71\x78\x35\x6f\xee\x5e\x63\x70\x86\x02\xc4\x [...]
+			compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x55\xc1\x6e\xe3\x36\x10\xbd\xeb\x2b\x1e\xac\xcb\x6e\x11\xdb\x6d\x4f\x85\x7b\x72\xb3\x49\x6b\x74\x61\x03\x91\xb7\x8b\x3d\x52\xd4\x58\x9e\x88\x3a [...]
}, "/rbac/patch-role-to-clusterrole.yaml": &vfsgen۰CompressedFileInfo{ name: "patch-role-to-clusterrole.yaml", diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go index d4993b9..8c5a5d0 100644 --- a/pkg/trait/trait_types.go +++ b/pkg/trait/trait_types.go @@ -747,13 +747,17 @@ func (e *Environment) collectConfigurations(configurationType string) []map[stri return collectConfigurations(configurationType, e.Platform, e.IntegrationKit, e.Integration) } -func (e *Environment) GetIntegrationContainer() *corev1.Container { +func (e *Environment) GetIntegrationContainerName() string { containerName := defaultContainerName dt := e.Catalog.GetTrait(containerTraitID) if dt != nil { containerName = dt.(*containerTrait).Name } + return containerName +} +func (e *Environment) GetIntegrationContainer() *corev1.Container { + containerName := e.GetIntegrationContainerName() return e.Resources.GetContainerByName(containerName) }