This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push: new a51d0ee59 feat(pipe): bind to Integration, Pipe, Services a51d0ee59 is described below commit a51d0ee599031d1b5651f4425dbaa8dd19ea6e80 Author: Pasquale Congiusti <pasquale.congiu...@gmail.com> AuthorDate: Tue Apr 22 08:40:02 2025 +0200 feat(pipe): bind to Integration, Pipe, Services Closes #5935 --- docs/modules/ROOT/pages/concepts/overview.adoc | 2 +- docs/modules/ROOT/pages/pipes/pipes.adoc | 27 +++ e2e/common/binding/binding_test.go | 58 ++++++ e2e/common/binding/hello.yaml | 26 +++ e2e/common/binding/it-binding.yaml | 33 +++ pkg/controller/pipe/initialize_test.go | 32 +-- pkg/util/bindings/camel_uri_test.go | 85 ++++++++ pkg/util/bindings/knative_uri_test.go | 100 +++++++++ pkg/util/bindings/service_ref.go | 102 ++++++++++ pkg/util/bindings/service_ref_test.go | 269 +++++++++++++++++++++++++ pkg/util/kubernetes/lookup.go | 24 +++ pkg/util/kubernetes/service.go | 38 ++++ 12 files changed, 771 insertions(+), 25 deletions(-) diff --git a/docs/modules/ROOT/pages/concepts/overview.adoc b/docs/modules/ROOT/pages/concepts/overview.adoc index 7dadbc2c0..c4d716206 100644 --- a/docs/modules/ROOT/pages/concepts/overview.adoc +++ b/docs/modules/ROOT/pages/concepts/overview.adoc @@ -32,6 +32,6 @@ image::concepts/pipes.png[Camel connector lifecycle, width=640] **Pipe**: it is used to create the connector binding an event source to an event sink. -**ObjectReference**: this is the reference to any Kubernetes object. The operator is able to transform any Camel URI, Kamelet, Strimzi Kafka resource or Knative resource. +**ObjectReference**: this is the reference to any Kubernetes object. The operator is able to transform any Camel URI, Kamelet, Strimzi Kafka resource, Knative resource, Service, Integration or Pipe. **Integration**: it is created from the Pipe translating the source and sinks into a Camel route. diff --git a/docs/modules/ROOT/pages/pipes/pipes.adoc b/docs/modules/ROOT/pages/pipes/pipes.adoc index 102dc7452..514d9bf88 100644 --- a/docs/modules/ROOT/pages/pipes/pipes.adoc +++ b/docs/modules/ROOT/pages/pipes/pipes.adoc @@ -355,6 +355,33 @@ This Pipe explicitly defines an URI where data is going to be pushed. NOTE: the `uri` option is also conventionally used in Knative to specify a non-kubernetes destination. To comply with the Knative specifications, in case an "http" or "https" URI is used, Camel will send https://cloudevents.io/[CloudEvents] to the destination. +=== Binding to a Service, Integration or Pipe + +You can in general connect any Kubernetes Service or any Camel Integration or Pipe which have a Service associated. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: source-to-service +spec: + source: + ... + sink: + ref: + apiVersion: v1 + kind: Service + name: my-svc + namespace: my-svc-ns + properties: + path: /path/to/my/service (optional) +---- + +The operator will translate to the related URL. The same mechanism works using `apiVersion:camel.apache.org/v1` and `kind:Integration` or `kind:Pipe` types, assuming these Integrations are exposing any kind of ClusterIP Service. The operator will discover the port to use and you can optionally provide a `path` property if you need to specify a given endpoint to use. + +NOTE: this is still available for ClusterIP Service type only. + == Binding with data types When referencing Kamelets in a binding users may choose from one of the supported input/output data types provided by the Kamelet. The supported data types are declared on the Kamelet itself and give additional information about used header names, content type and content schema. diff --git a/e2e/common/binding/binding_test.go b/e2e/common/binding/binding_test.go new file mode 100644 index 000000000..ff199d098 --- /dev/null +++ b/e2e/common/binding/binding_test.go @@ -0,0 +1,58 @@ +//go:build integration +// +build integration + +// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" + +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "context" + "testing" + + . "github.com/onsi/gomega" + + camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + + . "github.com/apache/camel-k/v2/e2e/support" +) + +func TestServiceTrait(t *testing.T) { + t.Parallel() + WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) { + t.Run("Integration Binding", func(t *testing.T) { + // Not supported when running as a Knative Service as + // the Knative operator creates an external service with the same name of the Integration + g.Expect(KamelRun(t, ctx, ns, "hello.yaml", "-t", "knative-service.enabled=false").Execute()).To(Succeed()) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "hello", + camelv1.IntegrationConditionReady), TestTimeoutLong).Should(Equal(v1.ConditionTrue)) + g.Eventually(IntegrationPodPhase(t, ctx, ns, "hello")).Should(Equal(corev1.PodRunning)) + + ExpectExecSucceed(t, g, Kubectl("apply", "-f", "it-binding.yaml", "-n", ns)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "timer-to-hello-it", + camelv1.IntegrationConditionReady), TestTimeoutLong).Should(Equal(v1.ConditionTrue)) + g.Eventually(IntegrationPodPhase(t, ctx, ns, "timer-to-hello-it")).Should(Equal(corev1.PodRunning)) + + g.Eventually(IntegrationLogs(t, ctx, ns, "timer-to-hello-it")).Should( + ContainSubstring("Hello from Camel")) + }) + }) +} diff --git a/e2e/common/binding/hello.yaml b/e2e/common/binding/hello.yaml new file mode 100644 index 000000000..a227fa020 --- /dev/null +++ b/e2e/common/binding/hello.yaml @@ -0,0 +1,26 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +- route: + from: + uri: platform-http:/hello + parameters: + httpMethodRestrict: "GET" + steps: + - setBody: + simple: Hello from Camel + - log: ${body} diff --git a/e2e/common/binding/it-binding.yaml b/e2e/common/binding/it-binding.yaml new file mode 100644 index 000000000..612633e8c --- /dev/null +++ b/e2e/common/binding/it-binding.yaml @@ -0,0 +1,33 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: timer-to-hello-it +spec: + source: + uri: timer:foo + steps: + - ref: + apiVersion: camel.apache.org/v1 + kind: Integration + name: hello + properties: + path: hello + sink: + uri: log:info \ No newline at end of file diff --git a/pkg/controller/pipe/initialize_test.go b/pkg/controller/pipe/initialize_test.go index a6637adfa..b6149d057 100644 --- a/pkg/controller/pipe/initialize_test.go +++ b/pkg/controller/pipe/initialize_test.go @@ -225,22 +225,6 @@ func templateOrFail(template map[string]interface{}) *v1.Template { } func TestNewPipeUnsupportedRef(t *testing.T) { - svc := &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "my-svc", - Namespace: "ns", - }, - Spec: corev1.ServiceSpec{ - Ports: []corev1.ServicePort{}, - Selector: map[string]string{ - v1.IntegrationLabel: "my-pipe", - }, - }, - } pipe := &v1.Pipe{ TypeMeta: metav1.TypeMeta{ APIVersion: v1.SchemeGroupVersion.String(), @@ -256,10 +240,10 @@ func TestNewPipeUnsupportedRef(t *testing.T) { }, Sink: v1.Endpoint{ Ref: &corev1.ObjectReference{ - APIVersion: svc.APIVersion, - Kind: svc.Kind, - Namespace: svc.Namespace, - Name: svc.Name, + APIVersion: "my-api-version", + Kind: "my-kind", + Namespace: "ns", + Name: "my-kind-name", }, }, }, @@ -274,15 +258,15 @@ func TestNewPipeUnsupportedRef(t *testing.T) { assert.True(t, a.CanHandle(pipe)) handledPipe, err := a.Handle(context.TODO(), pipe) require.Error(t, err) - assert.Equal(t, "could not find any suitable binding provider for v1/Service my-svc in namespace ns. "+ - "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" \"camel-uri\" \"knative-ref\"]", err.Error()) + assert.Equal(t, "could not find any suitable binding provider for my-api-version/my-kind my-kind-name in namespace ns. "+ + "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" \"service-ref\" \"camel-uri\" \"knative-ref\"]", err.Error()) assert.Equal(t, v1.PipePhaseError, handledPipe.Status.Phase) cond := handledPipe.Status.GetCondition(v1.PipeConditionReady) assert.NotNil(t, cond) assert.Equal(t, corev1.ConditionFalse, cond.Status) assert.Equal(t, "IntegrationError", cond.Reason) - assert.Equal(t, "could not find any suitable binding provider for v1/Service my-svc in namespace ns. "+ - "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" \"camel-uri\" \"knative-ref\"]", cond.Message) + assert.Equal(t, "could not find any suitable binding provider for my-api-version/my-kind my-kind-name in namespace ns. "+ + "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" \"service-ref\" \"camel-uri\" \"knative-ref\"]", cond.Message) } func TestNewPipeKnativeURIBinding(t *testing.T) { diff --git a/pkg/util/bindings/camel_uri_test.go b/pkg/util/bindings/camel_uri_test.go new file mode 100644 index 000000000..98fa40c48 --- /dev/null +++ b/pkg/util/bindings/camel_uri_test.go @@ -0,0 +1,85 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bindings + +import ( + "context" + "testing" + + camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/internal" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/utils/ptr" +) + +func TestCamelURI(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := internal.NewFakeClient() + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + Profile: camelv1.TraitProfileKubernetes, + } + + endpoint := camelv1.Endpoint{ + URI: ptr.To("my-component:my-uri"), + Properties: asEndpointProperties(map[string]string{ + "prop1": "v1", + "prop2": "v2", + }), + } + + binding, err := CamelURIBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + assert.NotNil(t, binding) + assert.Equal(t, "my-component:my-uri?prop1=v1&prop2=v2", binding.URI) +} + +func TestCamelURISkip(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := internal.NewFakeClient() + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + Profile: camelv1.TraitProfileKubernetes, + } + + endpoint := camelv1.Endpoint{ + // Missing URI + } + + binding, err := CamelURIBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + require.Nil(t, binding) +} diff --git a/pkg/util/bindings/knative_uri_test.go b/pkg/util/bindings/knative_uri_test.go new file mode 100644 index 000000000..c99414e54 --- /dev/null +++ b/pkg/util/bindings/knative_uri_test.go @@ -0,0 +1,100 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bindings + +import ( + "context" + "testing" + + camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/internal" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/utils/ptr" +) + +func TestKnativeURI(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := internal.NewFakeClient() + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + Profile: camelv1.TraitProfileKnative, + } + + endpoint := camelv1.Endpoint{ + URI: ptr.To("http://my-domain"), + } + + binding, err := KnativeURIBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + require.NotNil(t, binding) + assert.Equal(t, "knative:endpoint/sink", binding.URI) + require.NotNil(t, binding.Traits.Knative) + assert.Equal(t, + `{"services":[{"type":"endpoint","name":"sink","url":"http://my-domain","metadata":{"camel.endpoint.kind":"sink","knative.apiVersion":"","knative.kind":"","knative.name":"sink"}}]}`, + binding.Traits.Knative.Configuration, + ) +} + +func TestKnativeURISkip(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := internal.NewFakeClient() + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + Profile: camelv1.TraitProfileKubernetes, + } + + endpoint := camelv1.Endpoint{ + // Missing URI + } + + binding, err := KnativeURIBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + require.Nil(t, binding) + + endpoint.URI = ptr.To("non-http:my") + binding, err = KnativeURIBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + require.Nil(t, binding) + + endpoint.URI = ptr.To("http:my") + binding, err = KnativeURIBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSource, + }, endpoint) + require.NoError(t, err) + require.Nil(t, binding) +} diff --git a/pkg/util/bindings/service_ref.go b/pkg/util/bindings/service_ref.go new file mode 100644 index 000000000..3f5a7d0c6 --- /dev/null +++ b/pkg/util/bindings/service_ref.go @@ -0,0 +1,102 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bindings + +import ( + "fmt" + "strings" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/util/kubernetes" + corev1 "k8s.io/api/core/v1" +) + +// ServiceRefBindingProvider converts a Service into a Camel http endpoint. +type ServiceRefBindingProvider struct{} + +// ID --. +func (k ServiceRefBindingProvider) ID() string { + return "service-ref" +} + +// Translate will check the endpoint reference being either a Service or an Integration and Pipe which have a Service associated. +func (k ServiceRefBindingProvider) Translate(ctx BindingContext, endpointCtx EndpointContext, e v1.Endpoint) (*Binding, error) { + if e.Ref == nil || !(isService(e.Ref) || isIntegration(e.Ref) || isPipe(e.Ref)) { + // works only on Service refs + return nil, nil + } + + // IMPORTANT: when we have a Service associated to either a Pipe or an Integration, + // this Service has the same namespace and the same name of the resource, so, we can lookup with those info + namespace := e.Ref.Namespace + if namespace == "" { + namespace = ctx.Namespace + } + svc, err := kubernetes.LookupService(ctx.Ctx, ctx.Client, namespace, e.Ref.Name) + if err != nil { + return nil, err + } + if svc == nil { + return nil, fmt.Errorf("could not load a Service with name %s in namespace %s", e.Ref.Name, namespace) + } + if svc.Spec.Type != corev1.ServiceTypeClusterIP { + return nil, fmt.Errorf( + "operator only supports %s Service type, feel free to request this support", + corev1.ServiceTypeClusterIP, + ) + } + + svcURI := kubernetes.GetClusterTypeServiceURI(svc) + props, err := e.Properties.GetPropertyMap() + if err != nil { + return nil, err + } + if props["path"] != "" { + if !strings.HasPrefix(props["path"], "/") { + svcURI += "/" + } + svcURI += props["path"] + } + + return &Binding{ + URI: svcURI, + }, nil +} + +func isService(ref *corev1.ObjectReference) bool { + return ref.APIVersion == corev1.SchemeGroupVersion.String() && ref.Kind == "Service" +} + +func isIntegration(ref *corev1.ObjectReference) bool { + return ref.APIVersion == v1.SchemeGroupVersion.String() && ref.Kind == v1.IntegrationKind +} + +func isPipe(ref *corev1.ObjectReference) bool { + return ref.APIVersion == v1.SchemeGroupVersion.String() && ref.Kind == v1.PipeKind +} + +// Order --. +// +//nolint:mnd +func (k ServiceRefBindingProvider) Order() int { + return OrderLast - 10 +} + +func init() { + RegisterBindingProvider(ServiceRefBindingProvider{}) +} diff --git a/pkg/util/bindings/service_ref_test.go b/pkg/util/bindings/service_ref_test.go new file mode 100644 index 000000000..c949b0ba9 --- /dev/null +++ b/pkg/util/bindings/service_ref_test.go @@ -0,0 +1,269 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bindings + +import ( + "context" + "testing" + + camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/internal" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestServiceRef(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc := corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-svc-ns", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + } + + client, err := internal.NewFakeClient(&svc) + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + } + + endpoint := camelv1.Endpoint{ + Ref: &corev1.ObjectReference{ + Namespace: "my-svc-ns", + Name: "my-svc", + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Service", + }, + } + + binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + require.NotNil(t, binding) + assert.Equal(t, "http://my-svc.my-svc-ns.svc.cluster.local", binding.URI) +} + +func TestServiceRefWithOptions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc := corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: "my-svc", + Namespace: "my-svc-ns", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + Ports: []corev1.ServicePort{ + { + Port: 123, + }, + }, + }, + } + + client, err := internal.NewFakeClient(&svc) + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + } + + endpoint := camelv1.Endpoint{ + Ref: &corev1.ObjectReference{ + Namespace: "my-svc-ns", + Name: "my-svc", + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Service", + }, + Properties: asEndpointProperties(map[string]string{"path": "/to/my/path"}), + } + + binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + require.NotNil(t, binding) + assert.Equal(t, "http://my-svc.my-svc-ns.svc.cluster.local:123/to/my/path", binding.URI) +} + +func TestServiceError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc := corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: "my-svc-non-cluster-type", + Namespace: "my-svc-ns", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeExternalName, + }, + } + + client, err := internal.NewFakeClient(&svc) + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + } + + // Not a Service + endpoint := camelv1.Endpoint{ + Ref: &corev1.ObjectReference{ + Namespace: "my-svc-ns", + Name: "my-svc", + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "NotAService", + }, + } + + binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + require.Nil(t, binding) + + // Service Missing + endpoint = camelv1.Endpoint{ + Ref: &corev1.ObjectReference{ + Namespace: "my-svc-ns", + Name: "my-svc-missing", + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Service", + }, + } + + _, err = ServiceRefBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.Error(t, err) + assert.Equal(t, "could not load a Service with name my-svc-missing in namespace my-svc-ns", err.Error()) + + // Service Not Cluster type + endpoint = camelv1.Endpoint{ + Ref: &corev1.ObjectReference{ + Namespace: "my-svc-ns", + Name: "my-svc-non-cluster-type", + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Service", + }, + } + + _, err = ServiceRefBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.Error(t, err) + assert.Equal(t, "operator only supports ClusterIP Service type, feel free to request this support", err.Error()) +} + +func TestIntegrationRef(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc := corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: "my-it", + Namespace: "my-it-ns", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + } + + client, err := internal.NewFakeClient(&svc) + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + } + + endpoint := camelv1.Endpoint{ + Ref: &corev1.ObjectReference{ + Namespace: "my-it-ns", + Name: "my-it", + APIVersion: camelv1.SchemeGroupVersion.String(), + Kind: "Integration", + }, + } + + binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + require.NotNil(t, binding) + assert.Equal(t, "http://my-it.my-it-ns.svc.cluster.local", binding.URI) +} + +func TestPipeRef(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + svc := corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: "my-pipe", + Namespace: "my-pipe-ns", + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + }, + } + + client, err := internal.NewFakeClient(&svc) + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + } + + endpoint := camelv1.Endpoint{ + Ref: &corev1.ObjectReference{ + Namespace: "my-pipe-ns", + Name: "my-pipe", + APIVersion: camelv1.SchemeGroupVersion.String(), + Kind: "Pipe", + }, + } + + binding, err := ServiceRefBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSink, + }, endpoint) + require.NoError(t, err) + require.NotNil(t, binding) + assert.Equal(t, "http://my-pipe.my-pipe-ns.svc.cluster.local", binding.URI) +} diff --git a/pkg/util/kubernetes/lookup.go b/pkg/util/kubernetes/lookup.go index 474c8f1f2..228f69ee2 100644 --- a/pkg/util/kubernetes/lookup.go +++ b/pkg/util/kubernetes/lookup.go @@ -224,3 +224,27 @@ func LookupRoleBinding(ctx context.Context, c client.Client, ns string, name str } return &rb, nil } + +// LookupService will look for any k8s Service with a given name in a given namespace. +func LookupService(ctx context.Context, c client.Client, ns string, name string) (*corev1.Service, error) { + svc := corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: corev1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + } + key := ctrl.ObjectKey{ + Namespace: ns, + Name: name, + } + if err := c.Get(ctx, key, &svc); err != nil && k8serrors.IsNotFound(err) { + return nil, nil + } else if err != nil { + return nil, err + } + return &svc, nil +} diff --git a/pkg/util/kubernetes/service.go b/pkg/util/kubernetes/service.go new file mode 100644 index 000000000..0364f10e2 --- /dev/null +++ b/pkg/util/kubernetes/service.go @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" +) + +// GetClusterTypeServiceURI will return the URL of the Service in the cluster type format. +func GetClusterTypeServiceURI(svc *corev1.Service) string { + url := fmt.Sprintf("http://%s.%s.svc.cluster.local", svc.Name, svc.Namespace) +loop: + for _, port := range svc.Spec.Ports { + if port.Port != 80 { // Assuming HTTP default port + url += fmt.Sprintf(":%d", port.Port) + break loop + } + } + + return url +}