This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch issue/3397 in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 047e361bd7a827e0525a4d563ca2563999b2140d Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Tue Jan 16 15:38:19 2024 +0100 fix(#4948): Move handling of IntegrationPlatform CR to a separate operator. Add a separate platformcontroller subcommand to kamel, amend install command and other installations (OLM, kustomize, helm) as needed. The operator does not manage the IntegrationPlatform CRD anymore. --- e2e/advanced/operator_metrics_test.go | 115 +++++---- e2e/support/test_support.go | 75 ++++-- helm/camel-k/templates/platformcontroller.yaml | 111 +++++++++ pkg/cmd/platformcontroller.go | 70 ++++++ pkg/cmd/platformcontroller/platformcontroller.go | 271 +++++++++++++++++++++ pkg/cmd/platformcontroller_test.go | 84 +++++++ pkg/cmd/root.go | 3 +- pkg/controller/add_integrationplatform.go | 2 +- ...ntegrationplatform.go => platformcontroller.go} | 19 +- pkg/install/common.go | 4 + pkg/install/operator.go | 30 ++- pkg/platform/operator.go | 16 ++ .../platformcontroller.go} | 19 +- pkg/resources/config/manager/kustomization.yaml | 1 + .../manager/platformcontroller-deployment.yaml | 89 +++++++ pkg/resources/config/prometheus/kustomization.yaml | 2 + .../platformcontroller-pod-monitor.yaml} | 20 +- .../platformcontroller-prometheus-rule.yaml | 55 +++++ 18 files changed, 893 insertions(+), 93 deletions(-) diff --git a/e2e/advanced/operator_metrics_test.go b/e2e/advanced/operator_metrics_test.go index 493051a4a..419a41cab 100644 --- a/e2e/advanced/operator_metrics_test.go +++ b/e2e/advanced/operator_metrics_test.go @@ -63,24 +63,24 @@ func TestMetrics(t *testing.T) { Should(Equal(corev1.ConditionTrue)) g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) - pod := OperatorPod(t, ctx, ns)() - g.Expect(pod).NotTo(BeNil()) + operatorPod := OperatorPod(t, ctx, ns)() + g.Expect(operatorPod).NotTo(BeNil()) + // pod.Namespace could be different from ns if using global operator - fmt.Printf("Fetching logs for 
operator pod %s in namespace %s", pod.Name, pod.Namespace) + fmt.Printf("Fetching logs for operator pod %s in namespace %s", operatorPod.Name, operatorPod.Namespace) logOptions := &corev1.PodLogOptions{ Container: "camel-k-operator", } - logs, err := StructuredLogs(t, ctx, pod.Namespace, pod.Name, logOptions, false) + logs, err := StructuredLogs(t, ctx, operatorPod.Namespace, operatorPod.Name, logOptions, false) g.Expect(err).To(BeNil()) g.Expect(logs).NotTo(BeEmpty()) - response, err := TestClient(t).CoreV1().RESTClient().Get(). - AbsPath(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/proxy/metrics", pod.Namespace, pod.Name)). - Timeout(30 * time.Second). - DoRaw(ctx) - g.Expect(err).To(BeNil()) - metrics, err := parsePrometheusData(response) - g.Expect(err).To(BeNil()) + platformcontrollerPod := PlatformcontrollerPod(t, ctx, ns)() + Expect(platformcontrollerPod).NotTo(BeNil()) + + operatorLogs, operatorMetrics := getLogsAndMetrics(t, ctx, operatorPod, "camel-k-operator") + + platformcontrollerLogs, platformcontrollerMetrics := getLogsAndMetrics(t, ctx, platformcontrollerPod, "camel-k-platformcontroller") it := Integration(t, ctx, ns, name)() g.Expect(it).NotTo(BeNil()) @@ -92,9 +92,9 @@ func TestMetrics(t *testing.T) { duration, err := time.ParseDuration(build.Status.Duration) g.Expect(err).To(BeNil()) - // Check it's consistent with the duration observed from logs + // Check it's consistent with the duration observed from operatorLogs var ts1, ts2 time.Time - err = NewLogWalker(&logs). + err = NewLogWalker(&operatorLogs). 
AddStep(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": Equal("State transition"), @@ -124,8 +124,8 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs((durationFromLogs - duration).Seconds())).To(BeNumerically("<", 10)) // Check the duration is observed in the corresponding metric - g.Expect(metrics).To(HaveKey("camel_k_build_duration_seconds")) - g.Expect(metrics["camel_k_build_duration_seconds"]).To(EqualP( + g.Expect(operatorMetrics).To(HaveKey("camel_k_build_duration_seconds")) + g.Expect(operatorMetrics["camel_k_build_duration_seconds"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_build_duration_seconds"), Help: stringP("Camel K build duration"), @@ -151,8 +151,8 @@ func TestMetrics(t *testing.T) { // Check there are no failures reported in the Build status g.Expect(build.Status.Failure).To(BeNil()) - // Check no recovery attempts are reported in the logs - recoveryAttempts, err := NewLogCounter(&logs).Count(MatchFields(IgnoreExtras, Fields{ + // Check no recovery attempts are reported in the operatorLogs + recoveryAttempts, err := NewLogCounter(&operatorLogs).Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": HavePrefix("Recovery attempt"), "Kind": Equal("Build"), @@ -162,8 +162,8 @@ func TestMetrics(t *testing.T) { g.Expect(recoveryAttempts).To(BeNumerically("==", 0)) // Check no recovery attempts are observed in the corresponding metric - g.Expect(metrics).To(HaveKey("camel_k_build_recovery_attempts")) - g.Expect(metrics["camel_k_build_recovery_attempts"]).To(EqualP( + g.Expect(operatorMetrics).To(HaveKey("camel_k_build_recovery_attempts")) + g.Expect(operatorMetrics["camel_k_build_recovery_attempts"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_build_recovery_attempts"), Help: stringP("Camel K build recovery attempts"), @@ -186,8 +186,8 @@ func TestMetrics(t *testing.T) { }) t.Run("reconciliation duration metric", func(t 
*testing.T) { - g.Expect(metrics).To(HaveKey("camel_k_reconciliation_duration_seconds")) - g.Expect(metrics["camel_k_reconciliation_duration_seconds"]).To(PointTo(MatchFields(IgnoreExtras, + g.Expect(platformcontrollerMetrics).To(HaveKey("camel_k_reconciliation_duration_seconds")) + g.Expect(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"]).To(PointTo(MatchFields(IgnoreExtras, Fields{ "Name": EqualP("camel_k_reconciliation_duration_seconds"), "Help": EqualP("Camel K reconciliation loop duration"), @@ -195,10 +195,10 @@ func TestMetrics(t *testing.T) { }, ))) - counter := NewLogCounter(&logs) + platformcontrollerCounter := NewLogCounter(&platformcontrollerLogs) // Count the number of IntegrationPlatform reconciliations - platformReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + platformReconciliations, err := platformcontrollerCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integrationplatform"), "Message": Equal("Reconciling IntegrationPlatform"), "RequestNamespace": Equal(ns), @@ -207,7 +207,7 @@ func TestMetrics(t *testing.T) { g.Expect(err).To(BeNil()) // Check it matches the observation in the corresponding metric - platformReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + platformReconciled := getMetric(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -222,7 +222,7 @@ func TestMetrics(t *testing.T) { platformReconciledCount := *platformReconciled.Histogram.SampleCount g.Expect(platformReconciledCount).To(BeNumerically(">", 0)) - platformRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + platformRequeued := getMetric(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -238,7 +238,7 @@ 
func TestMetrics(t *testing.T) { platformRequeuedCount = *platformRequeued.Histogram.SampleCount } - platformErrored := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + platformErrored := getMetric(platformcontrollerMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -259,8 +259,9 @@ func TestMetrics(t *testing.T) { g.Expect(platformReconciliations).To(BeNumerically("==", platformReconciledCount+platformRequeuedCount+platformErroredCount)) + operatorCounter := NewLogCounter(&operatorLogs) // Count the number of Integration reconciliations - integrationReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + integrationReconciliations, err := operatorCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integration"), "Message": Equal("Reconciling Integration"), "RequestNamespace": Equal(it.Namespace), @@ -270,7 +271,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationReconciliations).To(BeNumerically(">", 0)) // Check it matches the observation in the corresponding metric - integrationReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationReconciled := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -285,7 +286,7 @@ func TestMetrics(t *testing.T) { integrationReconciledCount := *integrationReconciled.Histogram.SampleCount g.Expect(integrationReconciledCount).To(BeNumerically(">", 0)) - integrationRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationRequeued := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -301,7 +302,7 @@ func TestMetrics(t *testing.T) { integrationRequeuedCount 
= *integrationRequeued.Histogram.SampleCount } - integrationErrored := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationErrored := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -323,7 +324,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationReconciliations).To(BeNumerically("==", integrationReconciledCount+integrationRequeuedCount+integrationErroredCount)) // Count the number of IntegrationKit reconciliations - integrationKitReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + integrationKitReconciliations, err := operatorCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integrationkit"), "Message": Equal("Reconciling IntegrationKit"), "RequestNamespace": Equal(it.Status.IntegrationKit.Namespace), @@ -333,7 +334,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationKitReconciliations).To(BeNumerically(">", 0)) // Check it matches the observation in the corresponding metric - integrationKitReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationKitReconciled := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -349,7 +350,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationKitReconciledCount).To(BeNumerically(">", 0)) // Kit can be requeued, above all when a catalog needs to be built - integrationKitRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + integrationKitRequeued := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -372,7 +373,7 @@ func TestMetrics(t *testing.T) { g.Expect(integrationKitReconciliations).To(BeNumerically("==", 
integrationKitReconciledCount+integrationKitRequeuedCount)) // Count the number of Build reconciliations - buildReconciliations, err := counter.Count(MatchFields(IgnoreExtras, Fields{ + buildReconciliations, err := operatorCounter.Count(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": Equal("Reconciling Build"), "RequestNamespace": Equal(build.Namespace), @@ -381,7 +382,7 @@ func TestMetrics(t *testing.T) { g.Expect(err).To(BeNil()) // Check it matches the observation in the corresponding metric - buildReconciled := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + buildReconciled := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -396,7 +397,7 @@ func TestMetrics(t *testing.T) { buildReconciledCount := *buildReconciled.Histogram.SampleCount g.Expect(buildReconciledCount).To(BeNumerically(">", 0)) - buildRequeued := getMetric(metrics["camel_k_reconciliation_duration_seconds"], + buildRequeued := getMetric(operatorMetrics["camel_k_reconciliation_duration_seconds"], MatchFieldsP(IgnoreExtras, Fields{ "Label": ConsistOf( label("group", v1.SchemeGroupVersion.Group), @@ -423,8 +424,8 @@ func TestMetrics(t *testing.T) { // The start queuing time is taken from the creation time ts1 = build.CreationTimestamp.Time - // Retrieve the end queuing time from the logs - err = NewLogWalker(&logs). + // Retrieve the end queuing time from the operatorLogs + err := NewLogWalker(&operatorLogs). 
AddStep(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.build"), "Message": Equal("State transition"), @@ -440,8 +441,8 @@ func TestMetrics(t *testing.T) { durationFromLogs := ts2.Sub(ts1) // Retrieve the queuing duration from the metric - g.Expect(metrics).To(HaveKey("camel_k_build_queue_duration_seconds")) - metric := metrics["camel_k_build_queue_duration_seconds"].Metric + g.Expect(operatorMetrics).To(HaveKey("camel_k_build_queue_duration_seconds")) + metric := operatorMetrics["camel_k_build_queue_duration_seconds"].Metric g.Expect(metric).To(HaveLen(1)) histogram := metric[0].Histogram g.Expect(histogram).NotTo(BeNil()) @@ -453,7 +454,7 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs(durationFromLogs.Seconds() - duration)).To(BeNumerically("<", 1)) // Check the queuing duration is correctly observed in the corresponding metric - g.Expect(metrics["camel_k_build_queue_duration_seconds"]).To(EqualP( + g.Expect(operatorMetrics["camel_k_build_queue_duration_seconds"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_build_queue_duration_seconds"), Help: stringP("Camel K build queue duration"), @@ -486,8 +487,8 @@ func TestMetrics(t *testing.T) { duration := ts2.Sub(ts1) - // Retrieve these start and end times from the logs - err = NewLogWalker(&logs). + // Retrieve these start and end times from the operatorLogs + err := NewLogWalker(&operatorLogs). 
AddStep(MatchFields(IgnoreExtras, Fields{ "LoggerName": Equal("camel-k.controller.integration"), "Message": Equal("Reconciling Integration"), @@ -511,8 +512,8 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs((durationFromLogs - duration).Seconds())).To(BeNumerically("<=", 1)) // Retrieve the first readiness duration from the metric - g.Expect(metrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) - metric := metrics["camel_k_integration_first_readiness_seconds"].Metric + g.Expect(operatorMetrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) + metric := operatorMetrics["camel_k_integration_first_readiness_seconds"].Metric g.Expect(metric).To(HaveLen(1)) histogram := metric[0].Histogram g.Expect(histogram).NotTo(BeNil()) @@ -522,8 +523,8 @@ func TestMetrics(t *testing.T) { g.Expect(math.Abs(*histogram.SampleSum - d)).To(BeNumerically("<=", 1)) // Check the duration is correctly observed in the corresponding metric - g.Expect(metrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) - g.Expect(metrics["camel_k_integration_first_readiness_seconds"]).To(EqualP( + g.Expect(operatorMetrics).To(HaveKey("camel_k_integration_first_readiness_seconds")) + g.Expect(operatorMetrics["camel_k_integration_first_readiness_seconds"]).To(EqualP( prometheus.MetricFamily{ Name: stringP("camel_k_integration_first_readiness_seconds"), Help: stringP("Camel K integration time to first readiness"), @@ -543,6 +544,26 @@ func TestMetrics(t *testing.T) { }) } +func getLogsAndMetrics(t *testing.T, ctx context.Context, componentPod *corev1.Pod, containerName string) ([]LogEntry, map[string]*prometheus.MetricFamily) { + // componentPod.Namespace could be different from ns if using global operator + fmt.Printf("Fetching logs for component pod %s in namespace %s", componentPod.Name, componentPod.Namespace) + logOptions := &corev1.PodLogOptions{ + Container: containerName, + } + logs, err := StructuredLogs(t, ctx, componentPod.Namespace, componentPod.Name, 
logOptions, false) + Expect(err).To(BeNil()) + Expect(logs).NotTo(BeEmpty()) + + response, err := TestClient(t).CoreV1().RESTClient().Get(). + AbsPath(fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/proxy/metrics", componentPod.Namespace, componentPod.Name)). + Timeout(30 * time.Second). + DoRaw(ctx) + Expect(err).To(BeNil()) + metrics, err := parsePrometheusData(response) + Expect(err).To(BeNil()) + return logs, metrics +} + func getMetric(family *prometheus.MetricFamily, matcher types.GomegaMatcher) *prometheus.Metric { for _, metric := range family.Metric { if match, err := matcher.Match(metric); err != nil { diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index 4bef76677..881a9b9c3 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -2275,7 +2275,49 @@ func ConsoleCLIDownload(t *testing.T, ctx context.Context, name string) func() * } } -func operatorPods(t *testing.T, ctx context.Context, ns string) []corev1.Pod { +func OperatorPod(t *testing.T, ctx context.Context, ns string) func() *corev1.Pod { + return componentPod(t, ctx, ns, "operator") +} + +// Return the first global operator Pod found in the cluster (if any). +func OperatorPodGlobal(t *testing.T, ctx context.Context) func() *corev1.Pod { + return func() *corev1.Pod { + pods := operatorPods(t, ctx, "") + for _, pod := range pods { + for _, envVar := range pod.Spec.Containers[0].Env { + if envVar.Name == "WATCH_NAMESPACE" { + if envVar.Value == "" { + return &pod + } + } + } + } + return nil + } +} + +func PlatformcontrollerPod(t *testing.T, ctx context.Context, ns string) func() *corev1.Pod { + return componentPod(t, ctx, ns, "platformcontroller") +} + +// Return the first global platformcontoller Pod found in the cluster (if any). 
+func PlatformcontrollerPodGlobal(t *testing.T, ctx context.Context) func() *corev1.Pod { + return func() *corev1.Pod { + pods := platformcontrollerPods(t, ctx, "") + for _, pod := range pods { + for _, envVar := range pod.Spec.Containers[0].Env { + if envVar.Name == "WATCH_NAMESPACE" { + if envVar.Value == "" { + return &pod + } + } + } + } + return nil + } +} + +func componentPods(t *testing.T, ctx context.Context, ns string, componentLabelValue string) []corev1.Pod { lst := corev1.PodList{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", @@ -2284,7 +2326,7 @@ func operatorPods(t *testing.T, ctx context.Context, ns string) []corev1.Pod { } opts := []ctrl.ListOption{ ctrl.MatchingLabels{ - "camel.apache.org/component": "operator", + "camel.apache.org/component": componentLabelValue, }, } if ns != "" { @@ -2299,28 +2341,19 @@ func operatorPods(t *testing.T, ctx context.Context, ns string) []corev1.Pod { return lst.Items } -func OperatorPod(t *testing.T, ctx context.Context, ns string) func() *corev1.Pod { - return func() *corev1.Pod { - pods := operatorPods(t, ctx, ns) - if len(pods) > 0 { - return &pods[0] - } - return nil - } +func operatorPods(t *testing.T, ctx context.Context, ns string) []corev1.Pod { + return componentPods(t, ctx, ns, "operator") } -// Return the first global operator Pod found in the cluster (if any). 
-func OperatorPodGlobal(t *testing.T, ctx context.Context) func() *corev1.Pod { +func platformcontrollerPods(t *testing.T, ctx context.Context, ns string) []corev1.Pod { + return componentPods(t, ctx, ns, "platformcontroller") +} + +func componentPod(t *testing.T, ctx context.Context, ns string, componentLabelValue string) func() *corev1.Pod { return func() *corev1.Pod { - pods := operatorPods(t, ctx, "") - for _, pod := range pods { - for _, envVar := range pod.Spec.Containers[0].Env { - if envVar.Name == "WATCH_NAMESPACE" { - if envVar.Value == "" { - return &pod - } - } - } + pods := componentPods(t, ctx, ns, componentLabelValue) + if len(pods) > 0 { + return &pods[0] } return nil } diff --git a/helm/camel-k/templates/platformcontroller.yaml b/helm/camel-k/templates/platformcontroller.yaml new file mode 100644 index 000000000..bb50211c2 --- /dev/null +++ b/helm/camel-k/templates/platformcontroller.yaml @@ -0,0 +1,111 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# --------------------------------------------------------------------------- + +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: camel-k + camel.apache.org/component: platformcontroller + {{- include "camel-k.labels" . | nindent 4 }} + {{- with .Values.operator.annotations }} + annotations: + {{ toYaml . | nindent 4 }} + {{- end }} + name: camel-k-platformcontroller +spec: + replicas: 1 + selector: + matchLabels: + name: camel-k-platformcontroller + strategy: + type: Recreate + template: + metadata: + labels: + app: camel-k + camel.apache.org/component: platformcontroller + name: camel-k-platformcontroller + spec: + {{- if .Values.operator.imagePullSecrets }} + imagePullSecrets: +{{ toYaml .Values.operator.imagePullSecrets | indent 8 }} + {{- end }} + + containers: + - command: + - kamel + - platformcontroller + env: + - name: WATCH_NAMESPACE + {{- if eq .Values.operator.global "false" }} + valueFrom: + fieldRef: + fieldPath: metadata.namespace + {{- else }} + value: "" + {{- end }} + - name: LOG_LEVEL + value: {{ .Values.operator.logLevel }} + - name: OPERATOR_NAME + value: camel-k-platformcontroller + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: KAMEL_OPERATOR_ID + value: {{ .Values.operator.operatorId }} + image: {{ .Values.operator.image }} + imagePullPolicy: IfNotPresent + livenessProbe: + httpGet: + path: /healthz + port: 8081 + initialDelaySeconds: 20 + periodSeconds: 10 + name: camel-k-platformcontroller + ports: + - containerPort: 8080 + name: metrics + {{- with .Values.operator.resources }} + resources: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- if .Values.operator.securityContext }} + {{- with .Values.operator.securityContext }} + securityContext: + {{- toYaml . 
| nindent 12 }} + {{- end }} + {{- else }} + securityContext: + runAsNonRoot: true + seccompProfile: + type: RuntimeDefault + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + {{- end }} + serviceAccountName: camel-k-operator + {{- with .Values.operator.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/pkg/cmd/platformcontroller.go b/pkg/cmd/platformcontroller.go new file mode 100644 index 000000000..e2815b82a --- /dev/null +++ b/pkg/cmd/platformcontroller.go @@ -0,0 +1,70 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cmd + +import ( + "github.com/apache/camel-k/v2/pkg/cmd/platformcontroller" + "github.com/apache/camel-k/v2/pkg/platform" + "github.com/apache/camel-k/v2/pkg/util/defaults" + "github.com/spf13/cobra" +) + +const ( + platformcontrollerCommand = "platformcontroller" +) + +func newCmdPlatformController(rootCmdOptions *RootCmdOptions) (*cobra.Command, *platformcontrollerCmdOptions) { + options := platformcontrollerCmdOptions{} + + cmd := cobra.Command{ + Use: "platformcontroller", + Short: "Run the Camel K platform controller", + Long: `Run the Camel K platform controller`, + Hidden: true, + PreRunE: decode(&options, rootCmdOptions.Flags), + Run: options.run, + } + + cmd.Flags().Int32("health-port", defaultHealthPort, "The port of the health endpoint") + cmd.Flags().Int32("monitoring-port", defaultMonitoringPort, "The port of the metrics endpoint") + cmd.Flags().Bool("leader-election", true, "Use leader election") + cmd.Flags().String("leader-election-id", "", "Use the given ID as the leader election Lease name") + + return &cmd, &options +} + +type platformcontrollerCmdOptions struct { + HealthPort int32 `mapstructure:"health-port"` + MonitoringPort int32 `mapstructure:"monitoring-port"` + LeaderElection bool `mapstructure:"leader-election"` + LeaderElectionID string `mapstructure:"leader-election-id"` +} + +func (o *platformcontrollerCmdOptions) run(_ *cobra.Command, _ []string) { + + leaderElectionID := o.LeaderElectionID + if leaderElectionID == "" { + if defaults.OperatorID() != "" { + leaderElectionID = platform.GetPlatformControllerLockName(defaults.OperatorID()) + } else { + leaderElectionID = platform.PlatformControllerLockName + } + } + + platformcontroller.Run(o.HealthPort, o.MonitoringPort, o.LeaderElection, leaderElectionID) +} diff --git a/pkg/cmd/platformcontroller/platformcontroller.go b/pkg/cmd/platformcontroller/platformcontroller.go new file mode 100644 index 000000000..90b5f70a2 --- /dev/null +++ 
b/pkg/cmd/platformcontroller/platformcontroller.go @@ -0,0 +1,271 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package platformcontroller + +import ( + "context" + "flag" + "fmt" + "os" + "reflect" + "runtime" + "strconv" + "strings" + "time" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/klog/v2" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + "go.uber.org/automaxprocs/maxprocs" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "sigs.k8s.io/controller-runtime/pkg/cache" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/healthz" + logf "sigs.k8s.io/controller-runtime/pkg/log" + zapctrl "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + + "github.com/apache/camel-k/v2/pkg/apis" + 
"github.com/apache/camel-k/v2/pkg/client" + "github.com/apache/camel-k/v2/pkg/controller" + "github.com/apache/camel-k/v2/pkg/install" + "github.com/apache/camel-k/v2/pkg/platform" + "github.com/apache/camel-k/v2/pkg/util/defaults" + "github.com/apache/camel-k/v2/pkg/util/kubernetes" + logutil "github.com/apache/camel-k/v2/pkg/util/log" +) + +var log = logutil.Log.WithName("cmd") + +func printVersion() { + log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) + log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) + log.Info(fmt.Sprintf("Camel K Platform Controller Version: %v", defaults.Version)) + log.Info(fmt.Sprintf("Camel K Default Runtime Version: %v", defaults.DefaultRuntimeVersion)) + log.Info(fmt.Sprintf("Camel K Git Commit: %v", defaults.GitCommit)) + log.Info(fmt.Sprintf("Camel K Platform Controller ID: %v", defaults.OperatorID())) + + // Will only appear if DEBUG level has been enabled using the env var LOG_LEVEL + log.Debug("*** DEBUG level messages will be logged ***") +} + +// Run starts the Camel K platform controller. +func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) { + + flag.Parse() + + // The logger instantiated here can be changed to any logger + // implementing the logr.Logger interface. This logger will + // be propagated through the whole operator, generating + // uniform and structured logs. 
+ + // The constants specified here are zap specific + var logLevel zapcore.Level + logLevelVal, ok := os.LookupEnv("LOG_LEVEL") + if ok { + switch strings.ToLower(logLevelVal) { + case "error": + logLevel = zapcore.ErrorLevel + case "info": + logLevel = zapcore.InfoLevel + case "debug": + logLevel = zapcore.DebugLevel + default: + customLevel, err := strconv.Atoi(strings.ToLower(logLevelVal)) + exitOnError(err, "Invalid log-level") + // Need to multiply by -1 to turn logr expected level into zap level + logLevel = zapcore.Level(int8(customLevel) * -1) + } + } else { + logLevel = zapcore.InfoLevel + } + + // Use and set atomic level that all following log events are compared with + // in order to evaluate if a given log level on the event is enabled. + logf.SetLogger(zapctrl.New(func(o *zapctrl.Options) { + o.Development = false + o.Level = zap.NewAtomicLevelAt(logLevel) + })) + + klog.SetLogger(log.AsLogger()) + + _, err := maxprocs.Set(maxprocs.Logger(func(f string, a ...interface{}) { log.Info(fmt.Sprintf(f, a)) })) + if err != nil { + log.Error(err, "failed to set GOMAXPROCS from cgroups") + } + + printVersion() + + watchNamespace, err := getWatchNamespace() + exitOnError(err, "failed to get watch namespace") + + ctx := signals.SetupSignalHandler() + + cfg, err := config.GetConfig() + exitOnError(err, "cannot get client config") + // Increase maximum burst that is used by client-side throttling, + // to prevent the requests made to apply the bundled Kamelets + // from being throttled. + cfg.QPS = 20 + cfg.Burst = 200 + bootstrapClient, err := client.NewClientWithConfig(false, cfg) + exitOnError(err, "cannot initialize client") + + platformcontrollerNamespace := platform.GetPlatformControllerNamespace() + if platformcontrollerNamespace == "" { + // Fallback to using the watch namespace when the platform controller is not in-cluster. 
+ // It does not support local (off-cluster) platform controller watching resources globally, + // in which case it's not possible to determine a namespace. + platformcontrollerNamespace = watchNamespace + if platformcontrollerNamespace == "" { + leaderElection = false + log.Info("unable to determine namespace for leader election") + } + } + + // Set the platform controller container image if it runs in-container + platform.PlatformControllerImage, err = getPlatformControllerImage(ctx, bootstrapClient) + exitOnError(err, "cannot get platform controller container image") + + if !leaderElection { + log.Info("Leader election is disabled!") + } + + hasIntegrationLabel, err := labels.NewRequirement(v1.IntegrationLabel, selection.Exists, []string{}) + exitOnError(err, "cannot create Integration label selector") + labelsSelector := labels.NewSelector().Add(*hasIntegrationLabel) + + selector := cache.ByObject{ + Label: labelsSelector, + } + + if !platform.IsCurrentOperatorGlobal() { + selector = cache.ByObject{ + Label: labelsSelector, + Namespaces: getNamespacesSelector(platformcontrollerNamespace, watchNamespace), + } + } + + selectors := map[ctrl.Object]cache.ByObject{ + &corev1.Pod{}: selector, + &appsv1.Deployment{}: selector, + &batchv1.Job{}: selector, + } + + if ok, err := kubernetes.IsAPIResourceInstalled(bootstrapClient, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); ok && err == nil { + selectors[&servingv1.Service{}] = selector + } + + if ok, err := kubernetes.IsAPIResourceInstalled(bootstrapClient, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil { + selectors[&batchv1.CronJob{}] = selector + } + + options := cache.Options{ + ByObject: selectors, + } + + if !platform.IsCurrentOperatorGlobal() { + options.DefaultNamespaces = getNamespacesSelector(platformcontrollerNamespace, watchNamespace) + } + + mgr, err := manager.New(cfg, manager.Options{ + LeaderElection: leaderElection, 
+ LeaderElectionNamespace: platformcontrollerNamespace, + LeaderElectionID: leaderElectionID, + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + LeaderElectionReleaseOnCancel: true, + HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)), + Metrics: metricsserver.Options{BindAddress: ":" + strconv.Itoa(int(monitoringPort))}, + Cache: options, + }) + exitOnError(err, "") + + log.Info("Configuring manager") + exitOnError(mgr.AddHealthzCheck("health-probe", healthz.Ping), "Unable add liveness check") + exitOnError(apis.AddToScheme(mgr.GetScheme()), "") + ctrlClient, err := client.FromManager(mgr) + exitOnError(err, "") + exitOnError(controller.AddToPlatformManager(ctx, mgr, ctrlClient), "") + + log.Info("Installing platform manager resources") + installCtx, installCancel := context.WithTimeout(ctx, 1*time.Minute) + defer installCancel() + install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, platformcontrollerNamespace, log) + + log.Info("Starting the platform manager") + exitOnError(mgr.Start(ctx), "platform manager exited non-zero") +} + +func getNamespacesSelector(operatorNamespace string, watchNamespace string) map[string]cache.Config { + namespacesSelector := map[string]cache.Config{ + operatorNamespace: {}, + } + if operatorNamespace != watchNamespace { + namespacesSelector[watchNamespace] = cache.Config{} + } + return namespacesSelector +} + +// getWatchNamespace returns the Namespace the platform controller should be watching for changes. +func getWatchNamespace() (string, error) { + ns, found := os.LookupEnv(platform.PlatformControllerWatchNamespaceEnvVariable) + if !found { + return "", fmt.Errorf("%s must be set", platform.PlatformControllerWatchNamespaceEnvVariable) + } + return ns, nil +} + +// getPlatformControllerImage returns the image currently used by the running platform controller if present (when running out of cluster, it may be absent). 
+func getPlatformControllerImage(ctx context.Context, c ctrl.Reader) (string, error) {
+	ns := platform.GetPlatformControllerNamespace()
+	name := platform.GetPlatformControllerPodName() // POD_NAME injected into the platform controller deployment
+	if ns == "" || name == "" {
+		return "", nil
+	}
+
+	pod := corev1.Pod{}
+	if err := c.Get(ctx, ctrl.ObjectKey{Namespace: ns, Name: name}, &pod); err != nil && k8serrors.IsNotFound(err) {
+		return "", nil
+	} else if err != nil {
+		return "", err
+	}
+	if len(pod.Spec.Containers) == 0 {
+		return "", fmt.Errorf("no containers found in platform controller pod")
+	}
+	return pod.Spec.Containers[0].Image, nil
+}
+
+func exitOnError(err error, msg string) {
+	if err != nil {
+		log.Error(err, msg)
+		os.Exit(1)
+	}
+}
diff --git a/pkg/cmd/platformcontroller_test.go b/pkg/cmd/platformcontroller_test.go
new file mode 100644
index 000000000..1037d1901
--- /dev/null
+++ b/pkg/cmd/platformcontroller_test.go
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cmd
+
+import (
+	"testing"
+
+	"github.com/apache/camel-k/v2/pkg/util/test"
+	"github.com/spf13/cobra"
+
+	"github.com/stretchr/testify/assert"
+)
+
+const cmdPlatformcontroller = "platformcontroller"
+
+// nolint: unparam
+func initializePlatformcontrollerCmdOptions(t *testing.T) (*platformcontrollerCmdOptions, *cobra.Command, RootCmdOptions) {
+	t.Helper()
+
+	options, rootCmd := kamelTestPreAddCommandInit()
+	pcOptions := addTestPlatformcontrollerCmd(*options, rootCmd)
+	kamelTestPostAddCommandInit(t, rootCmd, options)
+
+	return pcOptions, rootCmd, *options
+}
+
+// nolint: unparam
+func addTestPlatformcontrollerCmd(options RootCmdOptions, rootCmd *cobra.Command) *platformcontrollerCmdOptions {
+	// add a testing version of the platformcontroller command, with no-op Run/PostRun hooks
+	platformcontrollerCmd, platformcontrollerOptions := newCmdPlatformController(&options)
+	platformcontrollerCmd.RunE = func(c *cobra.Command, args []string) error {
+		return nil
+	}
+	platformcontrollerCmd.PostRunE = func(c *cobra.Command, args []string) error {
+		return nil
+	}
+	platformcontrollerCmd.Args = test.ArbitraryArgs
+	rootCmd.AddCommand(platformcontrollerCmd)
+	return platformcontrollerOptions
+}
+
+func TestPlatformcontrollerNoFlag(t *testing.T) {
+	pcOptions, rootCmd, _ := initializePlatformcontrollerCmdOptions(t)
+	_, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller)
+	assert.Nil(t, err)
+	// Check default expected values
+	assert.Equal(t, int32(8081), pcOptions.HealthPort)
+	assert.Equal(t, int32(8080), pcOptions.MonitoringPort)
+}
+
+func TestPlatformcontrollerNonExistingFlag(t *testing.T) {
+	_, rootCmd, _ := initializePlatformcontrollerCmdOptions(t)
+	_, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller, "--nonExistingFlag")
+	assert.NotNil(t, err)
+}
+
+func TestPlatformcontrollerHealthPortFlag(t *testing.T) {
+	pcOptions, rootCmd, _ := initializePlatformcontrollerCmdOptions(t)
+	_, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller, "--health-port", "7171")
+	assert.Nil(t, err)
+	assert.Equal(t, int32(7171), pcOptions.HealthPort)
+}
+
+func TestPlatformcontrollerMonitoringPortFlag(t *testing.T) {
+	pcOptions, rootCmd, _ := initializePlatformcontrollerCmdOptions(t)
+	_, err := test.ExecuteCommand(rootCmd, cmdPlatformcontroller, "--monitoring-port", "7172")
+	assert.Nil(t, err)
+	assert.Equal(t, int32(7172), pcOptions.MonitoringPort)
+}
diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go
index 4ce7f4bcd..1ac1425c7 100644
--- a/pkg/cmd/root.go
+++ b/pkg/cmd/root.go
@@ -147,6 +147,7 @@ func addKamelSubcommands(cmd *cobra.Command, options *RootCmdOptions) {
 	cmd.AddCommand(newCmdDescribe(options))
 	cmd.AddCommand(cmdOnly(newCmdRebuild(options)))
 	cmd.AddCommand(cmdOnly(newCmdOperator(options)))
+	cmd.AddCommand(cmdOnly(newCmdPlatformController(options)))
 	cmd.AddCommand(cmdOnly(newCmdBuilder(options)))
 	cmd.AddCommand(cmdOnly(newCmdDebug(options)))
 	cmd.AddCommand(cmdOnly(newCmdDump(options)))
@@ -201,7 +202,7 @@ func (command *RootCmdOptions) preRun(cmd *cobra.Command, _ []string) error {
 		// reconciled. Hence the compatibility check is skipped for the install and the operator command.
 		// Furthermore, there can be any incompatibilities, as the install command deploys
 		// the operator version it's compatible with.
-		if cmd.Use != builderCommand && cmd.Use != installCommand && cmd.Use != operatorCommand {
+		if cmd.Use != builderCommand && cmd.Use != installCommand && cmd.Use != operatorCommand && cmd.Use != platformcontrollerCommand {
 			checkAndShowCompatibilityWarning(command.Context, cmd, c, command.Namespace)
 		}
 	}
diff --git a/pkg/controller/add_integrationplatform.go b/pkg/controller/add_integrationplatform.go
index 16b88f91a..b0348bc52 100644
--- a/pkg/controller/add_integrationplatform.go
+++ b/pkg/controller/add_integrationplatform.go
@@ -22,5 +22,5 @@ import (
 )
 
 func init() {
-	addToManager = append(addToManager, integrationplatform.Add)
+	addToPlatformManager = append(addToPlatformManager, integrationplatform.Add)
 }
diff --git a/pkg/controller/add_integrationplatform.go b/pkg/controller/platformcontroller.go
similarity index 58%
copy from pkg/controller/add_integrationplatform.go
copy to pkg/controller/platformcontroller.go
index 16b88f91a..661e048bc 100644
--- a/pkg/controller/add_integrationplatform.go
+++ b/pkg/controller/platformcontroller.go
@@ -18,9 +18,22 @@ limitations under the License.
 package controller
 
 import (
-	"github.com/apache/camel-k/v2/pkg/controller/integrationplatform"
+	"context"
+
+	ctrl "sigs.k8s.io/controller-runtime"
+
+	"github.com/apache/camel-k/v2/pkg/client"
 )
 
-func init() {
-	addToManager = append(addToManager, integrationplatform.Add)
+// addToPlatformManager is a list of functions that add Controllers to the PlatformManager, populated by init() functions (e.g. in add_integrationplatform.go).
+var addToPlatformManager []func(context.Context, ctrl.Manager, client.Client) error
+
+// AddToPlatformManager adds Controllers to the PlatformManager by invoking each registered function in order, returning the first error encountered.
+func AddToPlatformManager(ctx context.Context, manager ctrl.Manager, client client.Client) error {
+	for _, f := range addToPlatformManager {
+		if err := f(ctx, manager, client); err != nil {
+			return err
+		}
+	}
+	return nil
 }
diff --git a/pkg/install/common.go b/pkg/install/common.go
index 1c0b715a8..d82c00a05 100644
--- a/pkg/install/common.go
+++ b/pkg/install/common.go
@@ -37,6 +37,10 @@ import (
 
 const serviceAccountName = "camel-k-operator"
 
+func getComponentsNames() []string {
+	return []string{"operator", "platformcontroller"}
+}
+
 // ResourceCustomizer can be used to inject code that changes the objects before they are created.
 type ResourceCustomizer func(object ctrl.Object) ctrl.Object
 
diff --git a/pkg/install/operator.go b/pkg/install/operator.go
index f6075bdae..2760613e7 100644
--- a/pkg/install/operator.go
+++ b/pkg/install/operator.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"strings"
 
+	"github.com/apache/camel-k/v2/pkg/util"
 	"github.com/spf13/cobra"
 
 	appsv1 "k8s.io/api/apps/v1"
@@ -98,7 +99,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 	customizer := func(o ctrl.Object) ctrl.Object {
 		if cfg.CustomImage != "" {
 			if d, ok := o.(*appsv1.Deployment); ok {
-				if d.Labels["camel.apache.org/component"] == "operator" {
+				if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) {
 					d.Spec.Template.Spec.Containers[0].Image = cfg.CustomImage
 				}
 			}
@@ -106,7 +107,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 
 		if cfg.CustomImagePullPolicy != "" {
 			if d, ok := o.(*appsv1.Deployment); ok {
-				if d.Labels["camel.apache.org/component"] == "operator" {
+				if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) {
 					d.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullPolicy(cfg.CustomImagePullPolicy)
 				}
 			}
@@ -114,7 +115,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 
 		if cfg.Tolerations != nil {
 			if d, ok := o.(*appsv1.Deployment); ok {
-				if d.Labels["camel.apache.org/component"] == "operator" {
+				if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) {
 					tolerations, err := kubernetes.NewTolerations(cfg.Tolerations)
 					if err != nil {
 						fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse the configured tolerations!")
@@ -126,7 +127,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 
 		if cfg.ResourcesRequirements != nil {
 			if d, ok := o.(*appsv1.Deployment); ok {
-				if d.Labels["camel.apache.org/component"] == "operator" {
+				if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) {
 					resourceReq, err := kubernetes.NewResourceRequirements(cfg.ResourcesRequirements)
 					if err != nil {
 						fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse the configured resources requests!")
@@ -140,7 +141,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 
 		if cfg.EnvVars != nil {
 			if d, ok := o.(*appsv1.Deployment); ok {
-				if d.Labels["camel.apache.org/component"] == "operator" {
+				if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) {
 					envVars, _, _, err := env.ParseEnv(cfg.EnvVars, nil)
 					if err != nil {
 						fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse environment variables!")
@@ -156,7 +157,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 
 		if cfg.NodeSelectors != nil {
 			if d, ok := o.(*appsv1.Deployment); ok {
-				if d.Labels["camel.apache.org/component"] == "operator" {
+				if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) {
 					nodeSelector, err := kubernetes.NewNodeSelectors(cfg.NodeSelectors)
 					if err != nil {
 						fmt.Fprintln(cmd.ErrOrStderr(), "Warning: could not parse the configured node selectors!")
@@ -167,7 +168,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 		}
 
 		if d, ok := o.(*appsv1.Deployment); ok {
-			if d.Labels["camel.apache.org/component"] == "operator" {
+			if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) {
 				// Metrics endpoint port
 				d.Spec.Template.Spec.Containers[0].Args = append(d.Spec.Template.Spec.Containers[0].Args,
 					fmt.Sprintf("--monitoring-port=%d", cfg.Monitoring.Port))
@@ -180,7 +181,8 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 		}
 		if cfg.Debugging.Enabled {
 			if d, ok := o.(*appsv1.Deployment); ok {
-				if d.Labels["camel.apache.org/component"] == "operator" {
+				if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) {
+					// The component label ("operator" or "platformcontroller") is also the kamel subcommand to run under dlv.
 					d.Spec.Template.Spec.Containers[0].Command = []string{"dlv",
 						fmt.Sprintf("--listen=:%d", cfg.Debugging.Port), "--headless=true", "--api-version=2",
-						"exec", cfg.Debugging.Path, "--", "operator", "--leader-election=false"}
+						"exec", cfg.Debugging.Path, "--", d.Labels["camel.apache.org/component"], "--leader-election=false"}
@@ -197,7 +199,7 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 
 		if cfg.Global {
 			if d, ok := o.(*appsv1.Deployment); ok {
-				if d.Labels["camel.apache.org/component"] == "operator" {
+				if util.StringSliceContainsAnyOf(getComponentsNames(), d.Labels["camel.apache.org/component"]) {
 					// Make the operator watch all namespaces
 					envvar.SetVal(&d.Spec.Template.Spec.Containers[0].Env, "WATCH_NAMESPACE", "")
 				}
@@ -274,6 +276,11 @@ func OperatorOrCollect(ctx context.Context, cmd *cobra.Command, c client.Client,
 		return err
 	}
 
+	// Deploy the platformcontroller
+	if err := installPlatformcontroller(ctx, c, cfg.Namespace, customizer, collection, force); err != nil {
+		return err
+	}
+
 	if err = installEvents(ctx, c, cfg.Namespace, customizer, collection, force, cfg.Global); err != nil {
 		if k8serrors.IsAlreadyExists(err) {
 			return err
@@ -500,6 +507,12 @@ func installOperator(ctx context.Context, c client.Client, namespace string, cus
 	)
 }
 
+func installPlatformcontroller(ctx context.Context, c client.Client, namespace string, customizer ResourceCustomizer, collection *kubernetes.Collection, force bool) error {
+	return ResourcesOrCollect(ctx, c, namespace, collection, force, customizer,
+		"/config/manager/platformcontroller-deployment.yaml",
+	)
+}
+
 func installKnativeBindings(ctx context.Context, c client.Client, namespace string, customizer ResourceCustomizer, collection *kubernetes.Collection, force bool, global bool) error {
 	if global {
 		return ResourcesOrCollect(ctx, c, namespace, collection, force, customizer,
diff --git a/pkg/platform/operator.go b/pkg/platform/operator.go
index e9bebd009..9579ae7f9 100644
--- a/pkg/platform/operator.go
+++ b/pkg/platform/operator.go
@@ -94,6 +94,14 @@ func GetOperatorNamespace() string {
 	return ""
 }
 
+// GetPlatformControllerNamespace returns the namespace where the current platform controller is located (if set).
+func GetPlatformControllerNamespace() string {
+	if podNamespace, envSet := os.LookupEnv(platformControllerNamespaceEnvVariable); envSet {
+		return podNamespace
+	}
+	return ""
+}
+
 // GetOperatorPodName returns the pod that is running the current operator (if any).
 func GetOperatorPodName() string {
 	if podName, envSet := os.LookupEnv(operatorPodNameEnvVariable); envSet {
@@ -102,6 +110,14 @@ func GetOperatorPodName() string {
 	return ""
 }
 
+// GetPlatformControllerPodName returns the pod that is running the current platform controller (if any).
+func GetPlatformControllerPodName() string {
+	if podName, envSet := os.LookupEnv(platformControllerPodNameEnvVariable); envSet {
+		return podName
+	}
+	return ""
+}
+
 // GetOperatorLockName returns the name of the lock lease that is electing a leader on the particular namespace.
func GetOperatorLockName(operatorID string) string { return fmt.Sprintf("%s-lock", operatorID) diff --git a/pkg/controller/add_integrationplatform.go b/pkg/platform/platformcontroller.go similarity index 55% copy from pkg/controller/add_integrationplatform.go copy to pkg/platform/platformcontroller.go index 16b88f91a..8ab6a47e8 100644 --- a/pkg/controller/add_integrationplatform.go +++ b/pkg/platform/platformcontroller.go @@ -15,12 +15,21 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package platform -import ( - "github.com/apache/camel-k/v2/pkg/controller/integrationplatform" +import "fmt" + +const ( + PlatformControllerWatchNamespaceEnvVariable = "WATCH_NAMESPACE" + platformControllerNamespaceEnvVariable = "NAMESPACE" + platformControllerPodNameEnvVariable = "POD_NAME" ) -func init() { - addToManager = append(addToManager, integrationplatform.Add) +const PlatformControllerLockName = "camel-k-platform-controller-lock" + +var PlatformControllerImage string + +// GetPlatformControllerLockName returns the name of the lock lease that is electing a leader on the particular namespace. 
+func GetPlatformControllerLockName(platformControllerID string) string { + return fmt.Sprintf("camel-k-platform-controller-%s-lock", platformControllerID) } diff --git a/pkg/resources/config/manager/kustomization.yaml b/pkg/resources/config/manager/kustomization.yaml index 895978ab4..1ef36b774 100644 --- a/pkg/resources/config/manager/kustomization.yaml +++ b/pkg/resources/config/manager/kustomization.yaml @@ -20,4 +20,5 @@ kind: Kustomization resources: - operator-deployment.yaml +- platformcontroller-deployment.yaml - operator-service-account.yaml diff --git a/pkg/resources/config/manager/platformcontroller-deployment.yaml b/pkg/resources/config/manager/platformcontroller-deployment.yaml new file mode 100644 index 000000000..89691ffe6 --- /dev/null +++ b/pkg/resources/config/manager/platformcontroller-deployment.yaml @@ -0,0 +1,89 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ---------------------------------------------------------------------------
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: camel-k-platformcontroller
+  labels:
+    app: "camel-k"
+    camel.apache.org/component: platformcontroller
+    name: camel-k-platformcontroller
+    app.kubernetes.io/component: platformcontroller
+    app.kubernetes.io/name: camel-k-platformcontroller
+    app.kubernetes.io/version: "2.4.0-SNAPSHOT"
+spec:
+  replicas: 1
+  strategy:
+    type: Recreate
+  selector:
+    matchLabels:
+      name: camel-k-platformcontroller
+  template:
+    metadata:
+      labels:
+        name: camel-k-platformcontroller
+        camel.apache.org/component: platformcontroller
+        app: "camel-k"
+        app.kubernetes.io/component: platformcontroller
+        app.kubernetes.io/name: camel-k-platformcontroller
+        app.kubernetes.io/version: "2.4.0-SNAPSHOT"
+    spec:
+      serviceAccountName: camel-k-operator
+      containers:
+        - name: camel-k-platformcontroller
+          image: docker.io/apache/camel-k:2.4.0-SNAPSHOT
+          imagePullPolicy: IfNotPresent
+          command:
+            - kamel
+            - platformcontroller
+          ports:
+            - containerPort: 8080
+              name: metrics
+          env:
+            - name: WATCH_NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: OPERATOR_NAME
+              value: "camel-k-platformcontroller"
+            - name: OPERATOR_ID
+              value: "camel-k"
+            - name: POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
+            # NAMESPACE is always the platform controller's own namespace, independently of WATCH_NAMESPACE
+            - name: NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+          livenessProbe:
+            httpGet:
+              path: /healthz
+              port: 8081
+            initialDelaySeconds: 20
+            periodSeconds: 10
+          securityContext:
+            runAsNonRoot: true
+            seccompProfile:
+              type: RuntimeDefault
+            allowPrivilegeEscalation: false
+            capabilities:
+              drop:
+              - ALL
+
diff --git a/pkg/resources/config/prometheus/kustomization.yaml b/pkg/resources/config/prometheus/kustomization.yaml
index a14ee154c..c92fa62b2 100644
--- a/pkg/resources/config/prometheus/kustomization.yaml
+++ b/pkg/resources/config/prometheus/kustomization.yaml
@@ -18,3 +18,5 @@
 resources:
 - operator-pod-monitor.yaml
 - operator-prometheus-rule.yaml
+- platformcontroller-pod-monitor.yaml
+- platformcontroller-prometheus-rule.yaml
diff --git a/pkg/resources/config/manager/kustomization.yaml b/pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml
similarity index 73%
copy from pkg/resources/config/manager/kustomization.yaml
copy to pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml
index 895978ab4..42b1ac5e9 100644
--- a/pkg/resources/config/manager/kustomization.yaml
+++ b/pkg/resources/config/prometheus/platformcontroller-pod-monitor.yaml
@@ -15,9 +15,17 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-apiVersion: kustomize.config.k8s.io/v1beta1
-kind: Kustomization
-
-resources:
-- operator-deployment.yaml
-- operator-service-account.yaml
+apiVersion: monitoring.coreos.com/v1
+kind: PodMonitor
+metadata:
+  name: camel-k-platformcontroller
+  labels:
+    app: "camel-k"
+    camel.apache.org/component: platformcontroller
+spec:
+  selector:
+    matchLabels:
+      app: "camel-k"
+      camel.apache.org/component: platformcontroller
+  podMetricsEndpoints:
+    - port: metrics
diff --git a/pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml b/pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml
new file mode 100644
index 000000000..6aae49fd2
--- /dev/null
+++ b/pkg/resources/config/prometheus/platformcontroller-prometheus-rule.yaml
@@ -0,0 +1,55 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: monitoring.coreos.com/v1
+kind: PrometheusRule
+metadata:
+  name: camel-k-platformcontroller
+spec:
+  groups:
+    - name: camel-k-platformcontroller
+      rules:
+        - alert: CamelKReconciliationDuration
+          expr: |
+            (
+            1 - sum(rate(camel_k_reconciliation_duration_seconds_bucket{le="0.5"}[5m])) by (job)
+            /
+            sum(rate(camel_k_reconciliation_duration_seconds_count[5m])) by (job)
+            )
+            * 100
+            > 10
+          for: 1m
+          labels:
+            severity: warning
+          annotations:
+            message: |
+              {{ printf "%0.0f" $value }}% of the reconciliation requests
+              for {{ $labels.job }} have their duration above 0.5s.
+        - alert: CamelKReconciliationFailure
+          expr: |
+            sum(rate(camel_k_reconciliation_duration_seconds_count{result="Errored"}[5m])) by (job)
+            /
+            sum(rate(camel_k_reconciliation_duration_seconds_count[5m])) by (job)
+            * 100
+            > 1
+          for: 10m
+          labels:
+            severity: warning
+          annotations:
+            message: |
+              {{ printf "%0.0f" $value }}% of the reconciliation requests
+              for {{ $labels.job }} have failed.