This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push:
new 082835ee1 feat(util): allow Kamelet crossnamespace in Pipes
082835ee1 is described below
commit 082835ee11693ddf4589bdaafbfd921a367035ae
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Wed Oct 15 08:36:16 2025 +0200
feat(util): allow Kamelet crossnamespace in Pipes
Closes #6303
---
pkg/util/bindings/api.go | 1 +
pkg/util/bindings/catalog.go | 10 ++--
pkg/util/bindings/catalog_test.go | 123 ++++++++++++++++++++------------------
pkg/util/bindings/kamelet.go | 19 ++++--
pkg/util/bindings/kamelet_test.go | 64 ++++++++++++++++++++
pkg/util/bindings/strimzi.go | 28 ++++++---
pkg/util/source/kamelet.go | 35 ++++++-----
7 files changed, 189 insertions(+), 91 deletions(-)
diff --git a/pkg/util/bindings/api.go b/pkg/util/bindings/api.go
index 56dadc0a5..47dbe7ff2 100644
--- a/pkg/util/bindings/api.go
+++ b/pkg/util/bindings/api.go
@@ -39,6 +39,7 @@ type Binding struct {
// Step is to support complex mapping such as Camel's EIPs
Step map[string]interface{}
// Traits is a partial trait specification that should be merged into
the integration
+ // Deprecated: will be removed in future releases
Traits v1.Traits
// ApplicationProperties contain properties that should be set on the
integration for the binding to work
ApplicationProperties map[string]string
diff --git a/pkg/util/bindings/catalog.go b/pkg/util/bindings/catalog.go
index d05cdb3ae..726216c5e 100644
--- a/pkg/util/bindings/catalog.go
+++ b/pkg/util/bindings/catalog.go
@@ -23,7 +23,6 @@ import (
"sort"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
- "github.com/apache/camel-k/v2/pkg/platform"
"k8s.io/utils/ptr"
)
@@ -73,12 +72,13 @@ func validateEndpoint(ctx BindingContext, e v1.Endpoint)
error {
return errors.New("cannot use both ref and URI to specify an
endpoint: only one of them should be used")
}
if e.Ref != nil && e.Ref.Namespace != "" && e.Ref.Namespace !=
ctx.Namespace {
- // referencing default Kamelets in operator namespace is allowed
- if e.Ref.Kind == v1.KameletKind && e.Ref.Namespace ==
platform.GetOperatorNamespace() {
- return nil
+ if ok, err := isKnownKnativeResource(e.Ref); ok {
+ if err != nil {
+ return err
+ }
+ return errors.New("cross-namespace Pipe references are
not allowed for Knative")
}
- return errors.New("cross-namespace references are not allowed
in Pipe")
}
return nil
}
diff --git a/pkg/util/bindings/catalog_test.go
b/pkg/util/bindings/catalog_test.go
index d9d235150..53538280d 100644
--- a/pkg/util/bindings/catalog_test.go
+++ b/pkg/util/bindings/catalog_test.go
@@ -96,76 +96,25 @@ func TestValidateEndpoint(t *testing.T) {
},
},
},
- }
-
- for i, tc := range testcases {
- t.Run(fmt.Sprintf("test-%d-%s", i, tc.name), func(t *testing.T)
{
- if tc.operatorNamespace != "" {
- t.Setenv("NAMESPACE", tc.operatorNamespace)
- }
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- client, err := internal.NewFakeClient()
- require.NoError(t, err)
-
- bindingContext := BindingContext{
- Ctx: ctx,
- Client: client,
- Namespace: tc.namespace,
- Profile: v1.DefaultTraitProfile,
- }
-
- err = validateEndpoint(bindingContext, tc.endpoint)
- require.NoError(t, err)
- })
- }
-}
-
-func TestValidateEndpointError(t *testing.T) {
- uri := "log:info"
-
- testcases := []struct {
- name string
- namespace string
- operatorNamespace string
- endpoint v1.Endpoint
- }{
{
- name: "kamelet-ref-and-uri",
+ name: "it-ref",
namespace: "test",
endpoint: v1.Endpoint{
- URI: &uri,
Ref: &corev1.ObjectReference{
- Kind: v1.KameletKind,
+ Kind: v1.IntegrationKind,
APIVersion:
v1.SchemeGroupVersion.String(),
- Name: "foo-kamelet",
+ Name: "foo-it",
},
},
},
{
- name: "kamelet-ref-cross-namespace",
+ name: "pipe-ref",
namespace: "test",
endpoint: v1.Endpoint{
Ref: &corev1.ObjectReference{
- Kind: v1.KameletKind,
- Namespace: "other",
+ Kind: v1.PipeKind,
APIVersion:
v1.SchemeGroupVersion.String(),
- Name: "foo-kamelet",
- },
- },
- },
- {
- name:
"knative-broker-ref-in-operator-namespace",
- namespace: "test",
- operatorNamespace: "global",
- endpoint: v1.Endpoint{
- Ref: &corev1.ObjectReference{
- Kind: "Broker",
- Namespace: "global",
- APIVersion:
eventing.SchemeGroupVersion.String(),
- Name: "foo-broker",
+ Name: "foo-pipe",
},
},
},
@@ -191,7 +140,65 @@ func TestValidateEndpointError(t *testing.T) {
}
err = validateEndpoint(bindingContext, tc.endpoint)
- require.Error(t, err, "cross-namespace references are
not allowed in Pipe")
+ require.NoError(t, err)
})
}
}
+
+func TestValidateEndpointErrorRefURI(t *testing.T) {
+ uri := "log:info"
+
+ endpoint := v1.Endpoint{
+ URI: &uri,
+ Ref: &corev1.ObjectReference{
+ Kind: v1.KameletKind,
+ APIVersion: v1.SchemeGroupVersion.String(),
+ Name: "foo-kamelet",
+ },
+ }
+
+ bindingContext := BindingContext{
+ Namespace: "default",
+ }
+
+ err := validateEndpoint(bindingContext, endpoint)
+ require.Error(t, err)
+ require.Equal(t, "cannot use both ref and URI to specify an endpoint:
only one of them should be used", err.Error())
+}
+
+func TestValidateEndpointKameletCrossNS(t *testing.T) {
+ endpoint := v1.Endpoint{
+ Ref: &corev1.ObjectReference{
+ Kind: v1.KameletKind,
+ APIVersion: v1.SchemeGroupVersion.String(),
+ Name: "foo-kamelet",
+ Namespace: "kamelet-ns",
+ },
+ }
+
+ bindingContext := BindingContext{
+ Namespace: "default",
+ }
+
+ err := validateEndpoint(bindingContext, endpoint)
+ require.NoError(t, err)
+}
+
+func TestValidateEndpointErrorKnativeCrossNS(t *testing.T) {
+ endpoint := v1.Endpoint{
+ Ref: &corev1.ObjectReference{
+ Kind: "Broker",
+ Namespace: "knative-ns",
+ APIVersion: eventing.SchemeGroupVersion.String(),
+ Name: "foo-broker",
+ },
+ }
+
+ bindingContext := BindingContext{
+ Namespace: "default",
+ }
+
+ err := validateEndpoint(bindingContext, endpoint)
+ require.Error(t, err)
+ require.Equal(t, "cross-namespace Pipe references are not allowed for
Knative", err.Error())
+}
diff --git a/pkg/util/bindings/kamelet.go b/pkg/util/bindings/kamelet.go
index 1c6bad97b..f96421612 100644
--- a/pkg/util/bindings/kamelet.go
+++ b/pkg/util/bindings/kamelet.go
@@ -23,6 +23,7 @@ import (
"strings"
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+ "github.com/apache/camel-k/v2/pkg/util/source"
"k8s.io/apimachinery/pkg/runtime/schema"
)
@@ -62,18 +63,26 @@ func (k BindingConverter) Translate(ctx BindingContext,
endpointCtx EndpointCont
return nil, err
}
+ // Set id, if specified
id, idPresent := props[v1.KameletIDProperty]
if idPresent {
delete(props, v1.KameletIDProperty)
} else {
id = endpointCtx.GenerateID()
}
+ // Set version, if specified
version, versionPresent := props[v1.KameletVersionProperty]
if versionPresent {
delete(props, v1.KameletVersionProperty)
}
- kameletTranslated := getKameletName(kameletName, id, version)
+ // Set namespace, if specified and different from the actual context
+ namespace := ""
+ if e.Ref.Namespace != "" && e.Ref.Namespace != ctx.Namespace {
+ namespace = e.Ref.Namespace
+ }
+
+ kameletTranslated := getKameletName(kameletName, id, version, namespace)
binding := Binding{}
binding.ApplicationProperties = make(map[string]string)
@@ -146,12 +155,10 @@ func (k BindingConverter) Translate(ctx BindingContext,
endpointCtx EndpointCont
return &binding, nil
}
-func getKameletName(name, id, version string) string {
+// getKameletName returns the kamelet with it's name and querystring attached.
+func getKameletName(name, id, version, namespace string) string {
kamelet := fmt.Sprintf("%s/%s", name, url.PathEscape(id))
- if version != "" {
- kamelet = fmt.Sprintf("%s?%s=%s", kamelet,
v1.KameletVersionProperty, version)
- }
- return kamelet
+ return source.GetKameletQuerystring(kamelet, version, namespace)
}
// DataTypeStep --.
diff --git a/pkg/util/bindings/kamelet_test.go
b/pkg/util/bindings/kamelet_test.go
index dcd8bf7cb..6a6faa5e5 100644
--- a/pkg/util/bindings/kamelet_test.go
+++ b/pkg/util/bindings/kamelet_test.go
@@ -429,3 +429,67 @@ func getExpectedStep(withIn bool, withOut bool,
dataTypeActionKamelet string) ma
},
}
}
+
+func TestBindingCrossNamespacedKamelets(t *testing.T) {
+ client, err := internal.NewFakeClient()
+ require.NoError(t, err)
+
+ endpoint := v1.Endpoint{
+ Ref: &corev1.ObjectReference{
+ Kind: "Kamelet",
+ APIVersion: "camel.apache.org/v1any1",
+ Name: "mykamelet",
+ Namespace: "extra",
+ },
+ }
+
+ binding, err := BindingConverter{}.Translate(
+ BindingContext{
+ Ctx: context.Background(),
+ Client: client,
+ Namespace: "test",
+ Profile: v1.TraitProfileKubernetes,
+ },
+ EndpointContext{
+ Type: v1.EndpointTypeSource,
+ },
+ endpoint)
+
+ require.NoError(t, err)
+ assert.NotNil(t, binding)
+ assert.Equal(t, "kamelet:mykamelet/source?kameletNamespace=extra",
binding.URI)
+}
+
+func TestBindingCrossNamespacedAndVersionedKamelets(t *testing.T) {
+ client, err := internal.NewFakeClient()
+ require.NoError(t, err)
+
+ endpoint := v1.Endpoint{
+ Ref: &corev1.ObjectReference{
+ Kind: "Kamelet",
+ APIVersion: "camel.apache.org/v1any1",
+ Name: "mykamelet",
+ Namespace: "extra",
+ },
+ Properties: asEndpointProperties(
+ map[string]string{"kameletVersion": "v1"},
+ ),
+ }
+
+ binding, err := BindingConverter{}.Translate(
+ BindingContext{
+ Ctx: context.Background(),
+ Client: client,
+ Namespace: "test",
+ Profile: v1.TraitProfileKubernetes,
+ },
+ EndpointContext{
+ Type: v1.EndpointTypeSource,
+ },
+ endpoint)
+
+ require.NoError(t, err)
+ assert.NotNil(t, binding)
+ assert.Equal(t,
"kamelet:mykamelet/source?kameletVersion=v1&kameletNamespace=extra",
binding.URI)
+ assert.Empty(t, binding.ApplicationProperties)
+}
diff --git a/pkg/util/bindings/strimzi.go b/pkg/util/bindings/strimzi.go
index aac175ab0..5f76d3bfc 100644
--- a/pkg/util/bindings/strimzi.go
+++ b/pkg/util/bindings/strimzi.go
@@ -97,6 +97,7 @@ func (s StrimziBindingProvider) fromKafkaToCamel(ctx
BindingContext, endpoint ca
}
topicName := props["topic"]
delete(props, "topic")
+ //nolint:nestif
if props["brokers"] == "" {
// build the client if needed
if s.Client == nil {
@@ -106,7 +107,12 @@ func (s StrimziBindingProvider) fromKafkaToCamel(ctx
BindingContext, endpoint ca
}
s.Client = kafkaClient
}
- bootstrapServers, err := s.getBootstrapServers(ctx,
endpoint.Ref.Name)
+ namespace := endpoint.Ref.Namespace
+ if namespace == "" {
+ namespace = ctx.Namespace
+ }
+
+ bootstrapServers, err := s.getBootstrapServers(ctx,
endpoint.Ref.Name, namespace)
if err != nil {
return nil, err
}
@@ -162,7 +168,11 @@ func (s StrimziBindingProvider) lookupBootstrapServers(ctx
BindingContext, endpo
if clusterName == "" {
return "", fmt.Errorf("no %q label defined on topic %s",
v1beta2.StrimziKafkaClusterLabel, endpoint.Ref.Name)
}
- bootstrapServers, err := s.getBootstrapServers(ctx, clusterName)
+ namespace := endpoint.Ref.Namespace
+ if namespace == "" {
+ namespace = ctx.Namespace
+ }
+ bootstrapServers, err := s.getBootstrapServers(ctx, clusterName,
namespace)
if err != nil {
return "", err
}
@@ -170,8 +180,8 @@ func (s StrimziBindingProvider) lookupBootstrapServers(ctx
BindingContext, endpo
return bootstrapServers, nil
}
-func (s StrimziBindingProvider) getBootstrapServers(ctx BindingContext,
clusterName string) (string, error) {
- cluster, err :=
s.Client.KafkaV1beta2().Kafkas(ctx.Namespace).Get(ctx.Ctx, clusterName,
v1.GetOptions{})
+func (s StrimziBindingProvider) getBootstrapServers(ctx BindingContext,
clusterName, namespace string) (string, error) {
+ cluster, err := s.Client.KafkaV1beta2().Kafkas(namespace).Get(ctx.Ctx,
clusterName, v1.GetOptions{})
if err != nil {
return "", err
}
@@ -190,8 +200,12 @@ func (s StrimziBindingProvider) getBootstrapServers(ctx
BindingContext, clusterN
}
func (s StrimziBindingProvider) lookupTopic(ctx BindingContext, endpoint
camelv1.Endpoint) (*v1beta2.KafkaTopic, error) {
+ namespace := endpoint.Ref.Namespace
+ if namespace == "" {
+ namespace = ctx.Namespace
+ }
// first check by KafkaTopic name
- topic, err :=
s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).Get(ctx.Ctx,
endpoint.Ref.Name, v1.GetOptions{})
+ topic, err :=
s.Client.KafkaV1beta2().KafkaTopics(namespace).Get(ctx.Ctx, endpoint.Ref.Name,
v1.GetOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return nil, err
}
@@ -200,12 +214,12 @@ func (s StrimziBindingProvider) lookupTopic(ctx
BindingContext, endpoint camelv1
}
// if not found, then, look at the .status.topicName (it may be
autogenerated)
- topics, err :=
s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).List(ctx.Ctx, v1.ListOptions{
+ topics, err :=
s.Client.KafkaV1beta2().KafkaTopics(namespace).List(ctx.Ctx, v1.ListOptions{
FieldSelector: "status.topicName=" + endpoint.Ref.Name,
})
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("couldn't find any KafkaTopic with
either name or topicName %s; error %w", endpoint.Ref.Name, err)
}
if len(topics.Items) == 0 {
return nil, fmt.Errorf("couldn't find any KafkaTopic with
either name or topicName %s", endpoint.Ref.Name)
diff --git a/pkg/util/source/kamelet.go b/pkg/util/source/kamelet.go
index 82579aead..d5b3fcfcb 100644
--- a/pkg/util/source/kamelet.go
+++ b/pkg/util/source/kamelet.go
@@ -27,26 +27,12 @@ import (
var kameletNameRegexp =
regexp.MustCompile("kamelet:(?://)?([a-z0-9-.]+(/[a-z0-9-.]+)?)(?:$|[^a-z0-9-.].*)")
-//nolint:nestif
func ExtractKamelet(uri string) string {
matches := kameletNameRegexp.FindStringSubmatch(uri)
if len(matches) > 1 {
version := getKameletParam(uri, v1.KameletVersionProperty)
namespace := getKameletParam(uri, v1.KameletNamespaceProperty)
- if version != "" || namespace != "" {
- var querystring string
- if version != "" {
- querystring = v1.KameletVersionProperty + "=" +
version
- }
- if namespace != "" {
- if querystring != "" {
- querystring += "&"
- }
- querystring += v1.KameletNamespaceProperty +
"=" + namespace
- }
- return fmt.Sprintf("%s?%s", matches[1], querystring)
- }
- return matches[1]
+ return GetKameletQuerystring(matches[1], version, namespace)
}
return ""
}
@@ -67,3 +53,22 @@ func getKameletParam(uri, param string) string {
queryParams := parsedURL.Query()
return queryParams.Get(param)
}
+
+// GetKameletQuerystring returns a kamelet name appended with its version and
namespace (if provided).
+func GetKameletQuerystring(name, version, namespace string) string {
+ if version != "" || namespace != "" {
+ var querystring string
+ if version != "" {
+ querystring = v1.KameletVersionProperty + "=" + version
+ }
+ if namespace != "" {
+ if querystring != "" {
+ querystring += "&"
+ }
+ querystring += v1.KameletNamespaceProperty + "=" +
namespace
+ }
+ return fmt.Sprintf("%s?%s", name, querystring)
+ }
+
+ return name
+}