This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/main by this push:
     new a51d0ee59 feat(pipe): bind to Integration, Pipe, Services
a51d0ee59 is described below

commit a51d0ee599031d1b5651f4425dbaa8dd19ea6e80
Author: Pasquale Congiusti <pasquale.congiu...@gmail.com>
AuthorDate: Tue Apr 22 08:40:02 2025 +0200

    feat(pipe): bind to Integration, Pipe, Services
    
    Closes #5935
---
 docs/modules/ROOT/pages/concepts/overview.adoc |   2 +-
 docs/modules/ROOT/pages/pipes/pipes.adoc       |  27 +++
 e2e/common/binding/binding_test.go             |  58 ++++++
 e2e/common/binding/hello.yaml                  |  26 +++
 e2e/common/binding/it-binding.yaml             |  33 +++
 pkg/controller/pipe/initialize_test.go         |  32 +--
 pkg/util/bindings/camel_uri_test.go            |  85 ++++++++
 pkg/util/bindings/knative_uri_test.go          | 100 +++++++++
 pkg/util/bindings/service_ref.go               | 102 ++++++++++
 pkg/util/bindings/service_ref_test.go          | 269 +++++++++++++++++++++++++
 pkg/util/kubernetes/lookup.go                  |  24 +++
 pkg/util/kubernetes/service.go                 |  38 ++++
 12 files changed, 771 insertions(+), 25 deletions(-)

diff --git a/docs/modules/ROOT/pages/concepts/overview.adoc 
b/docs/modules/ROOT/pages/concepts/overview.adoc
index 7dadbc2c0..c4d716206 100644
--- a/docs/modules/ROOT/pages/concepts/overview.adoc
+++ b/docs/modules/ROOT/pages/concepts/overview.adoc
@@ -32,6 +32,6 @@ image::concepts/pipes.png[Camel connector lifecycle, 
width=640]
 
 **Pipe**: it is used to create the connector binding an event source to an 
event sink.
 
-**ObjectReference**: this is the reference to any Kubernetes object. The 
operator is able to transform any Camel URI, Kamelet, Strimzi Kafka resource or 
Knative resource.
+**ObjectReference**: this is the reference to any Kubernetes object. The 
operator is able to transform any Camel URI, Kamelet, Strimzi Kafka resource, 
Knative resource, Service, Integration or Pipe.
 
 **Integration**: it is created from the Pipe translating the source and sinks 
into a Camel route.
diff --git a/docs/modules/ROOT/pages/pipes/pipes.adoc 
b/docs/modules/ROOT/pages/pipes/pipes.adoc
index 102dc7452..514d9bf88 100644
--- a/docs/modules/ROOT/pages/pipes/pipes.adoc
+++ b/docs/modules/ROOT/pages/pipes/pipes.adoc
@@ -355,6 +355,33 @@ This Pipe explicitly defines an URI where data is going to 
be pushed.
 
 NOTE: the `uri` option is also conventionally used in Knative to specify a 
non-kubernetes destination. To comply with the Knative specifications, in case 
an "http" or "https" URI is used, Camel will send 
https://cloudevents.io/[CloudEvents] to the destination.
 
+=== Binding to a Service, Integration or Pipe
+
+You can in general connect any Kubernetes Service or any Camel Integration or 
Pipe which have a Service associated.
+
+[source,yaml]
+----
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: source-to-service
+spec:
+  source:
+    ...
+  sink:
+    ref:
+      apiVersion: v1
+      kind: Service
+      name: my-svc
+      namespace: my-svc-ns
+    properties:
+      path: /path/to/my/service (optional)
+----
+
+The operator will translate to the related URL. The same mechanism works using 
`apiVersion:camel.apache.org/v1` and `kind:Integration` or `kind:Pipe` types, 
assuming these Integrations are exposing any kind of ClusterIP Service. The 
operator will discover the port to use and you can optionally provide a `path` 
property if you need to specify a given endpoint to use.
+
+NOTE: this is still available for ClusterIP Service type only.
+
 == Binding with data types
 
 When referencing Kamelets in a binding users may choose from one of the 
supported input/output data types provided by the Kamelet. The supported data 
types are declared on the Kamelet itself and give additional information about 
used header names, content type and content schema.
diff --git a/e2e/common/binding/binding_test.go 
b/e2e/common/binding/binding_test.go
new file mode 100644
index 000000000..ff199d098
--- /dev/null
+++ b/e2e/common/binding/binding_test.go
@@ -0,0 +1,58 @@
+//go:build integration
+// +build integration
+
+// To enable compilation of this file in Goland, go to "Settings -> Go -> 
Vendoring & Build Tags -> Custom Tags" and add "integration"
+
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package common
+
+import (
+       "context"
+       "testing"
+
+       . "github.com/onsi/gomega"
+
+       camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+       corev1 "k8s.io/api/core/v1"
+       v1 "k8s.io/api/core/v1"
+
+       . "github.com/apache/camel-k/v2/e2e/support"
+)
+
+func TestServiceTrait(t *testing.T) {
+       t.Parallel()
+       WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) {
+               t.Run("Integration Binding", func(t *testing.T) {
+                       // Not supported when running as a Knative Service as
+                       // the Knative operator creates an external service 
with the same name of the Integration
+                       g.Expect(KamelRun(t, ctx, ns, "hello.yaml", "-t", 
"knative-service.enabled=false").Execute()).To(Succeed())
+                       g.Eventually(IntegrationConditionStatus(t, ctx, ns, 
"hello",
+                               camelv1.IntegrationConditionReady), 
TestTimeoutLong).Should(Equal(v1.ConditionTrue))
+                       g.Eventually(IntegrationPodPhase(t, ctx, ns, 
"hello")).Should(Equal(corev1.PodRunning))
+
+                       ExpectExecSucceed(t, g, Kubectl("apply", "-f", 
"it-binding.yaml", "-n", ns))
+                       g.Eventually(IntegrationConditionStatus(t, ctx, ns, 
"timer-to-hello-it",
+                               camelv1.IntegrationConditionReady), 
TestTimeoutLong).Should(Equal(v1.ConditionTrue))
+                       g.Eventually(IntegrationPodPhase(t, ctx, ns, 
"timer-to-hello-it")).Should(Equal(corev1.PodRunning))
+
+                       g.Eventually(IntegrationLogs(t, ctx, ns, 
"timer-to-hello-it")).Should(
+                               ContainSubstring("Hello from Camel"))
+               })
+       })
+}
diff --git a/e2e/common/binding/hello.yaml b/e2e/common/binding/hello.yaml
new file mode 100644
index 000000000..a227fa020
--- /dev/null
+++ b/e2e/common/binding/hello.yaml
@@ -0,0 +1,26 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+- route:
+    from:
+      uri: platform-http:/hello
+      parameters:
+        httpMethodRestrict: "GET"
+      steps:
+        - setBody:
+            simple: Hello from Camel
+        - log: ${body}
diff --git a/e2e/common/binding/it-binding.yaml 
b/e2e/common/binding/it-binding.yaml
new file mode 100644
index 000000000..612633e8c
--- /dev/null
+++ b/e2e/common/binding/it-binding.yaml
@@ -0,0 +1,33 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: timer-to-hello-it
+spec:
+  source:
+    uri: timer:foo
+  steps:
+  - ref:
+      apiVersion: camel.apache.org/v1
+      kind: Integration
+      name: hello
+    properties:
+      path: hello
+  sink:
+    uri: log:info
\ No newline at end of file
diff --git a/pkg/controller/pipe/initialize_test.go 
b/pkg/controller/pipe/initialize_test.go
index a6637adfa..b6149d057 100644
--- a/pkg/controller/pipe/initialize_test.go
+++ b/pkg/controller/pipe/initialize_test.go
@@ -225,22 +225,6 @@ func templateOrFail(template map[string]interface{}) 
*v1.Template {
 }
 
 func TestNewPipeUnsupportedRef(t *testing.T) {
-       svc := &corev1.Service{
-               TypeMeta: metav1.TypeMeta{
-                       Kind:       "Service",
-                       APIVersion: "v1",
-               },
-               ObjectMeta: metav1.ObjectMeta{
-                       Name:      "my-svc",
-                       Namespace: "ns",
-               },
-               Spec: corev1.ServiceSpec{
-                       Ports: []corev1.ServicePort{},
-                       Selector: map[string]string{
-                               v1.IntegrationLabel: "my-pipe",
-                       },
-               },
-       }
        pipe := &v1.Pipe{
                TypeMeta: metav1.TypeMeta{
                        APIVersion: v1.SchemeGroupVersion.String(),
@@ -256,10 +240,10 @@ func TestNewPipeUnsupportedRef(t *testing.T) {
                        },
                        Sink: v1.Endpoint{
                                Ref: &corev1.ObjectReference{
-                                       APIVersion: svc.APIVersion,
-                                       Kind:       svc.Kind,
-                                       Namespace:  svc.Namespace,
-                                       Name:       svc.Name,
+                                       APIVersion: "my-api-version",
+                                       Kind:       "my-kind",
+                                       Namespace:  "ns",
+                                       Name:       "my-kind-name",
                                },
                        },
                },
@@ -274,15 +258,15 @@ func TestNewPipeUnsupportedRef(t *testing.T) {
        assert.True(t, a.CanHandle(pipe))
        handledPipe, err := a.Handle(context.TODO(), pipe)
        require.Error(t, err)
-       assert.Equal(t, "could not find any suitable binding provider for 
v1/Service my-svc in namespace ns. "+
-               "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" 
\"camel-uri\" \"knative-ref\"]", err.Error())
+       assert.Equal(t, "could not find any suitable binding provider for 
my-api-version/my-kind my-kind-name in namespace ns. "+
+               "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" 
\"service-ref\" \"camel-uri\" \"knative-ref\"]", err.Error())
        assert.Equal(t, v1.PipePhaseError, handledPipe.Status.Phase)
        cond := handledPipe.Status.GetCondition(v1.PipeConditionReady)
        assert.NotNil(t, cond)
        assert.Equal(t, corev1.ConditionFalse, cond.Status)
        assert.Equal(t, "IntegrationError", cond.Reason)
-       assert.Equal(t, "could not find any suitable binding provider for 
v1/Service my-svc in namespace ns. "+
-               "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" 
\"camel-uri\" \"knative-ref\"]", cond.Message)
+       assert.Equal(t, "could not find any suitable binding provider for 
my-api-version/my-kind my-kind-name in namespace ns. "+
+               "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" 
\"service-ref\" \"camel-uri\" \"knative-ref\"]", cond.Message)
 }
 
 func TestNewPipeKnativeURIBinding(t *testing.T) {
diff --git a/pkg/util/bindings/camel_uri_test.go 
b/pkg/util/bindings/camel_uri_test.go
new file mode 100644
index 000000000..98fa40c48
--- /dev/null
+++ b/pkg/util/bindings/camel_uri_test.go
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package bindings
+
+import (
+       "context"
+       "testing"
+
+       camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/v2/pkg/internal"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "k8s.io/utils/ptr"
+)
+
+func TestCamelURI(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       client, err := internal.NewFakeClient()
+       require.NoError(t, err)
+
+       bindingContext := BindingContext{
+               Ctx:       ctx,
+               Client:    client,
+               Namespace: "test",
+               Profile:   camelv1.TraitProfileKubernetes,
+       }
+
+       endpoint := camelv1.Endpoint{
+               URI: ptr.To("my-component:my-uri"),
+               Properties: asEndpointProperties(map[string]string{
+                       "prop1": "v1",
+                       "prop2": "v2",
+               }),
+       }
+
+       binding, err := CamelURIBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       assert.NotNil(t, binding)
+       assert.Equal(t, "my-component:my-uri?prop1=v1&prop2=v2", binding.URI)
+}
+
+func TestCamelURISkip(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       client, err := internal.NewFakeClient()
+       require.NoError(t, err)
+
+       bindingContext := BindingContext{
+               Ctx:       ctx,
+               Client:    client,
+               Namespace: "test",
+               Profile:   camelv1.TraitProfileKubernetes,
+       }
+
+       endpoint := camelv1.Endpoint{
+               // Missing URI
+       }
+
+       binding, err := CamelURIBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       require.Nil(t, binding)
+}
diff --git a/pkg/util/bindings/knative_uri_test.go 
b/pkg/util/bindings/knative_uri_test.go
new file mode 100644
index 000000000..c99414e54
--- /dev/null
+++ b/pkg/util/bindings/knative_uri_test.go
@@ -0,0 +1,100 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package bindings
+
+import (
+       "context"
+       "testing"
+
+       camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/v2/pkg/internal"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "k8s.io/utils/ptr"
+)
+
+func TestKnativeURI(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       client, err := internal.NewFakeClient()
+       require.NoError(t, err)
+
+       bindingContext := BindingContext{
+               Ctx:       ctx,
+               Client:    client,
+               Namespace: "test",
+               Profile:   camelv1.TraitProfileKnative,
+       }
+
+       endpoint := camelv1.Endpoint{
+               URI: ptr.To("http://my-domain";),
+       }
+
+       binding, err := KnativeURIBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       require.NotNil(t, binding)
+       assert.Equal(t, "knative:endpoint/sink", binding.URI)
+       require.NotNil(t, binding.Traits.Knative)
+       assert.Equal(t,
+               
`{"services":[{"type":"endpoint","name":"sink","url":"http://my-domain","metadata":{"camel.endpoint.kind":"sink","knative.apiVersion":"","knative.kind":"","knative.name":"sink"}}]}`,
+               binding.Traits.Knative.Configuration,
+       )
+}
+
+func TestKnativeURISkip(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       client, err := internal.NewFakeClient()
+       require.NoError(t, err)
+
+       bindingContext := BindingContext{
+               Ctx:       ctx,
+               Client:    client,
+               Namespace: "test",
+               Profile:   camelv1.TraitProfileKubernetes,
+       }
+
+       endpoint := camelv1.Endpoint{
+               // Missing URI
+       }
+
+       binding, err := KnativeURIBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       require.Nil(t, binding)
+
+       endpoint.URI = ptr.To("non-http:my")
+       binding, err = KnativeURIBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       require.Nil(t, binding)
+
+       endpoint.URI = ptr.To("http:my")
+       binding, err = KnativeURIBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSource,
+       }, endpoint)
+       require.NoError(t, err)
+       require.Nil(t, binding)
+}
diff --git a/pkg/util/bindings/service_ref.go b/pkg/util/bindings/service_ref.go
new file mode 100644
index 000000000..3f5a7d0c6
--- /dev/null
+++ b/pkg/util/bindings/service_ref.go
@@ -0,0 +1,102 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package bindings
+
+import (
+       "fmt"
+       "strings"
+
+       v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/v2/pkg/util/kubernetes"
+       corev1 "k8s.io/api/core/v1"
+)
+
+// ServiceRefBindingProvider converts a Service into a Camel http endpoint.
+type ServiceRefBindingProvider struct{}
+
+// ID --.
+func (k ServiceRefBindingProvider) ID() string {
+       return "service-ref"
+}
+
+// Translate will check the endpoint reference being either a Service or an 
Integration and Pipe which have a Service associated.
+func (k ServiceRefBindingProvider) Translate(ctx BindingContext, endpointCtx 
EndpointContext, e v1.Endpoint) (*Binding, error) {
+       if e.Ref == nil || !(isService(e.Ref) || isIntegration(e.Ref) || 
isPipe(e.Ref)) {
+               // works only on Service refs
+               return nil, nil
+       }
+
+       // IMPORTANT: when we have a Service associated to either a Pipe or an 
Integration,
+       // this Service has the same namespace and the same name of the 
resource, so, we can lookup with those info
+       namespace := e.Ref.Namespace
+       if namespace == "" {
+               namespace = ctx.Namespace
+       }
+       svc, err := kubernetes.LookupService(ctx.Ctx, ctx.Client, namespace, 
e.Ref.Name)
+       if err != nil {
+               return nil, err
+       }
+       if svc == nil {
+               return nil, fmt.Errorf("could not load a Service with name %s 
in namespace %s", e.Ref.Name, namespace)
+       }
+       if svc.Spec.Type != corev1.ServiceTypeClusterIP {
+               return nil, fmt.Errorf(
+                       "operator only supports %s Service type, feel free to 
request this support",
+                       corev1.ServiceTypeClusterIP,
+               )
+       }
+
+       svcURI := kubernetes.GetClusterTypeServiceURI(svc)
+       props, err := e.Properties.GetPropertyMap()
+       if err != nil {
+               return nil, err
+       }
+       if props["path"] != "" {
+               if !strings.HasPrefix(props["path"], "/") {
+                       svcURI += "/"
+               }
+               svcURI += props["path"]
+       }
+
+       return &Binding{
+               URI: svcURI,
+       }, nil
+}
+
+func isService(ref *corev1.ObjectReference) bool {
+       return ref.APIVersion == corev1.SchemeGroupVersion.String() && ref.Kind 
== "Service"
+}
+
+func isIntegration(ref *corev1.ObjectReference) bool {
+       return ref.APIVersion == v1.SchemeGroupVersion.String() && ref.Kind == 
v1.IntegrationKind
+}
+
+func isPipe(ref *corev1.ObjectReference) bool {
+       return ref.APIVersion == v1.SchemeGroupVersion.String() && ref.Kind == 
v1.PipeKind
+}
+
+// Order --.
+//
+//nolint:mnd
+func (k ServiceRefBindingProvider) Order() int {
+       return OrderLast - 10
+}
+
+func init() {
+       RegisterBindingProvider(ServiceRefBindingProvider{})
+}
diff --git a/pkg/util/bindings/service_ref_test.go 
b/pkg/util/bindings/service_ref_test.go
new file mode 100644
index 000000000..c949b0ba9
--- /dev/null
+++ b/pkg/util/bindings/service_ref_test.go
@@ -0,0 +1,269 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package bindings
+
+import (
+       "context"
+       "testing"
+
+       camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/v2/pkg/internal"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       corev1 "k8s.io/api/core/v1"
+       v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func TestServiceRef(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       svc := corev1.Service{
+               ObjectMeta: v1.ObjectMeta{
+                       Name:      "my-svc",
+                       Namespace: "my-svc-ns",
+               },
+               Spec: corev1.ServiceSpec{
+                       Type: corev1.ServiceTypeClusterIP,
+               },
+       }
+
+       client, err := internal.NewFakeClient(&svc)
+       require.NoError(t, err)
+
+       bindingContext := BindingContext{
+               Ctx:       ctx,
+               Client:    client,
+               Namespace: "test",
+       }
+
+       endpoint := camelv1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Namespace:  "my-svc-ns",
+                       Name:       "my-svc",
+                       APIVersion: corev1.SchemeGroupVersion.String(),
+                       Kind:       "Service",
+               },
+       }
+
+       binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       require.NotNil(t, binding)
+       assert.Equal(t, "http://my-svc.my-svc-ns.svc.cluster.local";, 
binding.URI)
+}
+
+func TestServiceRefWithOptions(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       svc := corev1.Service{
+               ObjectMeta: v1.ObjectMeta{
+                       Name:      "my-svc",
+                       Namespace: "my-svc-ns",
+               },
+               Spec: corev1.ServiceSpec{
+                       Type: corev1.ServiceTypeClusterIP,
+                       Ports: []corev1.ServicePort{
+                               {
+                                       Port: 123,
+                               },
+                       },
+               },
+       }
+
+       client, err := internal.NewFakeClient(&svc)
+       require.NoError(t, err)
+
+       bindingContext := BindingContext{
+               Ctx:       ctx,
+               Client:    client,
+               Namespace: "test",
+       }
+
+       endpoint := camelv1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Namespace:  "my-svc-ns",
+                       Name:       "my-svc",
+                       APIVersion: corev1.SchemeGroupVersion.String(),
+                       Kind:       "Service",
+               },
+               Properties: asEndpointProperties(map[string]string{"path": 
"/to/my/path"}),
+       }
+
+       binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       require.NotNil(t, binding)
+       assert.Equal(t, 
"http://my-svc.my-svc-ns.svc.cluster.local:123/to/my/path";, binding.URI)
+}
+
+func TestServiceError(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       svc := corev1.Service{
+               ObjectMeta: v1.ObjectMeta{
+                       Name:      "my-svc-non-cluster-type",
+                       Namespace: "my-svc-ns",
+               },
+               Spec: corev1.ServiceSpec{
+                       Type: corev1.ServiceTypeExternalName,
+               },
+       }
+
+       client, err := internal.NewFakeClient(&svc)
+       require.NoError(t, err)
+
+       bindingContext := BindingContext{
+               Ctx:       ctx,
+               Client:    client,
+               Namespace: "test",
+       }
+
+       // Not a Service
+       endpoint := camelv1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Namespace:  "my-svc-ns",
+                       Name:       "my-svc",
+                       APIVersion: corev1.SchemeGroupVersion.String(),
+                       Kind:       "NotAService",
+               },
+       }
+
+       binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       require.Nil(t, binding)
+
+       // Service Missing
+       endpoint = camelv1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Namespace:  "my-svc-ns",
+                       Name:       "my-svc-missing",
+                       APIVersion: corev1.SchemeGroupVersion.String(),
+                       Kind:       "Service",
+               },
+       }
+
+       _, err = ServiceRefBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.Error(t, err)
+       assert.Equal(t, "could not load a Service with name my-svc-missing in 
namespace my-svc-ns", err.Error())
+
+       // Service Not Cluster type
+       endpoint = camelv1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Namespace:  "my-svc-ns",
+                       Name:       "my-svc-non-cluster-type",
+                       APIVersion: corev1.SchemeGroupVersion.String(),
+                       Kind:       "Service",
+               },
+       }
+
+       _, err = ServiceRefBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.Error(t, err)
+       assert.Equal(t, "operator only supports ClusterIP Service type, feel 
free to request this support", err.Error())
+}
+
+func TestIntegrationRef(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       svc := corev1.Service{
+               ObjectMeta: v1.ObjectMeta{
+                       Name:      "my-it",
+                       Namespace: "my-it-ns",
+               },
+               Spec: corev1.ServiceSpec{
+                       Type: corev1.ServiceTypeClusterIP,
+               },
+       }
+
+       client, err := internal.NewFakeClient(&svc)
+       require.NoError(t, err)
+
+       bindingContext := BindingContext{
+               Ctx:       ctx,
+               Client:    client,
+               Namespace: "test",
+       }
+
+       endpoint := camelv1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Namespace:  "my-it-ns",
+                       Name:       "my-it",
+                       APIVersion: camelv1.SchemeGroupVersion.String(),
+                       Kind:       "Integration",
+               },
+       }
+
+       binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       require.NotNil(t, binding)
+       assert.Equal(t, "http://my-it.my-it-ns.svc.cluster.local";, binding.URI)
+}
+
+func TestPipeRef(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       svc := corev1.Service{
+               ObjectMeta: v1.ObjectMeta{
+                       Name:      "my-pipe",
+                       Namespace: "my-pipe-ns",
+               },
+               Spec: corev1.ServiceSpec{
+                       Type: corev1.ServiceTypeClusterIP,
+               },
+       }
+
+       client, err := internal.NewFakeClient(&svc)
+       require.NoError(t, err)
+
+       bindingContext := BindingContext{
+               Ctx:       ctx,
+               Client:    client,
+               Namespace: "test",
+       }
+
+       endpoint := camelv1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Namespace:  "my-pipe-ns",
+                       Name:       "my-pipe",
+                       APIVersion: camelv1.SchemeGroupVersion.String(),
+                       Kind:       "Pipe",
+               },
+       }
+
+       binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, 
EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       require.NotNil(t, binding)
+       assert.Equal(t, "http://my-pipe.my-pipe-ns.svc.cluster.local";, 
binding.URI)
+}
diff --git a/pkg/util/kubernetes/lookup.go b/pkg/util/kubernetes/lookup.go
index 474c8f1f2..228f69ee2 100644
--- a/pkg/util/kubernetes/lookup.go
+++ b/pkg/util/kubernetes/lookup.go
@@ -224,3 +224,27 @@ func LookupRoleBinding(ctx context.Context, c 
client.Client, ns string, name str
        }
        return &rb, nil
 }
+
+// LookupService will look for any k8s Service with a given name in a given 
namespace.
+func LookupService(ctx context.Context, c client.Client, ns string, name 
string) (*corev1.Service, error) {
+       svc := corev1.Service{
+               TypeMeta: metav1.TypeMeta{
+                       Kind:       "Service",
+                       APIVersion: corev1.SchemeGroupVersion.String(),
+               },
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: ns,
+                       Name:      name,
+               },
+       }
+       key := ctrl.ObjectKey{
+               Namespace: ns,
+               Name:      name,
+       }
+       if err := c.Get(ctx, key, &svc); err != nil && 
k8serrors.IsNotFound(err) {
+               return nil, nil
+       } else if err != nil {
+               return nil, err
+       }
+       return &svc, nil
+}
diff --git a/pkg/util/kubernetes/service.go b/pkg/util/kubernetes/service.go
new file mode 100644
index 000000000..0364f10e2
--- /dev/null
+++ b/pkg/util/kubernetes/service.go
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubernetes
+
+import (
+       "fmt"
+
+       corev1 "k8s.io/api/core/v1"
+)
+
+// GetClusterTypeServiceURI will return the URL of the Service in the cluster 
type format.
+func GetClusterTypeServiceURI(svc *corev1.Service) string {
+       url := fmt.Sprintf("http://%s.%s.svc.cluster.local";, svc.Name, 
svc.Namespace)
+loop:
+       for _, port := range svc.Spec.Ports {
+               if port.Port != 80 { // Assuming HTTP default port
+                       url += fmt.Sprintf(":%d", port.Port)
+                       break loop
+               }
+       }
+
+       return url
+}

Reply via email to