This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit c55cad5c7179b6efc36a54c5b6e3de6c5f11cabd
Author: Antonin Stefanutti <anto...@stefanutti.fr>
AuthorDate: Tue Oct 26 14:51:56 2021 +0200

    feat: Report runtime health checks into Integration readiness condition
---
 config/rbac/operator-role.yaml            |   6 ++
 go.sum                                    |   1 -
 helm/camel-k/templates/operator-role.yaml |   6 ++
 pkg/apis/camel/v1/integration_types.go    |   2 +
 pkg/controller/integration/health.go      |  59 ++++++++++++++++
 pkg/controller/integration/monitor.go     | 110 ++++++++++++++++++++++++++----
 pkg/resources/resources.go                |   4 +-
 pkg/trait/trait_types.go                  |   6 +-
 8 files changed, 175 insertions(+), 19 deletions(-)

diff --git a/config/rbac/operator-role.yaml b/config/rbac/operator-role.yaml
index e619bbf..cc7fd7b 100644
--- a/config/rbac/operator-role.yaml
+++ b/config/rbac/operator-role.yaml
@@ -54,6 +54,12 @@ rules:
   verbs:
   - create
 - apiGroups:
+  - ""
+  resources:
+  - pods/proxy
+  verbs:
+  - get
+- apiGroups:
   - policy
   resources:
   - poddisruptionbudgets
diff --git a/go.sum b/go.sum
index d98d9d4..ea84466 100644
--- a/go.sum
+++ b/go.sum
@@ -1750,7 +1750,6 @@ k8s.io/code-generator v0.18.2/go.mod h1:+UHX5rSbxmR8kzS+FAv7um6dtYrZokQvjHpDSYRV
 k8s.io/code-generator v0.18.6/go.mod h1:TgNEVx9hCyPGpdtCWA34olQYLkh3ok9ar7XfSsr8b6c=
 k8s.io/code-generator v0.19.2/go.mod h1:moqLn7w0t9cMs4+5CQyxnfA/HV8MF6aAVENF+WZZhgk=
 k8s.io/code-generator v0.21.1/go.mod h1:hUlps5+9QaTrKx+jiM4rmq7YmH8wPOIko64uZCHDh6Q=
-k8s.io/code-generator v0.21.4 h1:vO8jVuEGV4UF+/2s/88Qg05MokE/1QUFi/Q2YDgz++A=
 k8s.io/code-generator v0.21.4/go.mod h1:K3y0Bv9Cz2cOW2vXUrNZlFbflhuPvuadW6JdnN6gGKo=
 k8s.io/component-base v0.17.4/go.mod h1:5BRqHMbbQPm2kKu35v3G+CpVq4K0RJKC7TRioF0I9lE=
 k8s.io/component-base v0.18.2/go.mod h1:kqLlMuhJNHQ9lz8Z7V5bxUUtjFZnrypArGl58gmDfUM=
diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml
index 1ec9512..3afbe47 100644
--- a/helm/camel-k/templates/operator-role.yaml
+++ b/helm/camel-k/templates/operator-role.yaml
@@ -55,6 +55,12 @@ rules:
   verbs:
   - create
 - apiGroups:
+  - ""
+  resources:
+  - pods/proxy
+  verbs:
+  - get
+- apiGroups:
   - policy
   resources:
   - poddisruptionbudgets
diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go
index 0729413..cafaa57 100644
--- a/pkg/apis/camel/v1/integration_types.go
+++ b/pkg/apis/camel/v1/integration_types.go
@@ -197,6 +197,8 @@ const (
        IntegrationConditionLastJobSucceededReason string = "LastJobSucceeded"
        // IntegrationConditionLastJobFailedReason --
        IntegrationConditionLastJobFailedReason string = "LastJobFailed"
+       // IntegrationConditionRuntimeNotReadyReason --
+       IntegrationConditionRuntimeNotReadyReason string = "RuntimeNotReady"
        // IntegrationConditionErrorReason --
        IntegrationConditionErrorReason string = "Error"
 
diff --git a/pkg/controller/integration/health.go b/pkg/controller/integration/health.go
new file mode 100644
index 0000000..e14d241
--- /dev/null
+++ b/pkg/controller/integration/health.go
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "time"
+
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/client-go/kubernetes"
+)
+
+type HealthCheckState string
+
+const (
+       HealthCheckStateDown HealthCheckState = "DOWN"
+       HealthCheckStateUp   HealthCheckState = "UP"
+)
+
+type HealthCheck struct {
+       Status HealthCheckState      `json:"state,omitempty"`
+       Checks []HealthCheckResponse `json:"checks,omitempty"`
+}
+
+type HealthCheckResponse struct {
+       Name   string                 `json:"name,omitempty"`
+       Status HealthCheckState       `json:"state,omitempty"`
+       Data   map[string]interface{} `json:"data,omitempty"`
+}
+
+func proxyGetHTTPProbe(ctx context.Context, c kubernetes.Interface, p *corev1.Probe, pod *corev1.Pod) ([]byte, error) {
+       if p.HTTPGet == nil {
+               return nil, fmt.Errorf("missing probe handler for %s/%s", pod.Namespace, pod.Name)
+       }
+
+       probeCtx, cancel := context.WithTimeout(ctx, time.Duration(p.TimeoutSeconds)*time.Second)
+       defer cancel()
+       params := make(map[string]string)
+       return c.CoreV1().Pods(pod.Namespace).
+               ProxyGet(strings.ToLower(string(p.HTTPGet.Scheme)), pod.Name, p.HTTPGet.Port.String(), p.HTTPGet.Path, params).
+               DoRaw(probeCtx)
+}
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index 2b01a7a..d4bd5ab 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -19,6 +19,7 @@ package integration
 
 import (
        "context"
+       "encoding/json"
        "fmt"
        "reflect"
        "strconv"
@@ -28,6 +29,7 @@ import (
        batchv1beta1 "k8s.io/api/batch/v1beta1"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/equality"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/selection"
 
@@ -41,6 +43,9 @@ import (
        "github.com/apache/camel-k/pkg/util/kubernetes"
 )
 
+// The key used for propagating error details from Camel health to MicroProfile Health (See CAMEL-17138)
+const runtimeHealthCheckErrorMessage = "error.message"
+
 func NewMonitorAction() Action {
        return &monitorAction{}
 }
@@ -161,6 +166,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
 func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
        var controller ctrl.Object
        var lastCompletedJob *batchv1.Job
+       var podSpec corev1.PodSpec
 
        switch {
        case isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable):
@@ -189,6 +195,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
                        setReadyConditionError(integration, progressing.Message)
                        return nil
                }
+               podSpec = c.Spec.Template.Spec
 
        case *servingv1.Service:
                // Check the KnativeService conditions
@@ -197,6 +204,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
                        setReadyConditionError(integration, ready.Message)
                        return nil
                }
+               podSpec = c.Spec.Template.Spec.PodSpec
 
        case *batchv1beta1.CronJob:
                // Check latest job result
@@ -224,6 +232,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
                                }
                        }
                }
+               podSpec = c.Spec.JobTemplate.Spec.Template.Spec
        }
 
        // Check Pods statuses
@@ -274,6 +283,26 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 
        integration.Status.Phase = v1.IntegrationPhaseRunning
 
+       var readyPods []corev1.Pod
+       var unreadyPods []corev1.Pod
+       for _, pod := range runningPods {
+               // We compare the Integration PodSpec to that of the Pod in order to make
+               // sure we account for up-to-date version.
+               if !equality.Semantic.DeepDerivative(podSpec, pod.Spec) {
+                       continue
+               }
+               ready := kubernetes.GetPodCondition(pod, corev1.PodReady)
+               if ready == nil {
+                       continue
+               }
+               switch ready.Status {
+               case corev1.ConditionTrue:
+                       readyPods = append(readyPods, pod)
+               case corev1.ConditionFalse:
+                       unreadyPods = append(unreadyPods, pod)
+               }
+       }
+
        switch c := controller.(type) {
        case *appsv1.Deployment:
                replicas := int32(1)
@@ -282,27 +311,18 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
                }
                // The Deployment status reports updated and ready replicas separately,
                // so that the number of ready replicas also accounts for older versions.
-               // We compare the Integration PodSpec to that of the Pod in order to make
-               // sure we account for up-to-date version.
-               var readyPods []corev1.Pod
-               for _, pod := range runningPods {
-                       if ready := kubernetes.GetPodCondition(pod, corev1.PodReady); ready == nil || ready.Status != corev1.ConditionTrue {
-                               continue
-                       }
-                       if equality.Semantic.DeepDerivative(c.Spec.Template.Spec, pod.Spec) {
-                               readyPods = append(readyPods, pod)
-                       }
-               }
                readyReplicas := int32(len(readyPods))
-               // The Integration is considered ready when the number of replicas
-               // reported to be ready is larger than or equal to the specified number
-               // of replicas. This avoids reporting a falsy readiness condition
-               // when the Integration is being down-scaled.
                switch {
                case readyReplicas >= replicas:
+                       // The Integration is considered ready when the number of replicas
+                       // reported to be ready is larger than or equal to the specified number
+                       // of replicas. This avoids reporting a falsy readiness condition
+                       // when the Integration is being down-scaled.
                        setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", c.Status.ReadyReplicas, replicas))
+
                case c.Status.UpdatedReplicas < replicas:
                        setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas))
+
                default:
                        setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
                }
@@ -319,19 +339,69 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
                switch {
                case c.Status.LastScheduleTime == nil:
                        setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
+
                case len(c.Status.Active) > 0:
                        setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
+
                case c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0:
                        setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
+
                case lastCompletedJob != nil:
                        if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
                                setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name))
                        }
+
                default:
                        integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "")
                }
        }
 
+       // Finally, call the readiness probes of the non-ready Pods directly,
+       // to retrieve insights from the Camel runtime.
+       var runtimeNotReadyMessages []string
+       for _, pod := range unreadyPods {
+               if ready := kubernetes.GetPodCondition(pod, corev1.PodReady); ready.Reason != "ContainersNotReady" {
+                       continue
+               }
+               container := getIntegrationContainer(environment, &pod)
+               if container == nil {
+                       return fmt.Errorf("integration container not found in Pod %s/%s", pod.Namespace, pod.Name)
+               }
+               if probe := container.ReadinessProbe; probe != nil && probe.HTTPGet != nil {
+                       body, err := proxyGetHTTPProbe(ctx, action.client, probe, &pod)
+                       if err == nil {
+                               continue
+                       }
+                       if !k8serrors.IsServiceUnavailable(err) {
+                               return err
+                       }
+                       health := HealthCheck{}
+                       err = json.Unmarshal(body, &health)
+                       if err != nil {
+                               return err
+                       }
+                       for _, check := range health.Checks {
+                               if check.Name != "camel-readiness-checks" {
+                                       continue
+                               }
+                               if check.Status == HealthCheckStateUp {
+                                       continue
+                               }
+                               if _, ok := check.Data[runtimeHealthCheckErrorMessage]; ok {
+                                       integration.Status.Phase = v1.IntegrationPhaseError
+                               }
+                               runtimeNotReadyMessages = append(runtimeNotReadyMessages, fmt.Sprintf("Pod %s runtime is not ready: %s", pod.Name, check.Data))
+                       }
+               }
+       }
+       if len(runtimeNotReadyMessages) > 0 {
+               reason := v1.IntegrationConditionRuntimeNotReadyReason
+               if integration.Status.Phase == v1.IntegrationPhaseError {
+                       reason = v1.IntegrationConditionErrorReason
+               }
+               setReadyCondition(integration, corev1.ConditionFalse, reason, fmt.Sprintf("%s", runtimeNotReadyMessages))
+       }
+
        return nil
 }
 
@@ -357,6 +427,16 @@ func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit,
        return kit, nil
 }
 
+func getIntegrationContainer(environment *trait.Environment, pod *corev1.Pod) *corev1.Container {
+       name := environment.GetIntegrationContainerName()
+       for i, container := range pod.Spec.Containers {
+               if container.Name == name {
+                       return &pod.Spec.Containers[i]
+               }
+       }
+       return nil
+}
+
 func isConditionTrue(integration *v1.Integration, conditionType v1.IntegrationConditionType) bool {
        cond := integration.Status.GetCondition(conditionType)
        if cond == nil {
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index 9189dc6..f486ed7 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -371,9 +371,9 @@ var assets = func() http.FileSystem {
                "/rbac/operator-role.yaml": &vfsgen۰CompressedFileInfo{
                        name:             "operator-role.yaml",
                        modTime:          time.Time{},
-                       uncompressedSize: 2311,
+                       uncompressedSize: 2376,
 
-                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x95\x41\x8f\xdb\x36\x10\x85\xef\xfa\x15\x0f\xd6\x25\x29\xd6\x76\xdb\x53\xe1\x9e\xdc\x64\xb7\x35\x1a\xd8\xc0\xca\x69\x90\x23\x45\x8e\xe5\xe9\x52\x1c\x96\xa4\xec\x75\x7f\x7d\x41\xda\x6e\xbc\xf1\x2e\x90\x43\xd0\x54\x17\x0f\xa9\xd1\x9b\xef\x71\xc6\x52\x8d\xf1\xd7\xbb\xaa\x1a\xef\x58\x93\x8b\x64\x90\x04\x69\x4b\x98\x7b\xa5\xb7\x84\x46\x36\x69\xaf\x02\xe1\x4e\x06\x67\x54\x62\x71\x78\x35\x6f\xee\x5e\x63\x70\x86\x02\xc4\x
 [...]
+                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xcc\x55\xc1\x6e\xe3\x36\x10\xbd\xeb\x2b\x1e\xac\xcb\x6e\x11\xdb\x6d\x4f\x85\x7b\x72\xb3\x49\x6b\x74\x61\x03\x91\xb7\x8b\x3d\x52\xd4\x58\x9e\x86\xe2\xb0\x43\x2a\x8e\xfb\xf5\x05\x65\xbb\xeb\xac\x13\x20\x87\x45\xb7\xba\x78\x48\x8d\xde\xbc\x37\xf3\x4c\x96\x18\x7f\xbd\xa7\x28\xf1\x9e\x2d\xf9\x48\x0d\x92\x20\x6d\x09\xf3\x60\xec\x96\x50\xc9\x26\xed\x8c\x12\x6e\xa5\xf7\x8d\x49\x2c\x1e\x6f\xe6\xd5\xed\x5b\xf4\xbe\x21\x85\x78\x
 [...]
                },
                "/rbac/patch-role-to-clusterrole.yaml": &vfsgen۰CompressedFileInfo{
                        name:             "patch-role-to-clusterrole.yaml",
diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go
index d4993b9..8c5a5d0 100644
--- a/pkg/trait/trait_types.go
+++ b/pkg/trait/trait_types.go
@@ -747,13 +747,17 @@ func (e *Environment) collectConfigurations(configurationType string) []map[stri
        return collectConfigurations(configurationType, e.Platform, e.IntegrationKit, e.Integration)
 }
 
-func (e *Environment) GetIntegrationContainer() *corev1.Container {
+func (e *Environment) GetIntegrationContainerName() string {
        containerName := defaultContainerName
        dt := e.Catalog.GetTrait(containerTraitID)
        if dt != nil {
                containerName = dt.(*containerTrait).Name
        }
+       return containerName
+}
 
+func (e *Environment) GetIntegrationContainer() *corev1.Container {
+       containerName := e.GetIntegrationContainerName()
        return e.Resources.GetContainerByName(containerName)
 }
 
