This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit d998dd96f1dd2b3035ffcadbe809dc67ccf508ed Author: Pranjul Kalsi <[email protected]> AuthorDate: Tue Dec 16 12:31:36 2025 +0530 feat(keda): refactor auto-discovery with interface-based scalers --- docs/modules/ROOT/partials/apis/camel-k-crds.adoc | 8 ++ docs/modules/traits/pages/keda.adoc | 5 + .../files/keda-it-kafkatopic-to-log-auto.yaml | 21 +++- ... => keda-kafka-auto-discovery-with-manual.yaml} | 24 ++-- ...iscovery.yaml => keda-kafka-auto-metadata.yaml} | 18 +-- e2e/kafka/kafka_autoscale_keda_test.go | 61 +++++++++- helm/camel-k/crds/camel-k-crds.yaml | 72 ++++++++++++ pkg/apis/camel/v1/trait/keda.go | 8 +- pkg/apis/camel/v1/trait/zz_generated.deepcopy.go | 18 +++ .../camel.apache.org_integrationplatforms.yaml | 18 +++ .../camel.apache.org_integrationprofiles.yaml | 18 +++ .../crd/bases/camel.apache.org_integrations.yaml | 18 +++ .../config/crd/bases/camel.apache.org_pipes.yaml | 18 +++ pkg/trait/keda.go | 27 ++++- pkg/trait/keda/mapper.go | 60 ++++++++++ .../{keda_mapping_test.go => keda/mapper_test.go} | 82 +------------- pkg/trait/keda/scalers/kafka.go | 45 ++++++++ pkg/trait/keda/scalers/kafka_test.go | 108 ++++++++++++++++++ pkg/trait/keda/uri_parser.go | 57 ++++++++++ pkg/trait/keda_mapping.go | 125 --------------------- pkg/trait/keda_test.go | 84 ++++++++++---- 21 files changed, 645 insertions(+), 250 deletions(-) diff --git a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc index ae0916c91..1cc95669b 100644 --- a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc +++ b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc @@ -8083,6 +8083,14 @@ Automatically discover KEDA triggers from Camel component URIs. +|`autoMetadata` + +map[string]map[string]string +| + + +Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), +values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + |=== diff --git a/docs/modules/traits/pages/keda.adoc b/docs/modules/traits/pages/keda.adoc index dba237792..949d5b613 100644 --- a/docs/modules/traits/pages/keda.adoc +++ b/docs/modules/traits/pages/keda.adoc @@ -58,6 +58,11 @@ Automatically discover KEDA triggers from Camel component URIs. | []github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait.KedaTrigger | +| keda.auto-metadata +| map[string]map[string]string +| Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), +values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + |=== // End of autogenerated code - DO NOT EDIT! (configuration) diff --git a/e2e/kafka/files/keda-it-kafkatopic-to-log-auto.yaml b/e2e/kafka/files/keda-it-kafkatopic-to-log-auto.yaml index 018c93ec0..ed0cb165e 100644 --- a/e2e/kafka/files/keda-it-kafkatopic-to-log-auto.yaml +++ b/e2e/kafka/files/keda-it-kafkatopic-to-log-auto.yaml @@ -1,3 +1,20 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + apiVersion: camel.apache.org/v1 kind: Integration metadata: @@ -14,4 +31,6 @@ spec: enabled: true minReplicaCount: 0 maxReplicaCount: 1 - # No triggers - auto-discovery should create kafka scaler \ No newline at end of file + cooldownPeriod: 20 + pollingInterval: 10 + # No triggers specified - auto-discovery should create kafka scaler \ No newline at end of file diff --git a/e2e/kafka/files/keda-kafka-auto-discovery.yaml b/e2e/kafka/files/keda-kafka-auto-discovery-with-manual.yaml similarity index 68% copy from e2e/kafka/files/keda-kafka-auto-discovery.yaml copy to e2e/kafka/files/keda-kafka-auto-discovery-with-manual.yaml index 62e3f5661..02557f1b3 100644 --- a/e2e/kafka/files/keda-kafka-auto-discovery.yaml +++ b/e2e/kafka/files/keda-kafka-auto-discovery-with-manual.yaml @@ -14,24 +14,32 @@ # See the License for the specific language governing permissions and # limitations under the License. # --------------------------------------------------------------------------- -# This integration tests KEDA auto-discovery - no manual triggers are specified, -# the trait should automatically discover the kafka component and create a trigger. +# This integration tests KEDA auto-discovery with manual trigger (Case 2): +# - Manual cron trigger is specified +# - Kafka trigger should still be auto-discovered (different type) +# - Result: 2 triggers (cron + kafka) + apiVersion: camel.apache.org/v1 kind: Integration metadata: - name: keda-kafka-auto-discovery + name: keda-kafka-auto-discovery-with-manual namespace: kafka spec: sources: - content: | - from("kafka:my-topic?brokers=my-cluster-kafka-bootstrap.kafka.svc:9092&groupId=auto-group") + from("kafka:my-topic?brokers=my-cluster-kafka-bootstrap.kafka.svc:9092&groupId=mixed-group") .log("${body}"); name: routes.java traits: keda: enabled: true minReplicaCount: 0 - maxReplicaCount: 1 - cooldownPeriod: 20 - pollingInterval: 10 - # No triggers specified - auto-discovery should create kafka scaler + maxReplicaCount: 2 + # Manual cron trigger - kafka should still be auto-discovered + triggers: + - type: cron + metadata: + timezone: "Etc/UTC" + start: "0 * * * *" + end: "59 * * * *" + desiredReplicas: "1" diff --git a/e2e/kafka/files/keda-kafka-auto-discovery.yaml b/e2e/kafka/files/keda-kafka-auto-metadata.yaml similarity index 75% rename from e2e/kafka/files/keda-kafka-auto-discovery.yaml rename to e2e/kafka/files/keda-kafka-auto-metadata.yaml index 62e3f5661..c554ef792 100644 --- a/e2e/kafka/files/keda-kafka-auto-discovery.yaml +++ b/e2e/kafka/files/keda-kafka-auto-metadata.yaml @@ -14,17 +14,19 @@ # See the License for the specific language governing permissions and # limitations under the License. # --------------------------------------------------------------------------- -# This integration tests KEDA auto-discovery - no manual triggers are specified, -# the trait should automatically discover the kafka component and create a trigger. +# This integration tests KEDA autoMetadata merge (Case 3): +# - Kafka trigger is auto-discovered from URI +# - Additional metadata (lagThreshold) is merged via autoMetadata + apiVersion: camel.apache.org/v1 kind: Integration metadata: - name: keda-kafka-auto-discovery + name: keda-kafka-auto-metadata namespace: kafka spec: sources: - content: | - from("kafka:my-topic?brokers=my-cluster-kafka-bootstrap.kafka.svc:9092&groupId=auto-group") + from("kafka:my-topic?brokers=my-cluster-kafka-bootstrap.kafka.svc:9092&groupId=metadata-group") .log("${body}"); name: routes.java traits: @@ -32,6 +34,8 @@ spec: enabled: true minReplicaCount: 0 maxReplicaCount: 1 - cooldownPeriod: 20 - pollingInterval: 10 - # No triggers specified - auto-discovery should create kafka scaler + # Extra metadata to merge into auto-discovered kafka trigger + autoMetadata: + kafka: + lagThreshold: "100" + activationLagThreshold: "10" diff --git a/e2e/kafka/kafka_autoscale_keda_test.go b/e2e/kafka/kafka_autoscale_keda_test.go index 58dca1365..91adf24e6 100644 --- a/e2e/kafka/kafka_autoscale_keda_test.go +++ b/e2e/kafka/kafka_autoscale_keda_test.go @@ -77,7 +77,7 @@ func TestKafkaKedaAutoscale(t *testing.T) { func TestKafkaKedaAutoDiscovery(t *testing.T) { WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) { t.Run("Auto-discovery Kafka", func(t *testing.T) { - ExpectExecSucceed(t, g, Kubectl("apply", "-f", "files/keda-kafka-auto-discovery.yaml")) + ExpectExecSucceed(t, g, Kubectl("apply", "-f", "files/keda-it-kafkatopic-to-log-auto.yaml")) ns := "kafka" integrationName := "keda-kafka-auto-discovery" @@ -98,3 +98,62 @@ func TestKafkaKedaAutoDiscovery(t *testing.T) { }) }) } + +func TestKafkaKedaAutoDiscoveryWithManualTrigger(t *testing.T) { + WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) { + t.Run("Auto-discovery with manual trigger (Case 2)", func(t *testing.T) { + ExpectExecSucceed(t, g, Kubectl("apply", "-f", "files/keda-kafka-auto-discovery-with-manual.yaml")) + ns := "kafka" + integrationName := "keda-kafka-auto-discovery-with-manual" + + g.Eventually(ScaledObject(t, ctx, ns, integrationName), TestTimeoutMedium). + ShouldNot(BeNil()) + + scaledObj := ScaledObject(t, ctx, ns, integrationName)() + g.Expect(scaledObj).NotTo(BeNil()) + g.Expect(scaledObj.Spec.Triggers).To(HaveLen(2)) + + var kafkaTrigger, cronTrigger interface{} + for _, trigger := range scaledObj.Spec.Triggers { + if trigger.Type == "kafka" { + kafkaTrigger = trigger + } + if trigger.Type == "cron" { + cronTrigger = trigger + } + } + g.Expect(kafkaTrigger).NotTo(BeNil(), "kafka trigger should be auto-discovered") + g.Expect(cronTrigger).NotTo(BeNil(), "cron trigger should exist from manual config") + + g.Expect(Kamel(t, ctx, "delete", integrationName, "-n", ns).Execute()).To(Succeed()) + }) + }) +} + +func TestKafkaKedaAutoMetadata(t *testing.T) { + WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) { + t.Run("Auto-discovery with autoMetadata merge (Case 3)", func(t *testing.T) { + ExpectExecSucceed(t, g, Kubectl("apply", "-f", "files/keda-kafka-auto-metadata.yaml")) + ns := "kafka" + integrationName := "keda-kafka-auto-metadata" + + g.Eventually(ScaledObject(t, ctx, ns, integrationName), TestTimeoutMedium). + ShouldNot(BeNil()) + + // Verify the trigger has merged metadata + scaledObj := ScaledObject(t, ctx, ns, integrationName)() + g.Expect(scaledObj).NotTo(BeNil()) + g.Expect(scaledObj.Spec.Triggers).To(HaveLen(1)) + g.Expect(scaledObj.Spec.Triggers[0].Type).To(Equal("kafka")) + // Auto-discovered + g.Expect(scaledObj.Spec.Triggers[0].Metadata["topic"]).To(Equal("my-topic")) + g.Expect(scaledObj.Spec.Triggers[0].Metadata["bootstrapServers"]).To(Equal("my-cluster-kafka-bootstrap.kafka.svc:9092")) + g.Expect(scaledObj.Spec.Triggers[0].Metadata["consumerGroup"]).To(Equal("metadata-group")) + // Merge + g.Expect(scaledObj.Spec.Triggers[0].Metadata["lagThreshold"]).To(Equal("100")) + g.Expect(scaledObj.Spec.Triggers[0].Metadata["activationLagThreshold"]).To(Equal("10")) + + g.Expect(Kamel(t, ctx, "delete", integrationName, "-n", ns).Execute()).To(Succeed()) + }) + }) +} diff --git a/helm/camel-k/crds/camel-k-crds.yaml b/helm/camel-k/crds/camel-k-crds.yaml index 0ad56a8db..b42346021 100644 --- a/helm/camel-k/crds/camel-k-crds.yaml +++ b/helm/camel-k/crds/camel-k-crds.yaml @@ -4814,6 +4814,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -7226,6 +7235,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -9540,6 +9558,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -11831,6 +11858,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -20956,6 +20992,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -23201,6 +23246,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -33688,6 +33742,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -35865,6 +35928,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. diff --git a/pkg/apis/camel/v1/trait/keda.go b/pkg/apis/camel/v1/trait/keda.go index 2d92a9432..ade26692c 100644 --- a/pkg/apis/camel/v1/trait/keda.go +++ b/pkg/apis/camel/v1/trait/keda.go @@ -36,10 +36,14 @@ type KedaTrait struct { // Definition of triggers according to the KEDA format. Each trigger must contain `type` field corresponding // to the name of a KEDA autoscaler and a key/value map named `metadata` containing specific trigger options // and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. + Triggers []KedaTrigger `json:"triggers,omitempty" property:"triggers"` // Automatically discover KEDA triggers from Camel component URIs. // +kubebuilder:validation:Optional - Auto *bool `json:"auto,omitempty" property:"auto"` - Triggers []KedaTrigger `json:"triggers,omitempty" property:"triggers"` + Auto *bool `json:"auto,omitempty" property:"auto"` + // Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + // values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + // +kubebuilder:validation:Optional + AutoMetadata map[string]map[string]string `json:"autoMetadata,omitempty" property:"auto-metadata"` } type KedaTrigger struct { diff --git a/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go b/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go index c194c65e3..56a98cff6 100644 --- a/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go +++ b/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go @@ -743,6 +743,24 @@ func (in *KedaTrait) DeepCopyInto(out *KedaTrait) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.AutoMetadata != nil { + in, out := &in.AutoMetadata, &out.AutoMetadata + *out = make(map[string]map[string]string, len(*in)) + for key, val := range *in { + var outVal map[string]string + if val == nil { + (*out)[key] = nil + } else { + inVal := (*in)[key] + in, out := &inVal, &outVal + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + (*out)[key] = outVal + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KedaTrait. diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml b/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml index bb9b3ce73..67014b693 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml @@ -1565,6 +1565,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -3977,6 +3986,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml b/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml index b0770c973..14579406b 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml @@ -1433,6 +1433,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -3724,6 +3733,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml b/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml index 3b9a080fd..a6a8b7d20 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml @@ -8247,6 +8247,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -10492,6 +10501,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. diff --git a/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml b/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml index a9bc05167..7e581348f 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml @@ -8303,6 +8303,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. @@ -10480,6 +10489,15 @@ spec: and optionally a mapping of secrets, used by Keda operator to poll resources according to the autoscaler type. Automatically discover KEDA triggers from Camel component URIs. type: boolean + autoMetadata: + additionalProperties: + additionalProperties: + type: string + type: object + description: |- + Additional metadata to merge into auto-discovered triggers. Keys are trigger types (e.g., "kafka"), + values are maps of metadata key-value pairs to merge (e.g., {"lagThreshold": "10"}). + type: object configuration: description: |- Legacy trait configuration parameters. diff --git a/pkg/trait/keda.go b/pkg/trait/keda.go index 0eb438019..e00d2f754 100644 --- a/pkg/trait/keda.go +++ b/pkg/trait/keda.go @@ -25,6 +25,8 @@ import ( traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" "github.com/apache/camel-k/v2/pkg/apis/duck/keda/v1alpha1" "github.com/apache/camel-k/v2/pkg/metadata" + kedamapper "github.com/apache/camel-k/v2/pkg/trait/keda" + _ "github.com/apache/camel-k/v2/pkg/trait/keda/scalers" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" @@ -50,7 +52,7 @@ func (t *kedaTrait) Configure(e *Environment) (bool, *TraitCondition, error) { return false, nil, nil } - if ptr.Deref(t.Auto, true) && len(t.Triggers) == 0 { + if ptr.Deref(t.Auto, true) { if err := t.autoDiscoverTriggers(e); err != nil { return false, nil, err } @@ -156,20 +158,37 @@ func (t *kedaTrait) getScaleTarget(it *v1.Integration) *corev1.ObjectReference { } } -// Auto-discover triggers if Auto is enabled (default) and no manual triggers specified +// autoDiscoverTriggers discovers KEDA triggers from Camel source URIs. func (t *kedaTrait) autoDiscoverTriggers(e *Environment) error { + // Build set of manually configured trigger types + manualTypes := make(map[string]bool) + for _, trigger := range t.Triggers { + manualTypes[trigger.Type] = true + } + meta, err := metadata.ExtractAll(e.CamelCatalog, e.Integration.AllSources()) if err != nil { return err } + for _, fromURI := range meta.FromURIs { - trigger, err := mapToKedaTrigger(fromURI) + trigger, err := kedamapper.MapToKedaTrigger(fromURI) if err != nil { return err } - if trigger != nil { + // Only add if trigger type not already manually configured + if trigger != nil && !manualTypes[trigger.Type] { + // Merge additional metadata if configured + if t.AutoMetadata != nil { + if extra, ok := t.AutoMetadata[trigger.Type]; ok { + for k, v := range extra { + trigger.Metadata[k] = v + } + } + } t.Triggers = append(t.Triggers, *trigger) } } + return nil } diff --git a/pkg/trait/keda/mapper.go b/pkg/trait/keda/mapper.go new file mode 100644 index 000000000..eca30cfe2 --- /dev/null +++ b/pkg/trait/keda/mapper.go @@ -0,0 +1,60 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package keda + +import ( + traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" +) + +// ScaleMapper defines the interface for mapping Camel component URIs to KEDA triggers. +type ScaleMapper interface { + Component() string + Map(pathValue string, params map[string]string) (kedaType string, metadata map[string]string) +} + +var registry = map[string]ScaleMapper{} + +func Register(m ScaleMapper) { + registry[m.Component()] = m +} + +func GetMapper(component string) (ScaleMapper, bool) { + m, ok := registry[component] + + return m, ok +} + +func MapToKedaTrigger(rawURI string) (*traitv1.KedaTrigger, error) { + scheme, pathValue, params, err := ParseComponentURI(rawURI) + if err != nil { + return nil, err + } + if scheme == "" { + return nil, nil + } + mapper, found := GetMapper(scheme) + if !found { + return nil, nil + } + kedaType, metadata := mapper.Map(pathValue, params) + + return &traitv1.KedaTrigger{ + Type: kedaType, + Metadata: metadata, + }, nil +} diff --git a/pkg/trait/keda_mapping_test.go b/pkg/trait/keda/mapper_test.go similarity index 51% rename from pkg/trait/keda_mapping_test.go rename to pkg/trait/keda/mapper_test.go index b60956de5..4d61d87df 100644 --- a/pkg/trait/keda_mapping_test.go +++ b/pkg/trait/keda/mapper_test.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package trait +package keda import ( "testing" @@ -46,13 +46,6 @@ func TestParseComponentURI(t *testing.T) { pathValue: "myQueue", params: map[string]string{"region": "us-east-1"}, }, - { - name: "spring-rabbitmq", - uri: "spring-rabbitmq:exchange?queues=myQueue&addresses=localhost:5672", - scheme: "spring-rabbitmq", - pathValue: "exchange", - params: map[string]string{"queues": "myQueue", "addresses": "localhost:5672"}, - }, { name: "timer unsupported", uri: "timer:tick?period=1000", @@ -78,7 +71,7 @@ func TestParseComponentURI(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - scheme, pathValue, params, err := parseComponentURI(tt.uri) + scheme, pathValue, params, err := ParseComponentURI(tt.uri) require.NoError(t, err) assert.Equal(t, tt.scheme, scheme) assert.Equal(t, tt.pathValue, pathValue) @@ -86,74 +79,3 @@ func TestParseComponentURI(t *testing.T) { }) } } - -func TestMapToKedaTrigger(t *testing.T) { - tests := []struct { - name string - uri string - expectNil bool - expectedType string - expectedMeta map[string]string - }{ - { - name: "kafka trigger", - uri: "kafka:orders?brokers=broker:9092&groupId=grp", - expectNil: false, - expectedType: "kafka", - expectedMeta: map[string]string{ - "topic": "orders", - "bootstrapServers": "broker:9092", - "consumerGroup": "grp", - }, - }, - { - name: "aws2-sqs trigger", - uri: "aws2-sqs:myQueue?region=us-east-1", - expectNil: false, - expectedType: "aws-sqs-queue", - expectedMeta: map[string]string{ - "queueURL": "myQueue", - "awsRegion": "us-east-1", - }, - }, - { - name: "spring-rabbitmq trigger", - uri: "spring-rabbitmq:exchange?queues=myQueue&addresses=localhost:5672", - expectNil: false, - expectedType: "rabbitmq", - expectedMeta: map[string]string{ - "queueName": "myQueue", - "host": "localhost:5672", - }, - }, - { - name: "timer no trigger", - uri: "timer:tick", - expectNil: true, - }, - { - name: "direct no trigger", - uri: "direct:start", - expectNil: true, - }, - { - name: "empty uri no trigger", - uri: "", - expectNil: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - trigger, err := mapToKedaTrigger(tt.uri) - require.NoError(t, err) - if tt.expectNil { - assert.Nil(t, trigger) - } else { - require.NotNil(t, trigger) - assert.Equal(t, tt.expectedType, trigger.Type) - assert.Equal(t, tt.expectedMeta, trigger.Metadata) - } - }) - } -} diff --git a/pkg/trait/keda/scalers/kafka.go b/pkg/trait/keda/scalers/kafka.go new file mode 100644 index 000000000..873664a00 --- /dev/null +++ b/pkg/trait/keda/scalers/kafka.go @@ -0,0 +1,45 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scalers + +import ( + keda "github.com/apache/camel-k/v2/pkg/trait/keda" +) + +type KafkaScaler struct{} + +func init() { + keda.Register(&KafkaScaler{}) +} +func (k *KafkaScaler) Component() string { + return "kafka" +} +func (k *KafkaScaler) Map(pathValue string, params map[string]string) (string, map[string]string) { + metadata := make(map[string]string) + if pathValue != "" { + metadata["topic"] = pathValue + } + if v, ok := params["brokers"]; ok { + metadata["bootstrapServers"] = v + } + if v, ok := params["groupId"]; ok { + metadata["consumerGroup"] = v + } + + return "kafka", metadata +} diff --git a/pkg/trait/keda/scalers/kafka_test.go b/pkg/trait/keda/scalers/kafka_test.go new file mode 100644 index 000000000..dcfbec2cb --- /dev/null +++ b/pkg/trait/keda/scalers/kafka_test.go @@ -0,0 +1,108 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scalers + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + keda "github.com/apache/camel-k/v2/pkg/trait/keda" +) + +func TestKafkaScalerRegistered(t *testing.T) { + mapper, found := keda.GetMapper("kafka") + require.True(t, found, "kafka scaler should be registered") + assert.Equal(t, "kafka", mapper.Component()) +} + +func TestKafkaScalerMap(t *testing.T) { + scaler := &KafkaScaler{} + + tests := []struct { + name string + pathValue string + params map[string]string + expectedType string + expectedMeta map[string]string + }{ + { + name: "full kafka params", + pathValue: "orders", + params: map[string]string{"brokers": "broker:9092", "groupId": "grp"}, + expectedType: "kafka", + expectedMeta: map[string]string{ + "topic": "orders", + "bootstrapServers": "broker:9092", + "consumerGroup": "grp", + }, + }, + { + name: "only topic", + pathValue: "my-topic", + params: map[string]string{}, + expectedType: "kafka", + expectedMeta: map[string]string{ + "topic": "my-topic", + }, + }, + { + name: "partial params", + pathValue: "events", + params: map[string]string{"brokers": "localhost:9092"}, + expectedType: "kafka", + expectedMeta: map[string]string{ + "topic": "events", + "bootstrapServers": "localhost:9092", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + kedaType, metadata := scaler.Map(tt.pathValue, tt.params) + assert.Equal(t, tt.expectedType, kedaType) + assert.Equal(t, tt.expectedMeta, metadata) + }) + } +} + +func TestMapToKedaTriggerKafka(t *testing.T) { + trigger, err := keda.MapToKedaTrigger("kafka:orders?brokers=broker:9092&groupId=grp") + require.NoError(t, err) + require.NotNil(t, trigger) + assert.Equal(t, "kafka", trigger.Type) + assert.Equal(t, map[string]string{ + "topic": "orders", + "bootstrapServers": "broker:9092", + "consumerGroup": "grp", + }, trigger.Metadata) +} + +func TestMapToKedaTriggerUnsupported(t *testing.T) { + trigger, err := keda.MapToKedaTrigger("timer:tick") + require.NoError(t, err) + assert.Nil(t, trigger, "unsupported component should return nil trigger") +} + +func TestMapToKedaTriggerEmpty(t *testing.T) { + trigger, err := keda.MapToKedaTrigger("") + require.NoError(t, err) + assert.Nil(t, trigger) +} diff --git a/pkg/trait/keda/uri_parser.go b/pkg/trait/keda/uri_parser.go new file mode 100644 index 000000000..b12204ccd --- /dev/null +++ b/pkg/trait/keda/uri_parser.go @@ -0,0 +1,57 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package keda + +import ( + "net/url" + "strings" + + "github.com/apache/camel-k/v2/pkg/util/uri" +) + +// ParseComponentURI extracts the component scheme, path value, and query parameters from a Camel URI. +func ParseComponentURI(rawURI string) (string, string, map[string]string, error) { + scheme := uri.GetComponent(rawURI) + if scheme == "" { + return "", "", nil, nil + } + + params := make(map[string]string) + + // extract path + remainder := strings.TrimPrefix(rawURI, scheme+":") + var pathValue string + if idx := strings.Index(remainder, "?"); idx >= 0 { + pathValue = remainder[:idx] + queryString := remainder[idx+1:] + + values, parseErr := url.ParseQuery(queryString) + if parseErr != nil { + return "", "", nil, parseErr + } + for k, v := range values { + if len(v) > 0 { + params[k] = v[0] + } + } + } else { + pathValue = remainder + } + + return scheme, pathValue, params, nil +} diff --git a/pkg/trait/keda_mapping.go b/pkg/trait/keda_mapping.go deleted file mode 100644 index cc1c7ac0f..000000000 --- a/pkg/trait/keda_mapping.go +++ /dev/null @@ -1,125 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package trait - -import ( - "net/url" - "strings" - - traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" - "github.com/apache/camel-k/v2/pkg/util/uri" -) - -type CamelToKedaMapping struct { - KedaScalerType string - // Maps Camel URI Param names to KEDA metadata keys. - ParameterMap map[string]string - // Params that are taken from URI path, (component:path?queryParams) - PathParamName string -} - -// camelToKedaMappings maps Camel component URI parameters to KEDA scaler metadata. -// Only components available in Camel-K catalog (pkg/resources/resources/camel-catalog-*.yaml) -// that have corresponding KEDA scalers are included. -var camelToKedaMappings = map[string]CamelToKedaMapping{ - "kafka": { - KedaScalerType: "kafka", - PathParamName: "topic", - ParameterMap: map[string]string{ - "brokers": "bootstrapServers", - "groupId": "consumerGroup", - }, - }, - "aws2-sqs": { - KedaScalerType: "aws-sqs-queue", - PathParamName: "queueURL", - ParameterMap: map[string]string{ - "region": "awsRegion", - }, - }, - "spring-rabbitmq": { - KedaScalerType: "rabbitmq", - PathParamName: "", - ParameterMap: map[string]string{ - "queues": "queueName", - "addresses": "host", - }, - }, -} - -// parseComponentURI extracts the component scheme, path value, and query parameters from a Camel URI. -func parseComponentURI(rawURI string) (string, string, map[string]string, error) { - scheme := uri.GetComponent(rawURI) - if scheme == "" { - return "", "", nil, nil - } - - params := make(map[string]string) - - // extract path - remainder := strings.TrimPrefix(rawURI, scheme+":") - var pathValue string - if idx := strings.Index(remainder, "?"); idx >= 0 { - pathValue = remainder[:idx] - queryString := remainder[idx+1:] - - values, parseErr := url.ParseQuery(queryString) - if parseErr != nil { - return "", "", nil, parseErr - } - for k, v := range values { - if len(v) > 0 { - params[k] = v[0] - } - } - } else { - pathValue = remainder - } - - return scheme, pathValue, params, nil -} - -// mapToKedaTrigger converts a Camel URI to a KEDA trigger if the component is supported. -func mapToKedaTrigger(rawURI string) (*traitv1.KedaTrigger, error) { - scheme, pathValue, params, err := parseComponentURI(rawURI) - if err != nil { - return nil, err - } - - mapping, found := camelToKedaMappings[scheme] - if scheme == "" || !found { - return nil, nil // no trigger for this URI - } - - metadata := make(map[string]string) - - if mapping.PathParamName != "" && pathValue != "" { - metadata[mapping.PathParamName] = pathValue - } - - for camelParam, kedaParam := range mapping.ParameterMap { - if val, ok := params[camelParam]; ok { - metadata[kedaParam] = val - } - } - - return &traitv1.KedaTrigger{ - Type: mapping.KedaScalerType, - Metadata: metadata, - }, nil -} diff --git a/pkg/trait/keda_test.go b/pkg/trait/keda_test.go index 5def6174c..08a220c47 100644 --- a/pkg/trait/keda_test.go +++ b/pkg/trait/keda_test.go @@ -64,6 +64,9 @@ func TestKedaAutoDiscovery(t *testing.T) { source string expectedType string expectedParams map[string]string + manualTrigger *traitv1.KedaTrigger + autoMetadata map[string]map[string]string + expectedCount int }{ { name: "kafka", @@ -74,30 +77,50 @@ func TestKedaAutoDiscovery(t *testing.T) { "bootstrapServers": "my-broker:9092", "consumerGroup": "my-group", }, + manualTrigger: nil, + expectedCount: 1, }, { - name: "aws2-sqs", - source: `from("aws2-sqs:my-queue?region=us-east-1").log("${body}");`, - expectedType: "aws-sqs-queue", + name: "manual-trigger-does-not-block-auto-discovery", + source: `from("kafka:my-topic?brokers=my-broker:9092&groupId=my-group").log("${body}");`, + expectedType: "kafka", expectedParams: map[string]string{ - "queueURL": "my-queue", - "awsRegion": "us-east-1", + "topic": "my-topic", + "bootstrapServers": "my-broker:9092", + "consumerGroup": "my-group", }, + manualTrigger: &traitv1.KedaTrigger{ + Type: "cron", + Metadata: map[string]string{ + "timezone": "Etc/UTC", + "start": "0 * * * *", + "end": "59 * * * *", + }, + }, + expectedCount: 2, // 1 manual (cron) + 1 auto-discovered (kafka) }, { - name: "spring-rabbitmq", - source: `from("spring-rabbitmq:exchange?queues=my-queue&addresses=rabbit:5672").log("${body}");`, - expectedType: "rabbitmq", + name: "auto-metadata-merge", + source: `from("kafka:my-topic?brokers=my-broker:9092&groupId=my-group").log("${body}");`, + expectedType: "kafka", expectedParams: map[string]string{ - "queueName": "my-queue", - "host": "rabbit:5672", + "topic": "my-topic", + "bootstrapServers": "my-broker:9092", + "consumerGroup": "my-group", + "lagThreshold": "10", // merged from autoMetadata + }, + autoMetadata: map[string]map[string]string{ + "kafka": { + "lagThreshold": "10", + }, }, + expectedCount: 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - environment := autoDiscoveryEnvWithSource(t, tt.source) + environment := autoDiscoveryEnvWithSource(t, tt.source, tt.manualTrigger, tt.autoMetadata) environment.Platform.ResyncStatusFullConfig() traitCatalog := environment.Catalog @@ -109,10 +132,18 @@ func TestKedaAutoDiscovery(t *testing.T) { scaledObject := getKedaScaledObject(environment.Resources) require.NotNil(t, scaledObject) - require.Len(t, scaledObject.Spec.Triggers, 1) - assert.Equal(t, tt.expectedType, scaledObject.Spec.Triggers[0].Type) + require.Len(t, scaledObject.Spec.Triggers, tt.expectedCount) + // Find the auto-discovered trigger by type + var foundTrigger *v1alpha1.ScaleTriggers + for i := range scaledObject.Spec.Triggers { + if scaledObject.Spec.Triggers[i].Type == tt.expectedType { + foundTrigger = &scaledObject.Spec.Triggers[i] + break + } + } + require.NotNil(t, foundTrigger, "expected trigger type %s not found", tt.expectedType) for k, v := range tt.expectedParams { - assert.Equal(t, v, scaledObject.Spec.Triggers[0].Metadata[k], "metadata key %s mismatch", k) + assert.Equal(t, v, foundTrigger.Metadata[k], "metadata key %s mismatch", k) } }) } @@ -230,6 +261,7 @@ func nominalEnv(t *testing.T) Environment { Trait: traitv1.Trait{ Enabled: ptr.To(true), }, + Auto: ptr.To(false), // Disable auto-discovery for manual trigger test Triggers: []traitv1.KedaTrigger{ traitv1.KedaTrigger{ Type: "kafka", @@ -269,8 +301,8 @@ func nominalEnv(t *testing.T) Environment { } } -// autoDiscoveryEnvWithSource creates an environment with the given source but NO manual triggers. -func autoDiscoveryEnvWithSource(t *testing.T, source string) Environment { +// autoDiscoveryEnvWithSource creates an environment with the given source and optional manual trigger. +func autoDiscoveryEnvWithSource(t *testing.T, source string, manualTrigger *traitv1.KedaTrigger, autoMetadata map[string]map[string]string) Environment { t.Helper() catalog, err := camel.DefaultCatalog() require.NoError(t, err) @@ -302,12 +334,20 @@ func autoDiscoveryEnvWithSource(t *testing.T, source string) Environment { }, }, Traits: v1.Traits{ - Keda: &traitv1.KedaTrait{ - Trait: traitv1.Trait{ - Enabled: ptr.To(true), - }, - // No triggers - auto-discovery should kick in - }, + Keda: func() *traitv1.KedaTrait { + keda := &traitv1.KedaTrait{ + Trait: traitv1.Trait{ + Enabled: ptr.To(true), + }, + } + if manualTrigger != nil { + keda.Triggers = []traitv1.KedaTrigger{*manualTrigger} + } + if autoMetadata != nil { + keda.AutoMetadata = autoMetadata + } + return keda + }(), }, }, },
