This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push: new 18d8a6863 chore(addon): move strimzi into core 18d8a6863 is described below commit 18d8a68637d640737d5a12cb0bd0179814fdcf8e Author: Pasquale Congiusti <pasquale.congiu...@gmail.com> AuthorDate: Sun Oct 27 15:41:12 2024 +0100 chore(addon): move strimzi into core Ref #5787 --- .github/actions/automatic-updates/action.yml | 1 - .github/actions/build/action.yml | 2 - .github/workflows/kafka.yml | 85 ++++++++ addons/register_strimzi.go | 28 --- .../kafka/files/kafka-to-log.yaml | 45 ++--- .../kafka/files/kafkatopic-to-log.yaml | 43 ++-- .../kafka/files/timer-to-kafka.yaml | 45 ++--- .../kafka/files/timer-to-kafkatopic.yaml | 43 ++-- e2e/kafka/kafka_binding_test.go | 67 +++++++ e2e/kafka/setup/kafka-ephemeral.yaml | 50 +++++ .../kafka/setup/kafka-topic.yaml | 43 ++-- e2e/kafka/setup/setup.sh | 36 ++++ .../apis}/addtoscheme_strimzi_duck_v1beta2.go | 7 +- .../duck => pkg/apis/duck/strimzi}/v1beta2/doc.go | 0 .../apis/duck/strimzi}/v1beta2/duck_types.go | 0 .../apis/duck/strimzi}/v1beta2/register.go | 0 .../duck/strimzi}/v1beta2/zz_generated.deepcopy.go | 0 .../clientset}/internalclientset/clientset.go | 2 +- .../internalclientset/fake/clientset_generated.go | 6 +- .../clientset}/internalclientset/fake/doc.go | 0 .../clientset}/internalclientset/fake/register.go | 2 +- .../clientset}/internalclientset/scheme/doc.go | 0 .../internalclientset/scheme/register.go | 2 +- .../typed/strimzi}/v1beta2/doc.go | 0 .../typed/strimzi}/v1beta2/fake/doc.go | 0 .../typed/strimzi}/v1beta2/fake/fake_kafka.go | 2 +- .../typed/strimzi}/v1beta2/fake/fake_kafkatopic.go | 2 +- .../strimzi/v1beta2/fake/fake_strimzi_client.go | 2 +- .../typed/strimzi}/v1beta2/generated_expansion.go | 0 .../typed/strimzi}/v1beta2/kafka.go | 4 +- .../typed/strimzi}/v1beta2/kafkatopic.go | 4 +- .../typed/strimzi/v1beta2/strimzi_client.go | 4 +- pkg/controller/pipe/initialize_test.go | 218 ++++++++++++++++++++- pkg/util/bindings/knative_uri.go | 1 - {addons/strimzi => pkg/util/bindings}/strimzi.go | 53 ++--- .../strimzi => pkg/util/bindings}/strimzi_test.go | 46 ++--- script/Makefile | 16 +- script/gen_client.sh | 9 +- 38 files changed, 615 insertions(+), 253 deletions(-) diff --git a/.github/actions/automatic-updates/action.yml b/.github/actions/automatic-updates/action.yml index 00785084d..12f75ae1b 100644 --- a/.github/actions/automatic-updates/action.yml +++ b/.github/actions/automatic-updates/action.yml @@ -64,7 +64,6 @@ runs: # Remove mock and generated code from account grep -v "github.com/apache/camel-k/v2/pkg/client" coverage.out \ | grep -v "zz_generated" \ - | grep -v "github.com/apache/camel-k/v2/addons/strimzi/duck/" \ | grep -v "github.com/apache/camel-k/v2/addons/keda/duck/" \ | grep -v "github.com/apache/camel-k/v2/cmd/util" > coverage.mod.out go tool cover -func=coverage.mod.out -o=coverage.mod.out diff --git a/.github/actions/build/action.yml b/.github/actions/build/action.yml index 8b9de8660..d44b33eac 100644 --- a/.github/actions/build/action.yml +++ b/.github/actions/build/action.yml @@ -56,7 +56,6 @@ runs: # Remove mock and generated code from account grep -v "github.com/apache/camel-k/v2/pkg/client" coverage.out \ | grep -v "zz_generated" \ - | grep -v "github.com/apache/camel-k/v2/addons/strimzi/duck/" \ | grep -v "github.com/apache/camel-k/v2/addons/keda/duck/" \ | grep -v "github.com/apache/camel-k/v2/cmd/util" > coverage.mod.out go tool cover -func=coverage.mod.out -o=coverage.mod.out @@ -79,7 +78,6 @@ runs: # Remove mock and generated code from account grep -v "github.com/apache/camel-k/v2/pkg/client" coverage.out \ | grep -v "zz_generated" \ - | grep -v "github.com/apache/camel-k/v2/addons/strimzi/duck/" \ | grep -v "github.com/apache/camel-k/v2/addons/keda/duck/" \ | grep -v "github.com/apache/camel-k/v2/cmd/util" > coverage.mod.out go tool cover -func=coverage.mod.out -o=coverage.mod.out diff --git a/.github/workflows/kafka.yml b/.github/workflows/kafka.yml new file mode 100644 index 000000000..43bc16702 --- /dev/null +++ b/.github/workflows/kafka.yml @@ -0,0 +1,85 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +name: kafka + +env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + +on: + pull_request: + branches: + - main + - "release-*" + paths-ignore: + - 'docs/**' + - 'java/**' + - 'proposals/**' + - '**.adoc' + - '**.md' + - 'KEYS' + - 'LICENSE' + - 'NOTICE' + push: + branches: + - main + - "release-*" + paths-ignore: + - 'docs/**' + - 'java/**' + - 'proposals/**' + - '**.adoc' + - '**.md' + - 'KEYS' + - 'LICENSE' + - 'NOTICE' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.sha }} + cancel-in-progress: true + +jobs: + kafka-test: + runs-on: ubuntu-latest + steps: + + - name: Checkout code + uses: actions/checkout@v4 + with: + persist-credentials: false + submodules: recursive + + - name: Infra setting + uses: ./.github/actions/infra-setting + + - name: Install Kafka + shell: bash + run: | + ./e2e/kafka/setup/setup.sh + + - name: Install operator + shell: bash + run: | + kubectl create ns camel-k + make install-k8s-global + kubectl wait --for=jsonpath='{.status.phase}'=Ready itp camel-k -n camel-k --timeout=60s + + - name: Run test + shell: bash + run: | + DO_TEST_PREBUILD=false GOTESTFMT="-json 2>&1 | gotestfmt" make test-kafka diff --git a/addons/register_strimzi.go b/addons/register_strimzi.go deleted file mode 100644 index 3d9922f9d..000000000 --- a/addons/register_strimzi.go +++ /dev/null @@ -1,28 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package addons - -import ( - "github.com/apache/camel-k/v2/addons/strimzi" - "github.com/apache/camel-k/v2/pkg/util/bindings" -) - -func init() { - bindings.RegisterBindingProvider(strimzi.BindingProvider{}) - bindings.V1alpha1RegisterBindingProvider(strimzi.V1alpha1BindingProvider{}) -} diff --git a/script/gen_client_strimzi.sh b/e2e/kafka/files/kafka-to-log.yaml old mode 100755 new mode 100644 similarity index 55% copy from script/gen_client_strimzi.sh copy to e2e/kafka/files/kafka-to-log.yaml index 70eceba40..359b32f97 --- a/script/gen_client_strimzi.sh +++ b/e2e/kafka/files/kafka-to-log.yaml @@ -1,5 +1,4 @@ -#!/bin/sh - +# --------------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -7,34 +6,26 @@ # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -set -e - -location=$(dirname $0) -rootdir=$location/.. - -GO111MODULE=on - -echo "Generating Go client code for Strimzi addon..." - -cd $rootdir - -$(go env GOPATH)/bin/client-gen \ - -h script/headers/default.txt \ - --input duck/v1beta2 \ - --input-base=github.com/apache/camel-k/v2/addons/strimzi \ - --output-base=. \ - --output-package=github.com/apache/camel-k/v2/addons/strimzi/duck/client - -rm -r ./addons/strimzi/duck/client || true - -mv github.com/apache/camel-k/v2/addons/strimzi/duck/client ./addons/strimzi/duck/ - -rm -r ./github.com +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: kafka-to-log + namespace: kafka +spec: + sink: + uri: log:info + source: + ref: + kind: Kafka + apiVersion: kafka.strimzi.io/v1beta2 + name: my-cluster + properties: + topic: my-topic-autogen diff --git a/script/gen_client_strimzi.sh b/e2e/kafka/files/kafkatopic-to-log.yaml old mode 100755 new mode 100644 similarity index 55% copy from script/gen_client_strimzi.sh copy to e2e/kafka/files/kafkatopic-to-log.yaml index 70eceba40..f2b42b6bc --- a/script/gen_client_strimzi.sh +++ b/e2e/kafka/files/kafkatopic-to-log.yaml @@ -1,5 +1,4 @@ -#!/bin/sh - +# --------------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -7,34 +6,24 @@ # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -set -e - -location=$(dirname $0) -rootdir=$location/.. - -GO111MODULE=on - -echo "Generating Go client code for Strimzi addon..." - -cd $rootdir - -$(go env GOPATH)/bin/client-gen \ - -h script/headers/default.txt \ - --input duck/v1beta2 \ - --input-base=github.com/apache/camel-k/v2/addons/strimzi \ - --output-base=. \ - --output-package=github.com/apache/camel-k/v2/addons/strimzi/duck/client - -rm -r ./addons/strimzi/duck/client || true - -mv github.com/apache/camel-k/v2/addons/strimzi/duck/client ./addons/strimzi/duck/ - -rm -r ./github.com +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: kafkatopic-to-log + namespace: kafka +spec: + sink: + uri: log:info + source: + ref: + kind: KafkaTopic + apiVersion: kafka.strimzi.io/v1beta2 + name: my-topic diff --git a/script/gen_client_strimzi.sh b/e2e/kafka/files/timer-to-kafka.yaml old mode 100755 new mode 100644 similarity index 55% copy from script/gen_client_strimzi.sh copy to e2e/kafka/files/timer-to-kafka.yaml index 70eceba40..04436373f --- a/script/gen_client_strimzi.sh +++ b/e2e/kafka/files/timer-to-kafka.yaml @@ -1,5 +1,4 @@ -#!/bin/sh - +# --------------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -7,34 +6,26 @@ # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -set -e - -location=$(dirname $0) -rootdir=$location/.. - -GO111MODULE=on - -echo "Generating Go client code for Strimzi addon..." - -cd $rootdir - -$(go env GOPATH)/bin/client-gen \ - -h script/headers/default.txt \ - --input duck/v1beta2 \ - --input-base=github.com/apache/camel-k/v2/addons/strimzi \ - --output-base=. \ - --output-package=github.com/apache/camel-k/v2/addons/strimzi/duck/client - -rm -r ./addons/strimzi/duck/client || true - -mv github.com/apache/camel-k/v2/addons/strimzi/duck/client ./addons/strimzi/duck/ - -rm -r ./github.com +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: timer-to-kafka + namespace: kafka +spec: + source: + uri: timer:foo + sink: + ref: + kind: Kafka + apiVersion: kafka.strimzi.io/v1beta2 + name: my-cluster + properties: + topic: my-topic-autogen diff --git a/script/gen_client_strimzi.sh b/e2e/kafka/files/timer-to-kafkatopic.yaml old mode 100755 new mode 100644 similarity index 55% copy from script/gen_client_strimzi.sh copy to e2e/kafka/files/timer-to-kafkatopic.yaml index 70eceba40..a2d0e7294 --- a/script/gen_client_strimzi.sh +++ b/e2e/kafka/files/timer-to-kafkatopic.yaml @@ -1,5 +1,4 @@ -#!/bin/sh - +# --------------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -7,34 +6,24 @@ # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -set -e - -location=$(dirname $0) -rootdir=$location/.. - -GO111MODULE=on - -echo "Generating Go client code for Strimzi addon..." - -cd $rootdir - -$(go env GOPATH)/bin/client-gen \ - -h script/headers/default.txt \ - --input duck/v1beta2 \ - --input-base=github.com/apache/camel-k/v2/addons/strimzi \ - --output-base=. \ - --output-package=github.com/apache/camel-k/v2/addons/strimzi/duck/client - -rm -r ./addons/strimzi/duck/client || true - -mv github.com/apache/camel-k/v2/addons/strimzi/duck/client ./addons/strimzi/duck/ - -rm -r ./github.com +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: timer-to-kafkatopic + namespace: kafka +spec: + source: + uri: timer:foo + sink: + ref: + kind: KafkaTopic + apiVersion: kafka.strimzi.io/v1beta2 + name: my-topic diff --git a/e2e/kafka/kafka_binding_test.go b/e2e/kafka/kafka_binding_test.go new file mode 100644 index 000000000..1542a8028 --- /dev/null +++ b/e2e/kafka/kafka_binding_test.go @@ -0,0 +1,67 @@ +//go:build integration +// +build integration + +// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" + +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kafka + +import ( + "context" + "testing" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + + . "github.com/apache/camel-k/v2/e2e/support" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" +) + +func TestKafka(t *testing.T) { + WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) { + // NOTE: all resources are local to kafka namespace + t.Run("Strimzi Kafka resource", func(t *testing.T) { + ExpectExecSucceed(t, g, Kubectl("apply", "-f", "files/timer-to-kafka.yaml")) + // Wait for the readiness of the Integration + g.Eventually(IntegrationConditionStatus(t, ctx, "kafka", "timer-to-kafka", v1.IntegrationConditionReady), TestTimeoutMedium). + Should(Equal(corev1.ConditionTrue)) + ExpectExecSucceed(t, g, Kubectl("apply", "-f", "files/kafka-to-log.yaml")) + g.Eventually(IntegrationConditionStatus(t, ctx, "kafka", "kafka-to-log", v1.IntegrationConditionReady), TestTimeoutMedium). + Should(Equal(corev1.ConditionTrue)) + // Verify we are consuming some record (the body is null as the timer is pushing nothing) + g.Eventually(IntegrationLogs(t, ctx, "kafka", "kafka-to-log")).Should(ContainSubstring("Body is null")) + + g.Expect(Kamel(t, ctx, "delete", "kafka-to-log", "-n", "kafka").Execute()).To(Succeed()) + }) + + t.Run("Strimzi KafkaTopic resource", func(t *testing.T) { + ExpectExecSucceed(t, g, Kubectl("apply", "-f", "files/timer-to-kafkatopic.yaml")) + // Wait for the readiness of the Integration + g.Eventually(IntegrationConditionStatus(t, ctx, "kafka", "timer-to-kafkatopic", v1.IntegrationConditionReady), TestTimeoutMedium). + Should(Equal(corev1.ConditionTrue)) + ExpectExecSucceed(t, g, Kubectl("apply", "-f", "files/kafkatopic-to-log.yaml")) + g.Eventually(IntegrationConditionStatus(t, ctx, "kafka", "kafkatopic-to-log", v1.IntegrationConditionReady), TestTimeoutMedium). + Should(Equal(corev1.ConditionTrue)) + // Verify we are consuming some record (the body is null as the timer is pushing nothing) + g.Eventually(IntegrationLogs(t, ctx, "kafka", "kafkatopic-to-log")).Should(ContainSubstring("Body is null")) + + g.Expect(Kamel(t, ctx, "delete", "kafkatopic-to-log", "-n", "kafka").Execute()).To(Succeed()) + }) + }) +} diff --git a/e2e/kafka/setup/kafka-ephemeral.yaml b/e2e/kafka/setup/kafka-ephemeral.yaml new file mode 100644 index 000000000..458259e4b --- /dev/null +++ b/e2e/kafka/setup/kafka-ephemeral.yaml @@ -0,0 +1,50 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster + namespace: kafka +spec: + kafka: + version: 3.8.0 + replicas: 3 + listeners: + - name: plain + port: 9092 + type: internal + tls: false + - name: tls + port: 9093 + type: internal + tls: true + config: + offsets.topic.replication.factor: 3 + transaction.state.log.replication.factor: 3 + transaction.state.log.min.isr: 2 + default.replication.factor: 3 + min.insync.replicas: 2 + inter.broker.protocol.version: "3.8" + storage: + type: ephemeral + zookeeper: + replicas: 3 + storage: + type: ephemeral + entityOperator: + topicOperator: {} + userOperator: {} \ No newline at end of file diff --git a/script/gen_client_strimzi.sh b/e2e/kafka/setup/kafka-topic.yaml old mode 100755 new mode 100644 similarity index 55% rename from script/gen_client_strimzi.sh rename to e2e/kafka/setup/kafka-topic.yaml index 70eceba40..896e43e4c --- a/script/gen_client_strimzi.sh +++ b/e2e/kafka/setup/kafka-topic.yaml @@ -1,5 +1,4 @@ -#!/bin/sh - +# --------------------------------------------------------------------------- # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -7,34 +6,24 @@ # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -set -e - -location=$(dirname $0) -rootdir=$location/.. - -GO111MODULE=on - -echo "Generating Go client code for Strimzi addon..." - -cd $rootdir - -$(go env GOPATH)/bin/client-gen \ - -h script/headers/default.txt \ - --input duck/v1beta2 \ - --input-base=github.com/apache/camel-k/v2/addons/strimzi \ - --output-base=. \ - --output-package=github.com/apache/camel-k/v2/addons/strimzi/duck/client - -rm -r ./addons/strimzi/duck/client || true - -mv github.com/apache/camel-k/v2/addons/strimzi/duck/client ./addons/strimzi/duck/ - -rm -r ./github.com +# --------------------------------------------------------------------------- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + name: my-topic + namespace: kafka + labels: + strimzi.io/cluster: my-cluster +spec: + partitions: 1 + replicas: 1 + config: + retention.ms: 7200000 + segment.bytes: 1073741824 \ No newline at end of file diff --git a/e2e/kafka/setup/setup.sh b/e2e/kafka/setup/setup.sh new file mode 100755 index 000000000..d14a4c0ce --- /dev/null +++ b/e2e/kafka/setup/setup.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +#### +# +# This script takes care of Strimzi setup as described in https://strimzi.io/quickstarts/ +# +#### + +kubectl create namespace kafka +kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka +kubectl rollout status deployment strimzi-cluster-operator -n kafka --timeout=180s + +#### Setup a Kafka cluster which we'll use for testing +kubectl apply -f ./e2e/kafka/setup/kafka-ephemeral.yaml +kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka + +#### Setup a Kafka topic which we'll use for testing +kubectl apply -f ./e2e/kafka/setup/kafka-topic.yaml +kubectl wait kafkatopic/my-topic --for=condition=Ready --timeout=60s -n kafka diff --git a/addons/strimzi/addtoscheme_strimzi_duck_v1beta2.go b/pkg/apis/addtoscheme_strimzi_duck_v1beta2.go similarity index 81% rename from addons/strimzi/addtoscheme_strimzi_duck_v1beta2.go rename to pkg/apis/addtoscheme_strimzi_duck_v1beta2.go index 5fc640085..17dcc9de5 100644 --- a/addons/strimzi/addtoscheme_strimzi_duck_v1beta2.go +++ b/pkg/apis/addtoscheme_strimzi_duck_v1beta2.go @@ -15,14 +15,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package strimzi +package apis import ( - "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" - "github.com/apache/camel-k/v2/pkg/apis" + "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" ) func init() { // Register the types with the Scheme so the components can map objects to GroupVersionKinds and back - apis.AddToSchemes = append(apis.AddToSchemes, v1beta2.SchemeBuilder.AddToScheme) + AddToSchemes = append(AddToSchemes, v1beta2.SchemeBuilder.AddToScheme) } diff --git a/addons/strimzi/duck/v1beta2/doc.go b/pkg/apis/duck/strimzi/v1beta2/doc.go similarity index 100% rename from addons/strimzi/duck/v1beta2/doc.go rename to pkg/apis/duck/strimzi/v1beta2/doc.go diff --git a/addons/strimzi/duck/v1beta2/duck_types.go b/pkg/apis/duck/strimzi/v1beta2/duck_types.go similarity index 100% rename from addons/strimzi/duck/v1beta2/duck_types.go rename to pkg/apis/duck/strimzi/v1beta2/duck_types.go diff --git a/addons/strimzi/duck/v1beta2/register.go b/pkg/apis/duck/strimzi/v1beta2/register.go similarity index 100% rename from addons/strimzi/duck/v1beta2/register.go rename to pkg/apis/duck/strimzi/v1beta2/register.go diff --git a/addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go b/pkg/apis/duck/strimzi/v1beta2/zz_generated.deepcopy.go similarity index 100% rename from addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go rename to pkg/apis/duck/strimzi/v1beta2/zz_generated.deepcopy.go diff --git a/addons/strimzi/duck/client/internalclientset/clientset.go b/pkg/client/duck/strimzi/clientset/internalclientset/clientset.go similarity index 97% rename from addons/strimzi/duck/client/internalclientset/clientset.go rename to pkg/client/duck/strimzi/clientset/internalclientset/clientset.go index 89356372c..cb7601118 100644 --- a/addons/strimzi/duck/client/internalclientset/clientset.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/clientset.go @@ -23,7 +23,7 @@ import ( "fmt" "net/http" - kafkav1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2" + kafkav1beta2 "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2" discovery "k8s.io/client-go/discovery" rest "k8s.io/client-go/rest" flowcontrol "k8s.io/client-go/util/flowcontrol" diff --git a/addons/strimzi/duck/client/internalclientset/fake/clientset_generated.go b/pkg/client/duck/strimzi/clientset/internalclientset/fake/clientset_generated.go similarity index 88% rename from addons/strimzi/duck/client/internalclientset/fake/clientset_generated.go rename to pkg/client/duck/strimzi/clientset/internalclientset/fake/clientset_generated.go index 09946104c..a9821fc89 100644 --- a/addons/strimzi/duck/client/internalclientset/fake/clientset_generated.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/fake/clientset_generated.go @@ -20,9 +20,9 @@ limitations under the License. package fake import ( - clientset "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset" - kafkav1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2" - fakekafkav1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake" + clientset "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset" + kafkav1beta2 "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2" + fakekafkav1beta2 "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" diff --git a/addons/strimzi/duck/client/internalclientset/fake/doc.go b/pkg/client/duck/strimzi/clientset/internalclientset/fake/doc.go similarity index 100% rename from addons/strimzi/duck/client/internalclientset/fake/doc.go rename to pkg/client/duck/strimzi/clientset/internalclientset/fake/doc.go diff --git a/addons/strimzi/duck/client/internalclientset/fake/register.go b/pkg/client/duck/strimzi/clientset/internalclientset/fake/register.go similarity index 96% rename from addons/strimzi/duck/client/internalclientset/fake/register.go rename to pkg/client/duck/strimzi/clientset/internalclientset/fake/register.go index e6ac15499..1795e1629 100644 --- a/addons/strimzi/duck/client/internalclientset/fake/register.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/fake/register.go @@ -20,7 +20,7 @@ limitations under the License. package fake import ( - kafkav1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" + kafkav1beta2 "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" diff --git a/addons/strimzi/duck/client/internalclientset/scheme/doc.go b/pkg/client/duck/strimzi/clientset/internalclientset/scheme/doc.go similarity index 100% rename from addons/strimzi/duck/client/internalclientset/scheme/doc.go rename to pkg/client/duck/strimzi/clientset/internalclientset/scheme/doc.go diff --git a/addons/strimzi/duck/client/internalclientset/scheme/register.go b/pkg/client/duck/strimzi/clientset/internalclientset/scheme/register.go similarity index 96% rename from addons/strimzi/duck/client/internalclientset/scheme/register.go rename to pkg/client/duck/strimzi/clientset/internalclientset/scheme/register.go index 6abeca247..71e7cd0e5 100644 --- a/addons/strimzi/duck/client/internalclientset/scheme/register.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/scheme/register.go @@ -20,7 +20,7 @@ limitations under the License. package scheme import ( - kafkav1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" + kafkav1beta2 "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" diff --git a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/doc.go b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/doc.go similarity index 100% rename from addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/doc.go rename to pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/doc.go diff --git a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/doc.go b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/doc.go similarity index 100% rename from addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/doc.go rename to pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/doc.go diff --git a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/fake_kafka.go b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/fake_kafka.go similarity index 97% rename from addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/fake_kafka.go rename to pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/fake_kafka.go index 5eb5f48a8..4e38dfbce 100644 --- a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/fake_kafka.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/fake_kafka.go @@ -22,7 +22,7 @@ package fake import ( "context" - v1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" + v1beta2 "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" labels "k8s.io/apimachinery/pkg/labels" watch "k8s.io/apimachinery/pkg/watch" diff --git a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/fake_kafkatopic.go b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/fake_kafkatopic.go similarity index 97% rename from addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/fake_kafkatopic.go rename to pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/fake_kafkatopic.go index 12c8f17a5..403246b6c 100644 --- a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/fake_kafkatopic.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/fake_kafkatopic.go @@ -22,7 +22,7 @@ package fake import ( "context" - v1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" + v1beta2 "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" labels "k8s.io/apimachinery/pkg/labels" watch "k8s.io/apimachinery/pkg/watch" diff --git a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/fake_duck_client.go b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/fake_strimzi_client.go similarity index 92% rename from addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/fake_duck_client.go rename to pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/fake_strimzi_client.go index f2d068c00..e4db52dc8 100644 --- a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/fake/fake_duck_client.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/fake/fake_strimzi_client.go @@ -20,7 +20,7 @@ limitations under the License. package fake import ( - v1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2" + v1beta2 "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2" rest "k8s.io/client-go/rest" testing "k8s.io/client-go/testing" ) diff --git a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/generated_expansion.go b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/generated_expansion.go similarity index 100% rename from addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/generated_expansion.go rename to pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/generated_expansion.go diff --git a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/kafka.go b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/kafka.go similarity index 94% rename from addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/kafka.go rename to pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/kafka.go index e4f711608..021f11fbc 100644 --- a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/kafka.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/kafka.go @@ -23,8 +23,8 @@ import ( "context" "time" - scheme "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset/scheme" - v1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" + v1beta2 "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" + scheme "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset/scheme" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" watch "k8s.io/apimachinery/pkg/watch" rest "k8s.io/client-go/rest" diff --git a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/kafkatopic.go b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/kafkatopic.go similarity index 95% rename from addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/kafkatopic.go rename to pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/kafkatopic.go index 984bc9501..7e2edc8ee 100644 --- a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/kafkatopic.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/kafkatopic.go @@ -23,8 +23,8 @@ import ( "context" "time" - scheme "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset/scheme" - v1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" + v1beta2 "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" + scheme "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset/scheme" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" watch "k8s.io/apimachinery/pkg/watch" rest "k8s.io/client-go/rest" diff --git a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/duck_client.go b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/strimzi_client.go similarity index 95% rename from addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/duck_client.go rename to pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/strimzi_client.go index 71d1ced2a..f9543801d 100644 --- a/addons/strimzi/duck/client/internalclientset/typed/duck/v1beta2/duck_client.go +++ b/pkg/client/duck/strimzi/clientset/internalclientset/typed/strimzi/v1beta2/strimzi_client.go @@ -22,8 +22,8 @@ package v1beta2 import ( "net/http" - "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset/scheme" - v1beta2 "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" + v1beta2 "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" + "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset/scheme" rest "k8s.io/client-go/rest" ) diff --git a/pkg/controller/pipe/initialize_test.go b/pkg/controller/pipe/initialize_test.go index 315d899fe..eb9832a7f 100644 --- a/pkg/controller/pipe/initialize_test.go +++ b/pkg/controller/pipe/initialize_test.go @@ -67,7 +67,7 @@ func TestNewPipeError(t *testing.T) { assert.Equal(t, "no ref or URI specified in endpoint", cond.Message) } -func TestNewPipeWithComponentsCreating(t *testing.T) { +func TestNewPipeCamelURIBinding(t *testing.T) { pipe := &v1.Pipe{ TypeMeta: metav1.TypeMeta{ APIVersion: v1.SchemeGroupVersion.String(), @@ -114,7 +114,7 @@ func TestNewPipeWithComponentsCreating(t *testing.T) { assert.Equal(t, "", pipe.Annotations[v1.AnnotationIcon]) } -func TestNewPipeWithKameletsCreating(t *testing.T) { +func TestNewPipeKameletBinding(t *testing.T) { source := v1.NewKamelet("ns", "my-source") source.Annotations = map[string]string{ v1.AnnotationIcon: "my-source-icon-base64", @@ -275,12 +275,222 @@ func TestNewPipeUnsupportedRef(t *testing.T) { handledPipe, err := a.Handle(context.TODO(), pipe) require.Error(t, err) assert.Equal(t, "could not find any suitable binding provider for v1/Service my-svc in namespace ns. "+ - "Bindings available: [\"kamelet\" \"knative-uri\" \"camel-uri\" \"knative-ref\"]", err.Error()) + "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" \"camel-uri\" \"knative-ref\"]", err.Error()) assert.Equal(t, v1.PipePhaseError, handledPipe.Status.Phase) cond := handledPipe.Status.GetCondition(v1.PipeConditionReady) assert.NotNil(t, cond) assert.Equal(t, corev1.ConditionFalse, cond.Status) assert.Equal(t, "IntegrationError", cond.Reason) assert.Equal(t, "could not find any suitable binding provider for v1/Service my-svc in namespace ns. "+ - "Bindings available: [\"kamelet\" \"knative-uri\" \"camel-uri\" \"knative-ref\"]", cond.Message) + "Bindings available: [\"kamelet\" \"knative-uri\" \"strimzi\" \"camel-uri\" \"knative-ref\"]", cond.Message) +} + +func TestNewPipeKnativeURIBinding(t *testing.T) { + pipe := &v1.Pipe{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.PipeKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-pipe", + }, + Spec: v1.PipeSpec{ + Sink: v1.Endpoint{ + URI: ptr.To("http://my-knative-uri/"), + }, + Source: v1.Endpoint{ + URI: ptr.To("direct:something"), + }, + }, + } + c, err := test.NewFakeClient(pipe) + require.NoError(t, err) + + a := NewInitializeAction() + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "initialize", a.Name()) + assert.True(t, a.CanHandle(pipe)) + handledPipe, err := a.Handle(context.TODO(), pipe) + require.NoError(t, err) + assert.Equal(t, v1.PipePhaseCreating, handledPipe.Status.Phase) + // Check integration which should have been created + expectedIT := v1.NewIntegration(pipe.Namespace, pipe.Name) + err = c.Get(context.Background(), ctrl.ObjectKeyFromObject(&expectedIT), &expectedIT) + require.NoError(t, err) + assert.Equal(t, pipe.Name, expectedIT.Name) + assert.Equal(t, v1.IntegrationPhaseNone, expectedIT.Status.Phase) + assert.Equal(t, "Pipe", expectedIT.Labels[kubernetes.CamelCreatorLabelKind]) + assert.Equal(t, "my-pipe", expectedIT.Labels[kubernetes.CamelCreatorLabelName]) + flow, err := json.Marshal(expectedIT.Spec.Flows[0].RawMessage) + require.NoError(t, err) + assert.Equal(t, "{\"route\":{\"from\":{\"steps\":[{\"to\":\"knative:endpoint/sink\"}],\"uri\":\"direct:something\"},\"id\":\"binding\"}}", string(flow)) + assert.Equal(t, + "{\"services\":[{\"type\":\"endpoint\",\"name\":\"sink\",\"url\":\"http://my-knative-uri/\","+ + "\"metadata\":{\"camel.endpoint.kind\":\"sink\",\"knative.apiVersion\":\"\",\"knative.kind\":\"\",\"knative.name\":\"sink\"}}]}", + expectedIT.Spec.Traits.Knative.Configuration, + ) + assert.Equal(t, false, *expectedIT.Spec.Traits.Knative.SinkBinding) +} + +func TestNewPipeKnativeRefBinding(t *testing.T) { + pipe := &v1.Pipe{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.PipeKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-pipe", + }, + Spec: v1.PipeSpec{ + Sink: v1.Endpoint{ + Ref: &corev1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + }, + Source: v1.Endpoint{ + URI: ptr.To("direct:something"), + }, + }, + } + c, err := test.NewFakeClient(pipe) + require.NoError(t, err) + + a := NewInitializeAction() + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "initialize", a.Name()) + assert.True(t, a.CanHandle(pipe)) + handledPipe, err := a.Handle(context.TODO(), pipe) + require.NoError(t, err) + assert.Equal(t, v1.PipePhaseCreating, handledPipe.Status.Phase) + // Check integration which should have been created + expectedIT := v1.NewIntegration(pipe.Namespace, pipe.Name) + err = c.Get(context.Background(), ctrl.ObjectKeyFromObject(&expectedIT), &expectedIT) + require.NoError(t, err) + assert.Equal(t, pipe.Name, expectedIT.Name) + assert.Equal(t, v1.IntegrationPhaseNone, expectedIT.Status.Phase) + assert.Equal(t, "Pipe", expectedIT.Labels[kubernetes.CamelCreatorLabelKind]) + assert.Equal(t, "my-pipe", expectedIT.Labels[kubernetes.CamelCreatorLabelName]) + flow, err := json.Marshal(expectedIT.Spec.Flows[0].RawMessage) + require.NoError(t, err) + assert.Equal(t, "{\"route\":{\"from\":{\"steps\":[{\"to\":\"knative:event?apiVersion=eventing.knative.dev%2Fv1\\u0026kind=Broker\\u0026name=default\"}],"+ + "\"uri\":\"direct:something\"},\"id\":\"binding\"}}", string(flow)) +} + +func TestNewPipeStrimziKafkaTopicBinding(t *testing.T) { + pipe := &v1.Pipe{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.PipeKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-pipe", + }, + Spec: v1.PipeSpec{ + Sink: v1.Endpoint{ + Ref: &corev1.ObjectReference{ + Kind: "KafkaTopic", + Name: "mytopic", + APIVersion: "kafka.strimzi.io/v1beta2", + }, + Properties: asEndpointProperties(map[string]string{ + "brokers": "my-cluster-kafka-bootstrap:9092", + }), + }, + Source: v1.Endpoint{ + URI: ptr.To("direct:something"), + }, + }, + } + c, err := test.NewFakeClient(pipe) + require.NoError(t, err) + + a := NewInitializeAction() + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "initialize", a.Name()) + assert.True(t, a.CanHandle(pipe)) + handledPipe, err := a.Handle(context.TODO(), pipe) + require.NoError(t, err) + assert.Equal(t, v1.PipePhaseCreating, handledPipe.Status.Phase) + // Check integration which should have been created + expectedIT := v1.NewIntegration(pipe.Namespace, pipe.Name) + err = c.Get(context.Background(), ctrl.ObjectKeyFromObject(&expectedIT), &expectedIT) + require.NoError(t, err) + assert.Equal(t, pipe.Name, expectedIT.Name) + assert.Equal(t, v1.IntegrationPhaseNone, expectedIT.Status.Phase) + assert.Equal(t, "Pipe", expectedIT.Labels[kubernetes.CamelCreatorLabelKind]) + assert.Equal(t, "my-pipe", expectedIT.Labels[kubernetes.CamelCreatorLabelName]) + flow, err := json.Marshal(expectedIT.Spec.Flows[0].RawMessage) + require.NoError(t, err) + assert.Equal(t, "{\"route\":{\"from\":{\"steps\":[{\"to\":\"kafka:mytopic?brokers=my-cluster-kafka-bootstrap%3A9092\"}],"+ + "\"uri\":\"direct:something\"},\"id\":\"binding\"}}", string(flow)) +} + +func TestNewPipeStrimziKafkaBinding(t *testing.T) { + pipe := &v1.Pipe{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.PipeKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-pipe", + }, + Spec: v1.PipeSpec{ + Sink: v1.Endpoint{ + Ref: &corev1.ObjectReference{ + Kind: "Kafka", + Name: "my-kafka", + APIVersion: "kafka.strimzi.io/v1beta2", + }, + Properties: asEndpointProperties(map[string]string{ + "topic": "mytopic", + "brokers": "my-cluster-kafka-bootstrap:9092", + }), + }, + Source: v1.Endpoint{ + URI: ptr.To("direct:something"), + }, + }, + } + c, err := test.NewFakeClient(pipe) + require.NoError(t, err) + + a := NewInitializeAction() + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "initialize", a.Name()) + assert.True(t, a.CanHandle(pipe)) + handledPipe, err := a.Handle(context.TODO(), pipe) + require.NoError(t, err) + assert.Equal(t, v1.PipePhaseCreating, handledPipe.Status.Phase) + // Check integration which should have been created + expectedIT := v1.NewIntegration(pipe.Namespace, pipe.Name) + err = c.Get(context.Background(), ctrl.ObjectKeyFromObject(&expectedIT), &expectedIT) + require.NoError(t, err) + assert.Equal(t, pipe.Name, expectedIT.Name) + assert.Equal(t, v1.IntegrationPhaseNone, expectedIT.Status.Phase) + assert.Equal(t, "Pipe", expectedIT.Labels[kubernetes.CamelCreatorLabelKind]) + assert.Equal(t, "my-pipe", expectedIT.Labels[kubernetes.CamelCreatorLabelName]) + flow, err := json.Marshal(expectedIT.Spec.Flows[0].RawMessage) + require.NoError(t, err) + assert.Equal(t, "{\"route\":{\"from\":{\"steps\":[{\"to\":\"kafka:mytopic?brokers=my-cluster-kafka-bootstrap%3A9092\"}],"+ + "\"uri\":\"direct:something\"},\"id\":\"binding\"}}", string(flow)) +} + +func asEndpointProperties(props map[string]string) *v1.EndpointProperties { + serialized, err := json.Marshal(props) + if err != nil { + panic(err) + } + return &v1.EndpointProperties{ + RawMessage: serialized, + } } diff --git a/pkg/util/bindings/knative_uri.go b/pkg/util/bindings/knative_uri.go index ae5a626db..2727c033a 100644 --- a/pkg/util/bindings/knative_uri.go +++ b/pkg/util/bindings/knative_uri.go @@ -58,7 +58,6 @@ func (k KnativeURIBindingProvider) Translate(ctx BindingContext, endpointCtx End // HTTP/HTTPS uri are translated to Knative endpoints only when used as sinks return nil, nil } - originalURI, err := url.Parse(*e.URI) if err != nil { return nil, err diff --git a/addons/strimzi/strimzi.go b/pkg/util/bindings/strimzi.go similarity index 80% rename from addons/strimzi/strimzi.go rename to pkg/util/bindings/strimzi.go index 623fd5782..6def99c3a 100644 --- a/addons/strimzi/strimzi.go +++ b/pkg/util/bindings/strimzi.go @@ -15,23 +15,26 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package strimzi contains integrations with the Strimzi project for running Apache Kafka on Kubernetes -package strimzi +package bindings import ( "fmt" - "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset" - "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" camelv1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1" - "github.com/apache/camel-k/v2/pkg/util/bindings" + "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" + "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset" "github.com/apache/camel-k/v2/pkg/util/uri" k8serrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" ) +func init() { + RegisterBindingProvider(StrimziBindingProvider{}) + V1alpha1RegisterBindingProvider(V1alpha1StrimziBindingProvider{}) +} + // camelKafka represent the configuration required by Camel Kafka component. type camelKafka struct { topicName string @@ -39,15 +42,15 @@ type camelKafka struct { } // BindingProvider allows to connect to a Kafka topic via Binding. -type BindingProvider struct { +type StrimziBindingProvider struct { Client internalclientset.Interface } -func (s BindingProvider) ID() string { +func (s StrimziBindingProvider) ID() string { return "strimzi" } -func (s BindingProvider) Translate(ctx bindings.BindingContext, _ bindings.EndpointContext, endpoint camelv1.Endpoint) (*bindings.Binding, error) { +func (s StrimziBindingProvider) Translate(ctx BindingContext, _ EndpointContext, endpoint camelv1.Endpoint) (*Binding, error) { if endpoint.Ref == nil { // IMPORTANT: just pass through if this provider cannot manage the binding. Another provider in the chain may take care or it. return nil, nil @@ -68,13 +71,13 @@ func (s BindingProvider) Translate(ctx bindings.BindingContext, _ bindings.Endpo kafkaURI := fmt.Sprintf("kafka:%s", camelKafka.topicName) kafkaURI = uri.AppendParameters(kafkaURI, camelKafka.properties) - return &bindings.Binding{ + return &Binding{ URI: kafkaURI, }, nil } // toCamelKafka serialize an endpoint to a camelKafka struct. -func (s BindingProvider) toCamelKafka(ctx bindings.BindingContext, endpoint camelv1.Endpoint) (*camelKafka, error) { +func (s StrimziBindingProvider) toCamelKafka(ctx BindingContext, endpoint camelv1.Endpoint) (*camelKafka, error) { switch endpoint.Ref.Kind { case v1beta2.StrimziKindKafkaCluster: return s.fromKafkaToCamel(ctx, endpoint) @@ -86,7 +89,7 @@ func (s BindingProvider) toCamelKafka(ctx bindings.BindingContext, endpoint came } // Verify and transform a Kafka resource to Camel Kafka endpoint parameters. -func (s BindingProvider) fromKafkaToCamel(ctx bindings.BindingContext, endpoint camelv1.Endpoint) (*camelKafka, error) { +func (s StrimziBindingProvider) fromKafkaToCamel(ctx BindingContext, endpoint camelv1.Endpoint) (*camelKafka, error) { props, err := endpoint.Properties.GetPropertyMap() if err != nil { return nil, err @@ -119,7 +122,7 @@ func (s BindingProvider) fromKafkaToCamel(ctx bindings.BindingContext, endpoint } // Verify and transform a KafkaTopic resource to Camel Kafka endpoint parameters. -func (s BindingProvider) fromKafkaTopicToCamel(ctx bindings.BindingContext, endpoint camelv1.Endpoint) (*camelKafka, error) { +func (s StrimziBindingProvider) fromKafkaTopicToCamel(ctx BindingContext, endpoint camelv1.Endpoint) (*camelKafka, error) { props, err := endpoint.Properties.GetPropertyMap() if err != nil { return nil, err @@ -142,7 +145,7 @@ func (s BindingProvider) fromKafkaTopicToCamel(ctx bindings.BindingContext, endp }, nil } -func (s BindingProvider) lookupBootstrapServers(ctx bindings.BindingContext, endpoint camelv1.Endpoint) (string, error) { +func (s StrimziBindingProvider) lookupBootstrapServers(ctx BindingContext, endpoint camelv1.Endpoint) (string, error) { // build the client if needed if s.Client == nil { kafkaClient, err := internalclientset.NewForConfig(ctx.Client.GetConfig()) @@ -169,7 +172,7 @@ func (s BindingProvider) lookupBootstrapServers(ctx bindings.BindingContext, end return bootstrapServers, nil } -func (s BindingProvider) getBootstrapServers(ctx bindings.BindingContext, clusterName string) (string, error) { +func (s StrimziBindingProvider) getBootstrapServers(ctx BindingContext, clusterName string) (string, error) { cluster, err := s.Client.KafkaV1beta2().Kafkas(ctx.Namespace).Get(ctx.Ctx, clusterName, v1.GetOptions{}) if err != nil { return "", err @@ -188,7 +191,7 @@ func (s BindingProvider) getBootstrapServers(ctx bindings.BindingContext, cluste return "", fmt.Errorf("cluster %q has no listeners of name %q", clusterName, v1beta2.StrimziListenerNamePlain) } -func (s BindingProvider) lookupTopic(ctx bindings.BindingContext, endpoint camelv1.Endpoint) (*v1beta2.KafkaTopic, error) { +func (s StrimziBindingProvider) lookupTopic(ctx BindingContext, endpoint camelv1.Endpoint) (*v1beta2.KafkaTopic, error) { // first check by KafkaTopic name topic, err := s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).Get(ctx.Ctx, endpoint.Ref.Name, v1.GetOptions{}) if err != nil && !k8serrors.IsNotFound(err) { @@ -214,25 +217,25 @@ func (s BindingProvider) lookupTopic(ctx bindings.BindingContext, endpoint camel } // Order --. -func (s BindingProvider) Order() int { - return bindings.OrderStandard +func (s StrimziBindingProvider) Order() int { + return OrderStandard } // V1alpha1BindingProvider allows to connect to a Kafka topic via Binding. // Deprecated. -type V1alpha1BindingProvider struct { +type V1alpha1StrimziBindingProvider struct { Client internalclientset.Interface } // ID --. // Deprecated. -func (s V1alpha1BindingProvider) ID() string { +func (s V1alpha1StrimziBindingProvider) ID() string { return "strimzi" } // Translate --. // Deprecated. -func (s V1alpha1BindingProvider) Translate(ctx bindings.V1alpha1BindingContext, _ bindings.V1alpha1EndpointContext, endpoint camelv1alpha1.Endpoint) (*bindings.Binding, error) { +func (s V1alpha1StrimziBindingProvider) Translate(ctx V1alpha1BindingContext, _ V1alpha1EndpointContext, endpoint camelv1alpha1.Endpoint) (*Binding, error) { if endpoint.Ref == nil { // React only on refs return nil, nil @@ -267,14 +270,14 @@ func (s V1alpha1BindingProvider) Translate(ctx bindings.V1alpha1BindingContext, kafkaURI := fmt.Sprintf("kafka:%s", endpoint.Ref.Name) kafkaURI = uri.AppendParameters(kafkaURI, props) - return &bindings.Binding{ + return &Binding{ URI: kafkaURI, }, nil } // getBootstrapServers --. // Deprecated. -func (s V1alpha1BindingProvider) getBootstrapServers(ctx bindings.V1alpha1BindingContext, clusterName string) (string, error) { +func (s V1alpha1StrimziBindingProvider) getBootstrapServers(ctx V1alpha1BindingContext, clusterName string) (string, error) { cluster, err := s.Client.KafkaV1beta2().Kafkas(ctx.Namespace).Get(ctx.Ctx, clusterName, v1.GetOptions{}) if err != nil { return "", err @@ -295,11 +298,11 @@ func (s V1alpha1BindingProvider) getBootstrapServers(ctx bindings.V1alpha1Bindin // Order --. // Deprecated. -func (s V1alpha1BindingProvider) Order() int { - return bindings.OrderStandard +func (s V1alpha1StrimziBindingProvider) Order() int { + return OrderStandard } -func (s V1alpha1BindingProvider) lookupBootstrapServers(ctx bindings.V1alpha1BindingContext, endpoint camelv1alpha1.Endpoint) (string, error) { +func (s V1alpha1StrimziBindingProvider) lookupBootstrapServers(ctx V1alpha1BindingContext, endpoint camelv1alpha1.Endpoint) (string, error) { // build the client if needed if s.Client == nil { kafkaClient, err := internalclientset.NewForConfig(ctx.Client.GetConfig()) diff --git a/addons/strimzi/strimzi_test.go b/pkg/util/bindings/strimzi_test.go similarity index 84% rename from addons/strimzi/strimzi_test.go rename to pkg/util/bindings/strimzi_test.go index d3e8a4d98..7730a2781 100644 --- a/addons/strimzi/strimzi_test.go +++ b/pkg/util/bindings/strimzi_test.go @@ -15,18 +15,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package strimzi +package bindings import ( "context" - "encoding/json" "testing" - "github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset/fake" - "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2" camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/apis/duck/strimzi/v1beta2" + "github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset/internalclientset/fake" - "github.com/apache/camel-k/v2/pkg/util/bindings" "github.com/apache/camel-k/v2/pkg/util/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -41,7 +39,7 @@ func TestStrimziDirect(t *testing.T) { client, err := test.NewFakeClient() require.NoError(t, err) - bindingContext := bindings.BindingContext{ + bindingContext := BindingContext{ Ctx: ctx, Client: client, Namespace: "test", @@ -59,7 +57,7 @@ func TestStrimziDirect(t *testing.T) { }), } - binding, err := BindingProvider{}.Translate(bindingContext, bindings.EndpointContext{ + binding, err := StrimziBindingProvider{}.Translate(bindingContext, EndpointContext{ Type: camelv1.EndpointTypeSink, }, endpoint) require.NoError(t, err) @@ -101,11 +99,11 @@ func TestStrimziLookup(t *testing.T) { } client := fake.NewSimpleClientset(&cluster, &topic) - provider := BindingProvider{ + provider := StrimziBindingProvider{ Client: client, } - bindingContext := bindings.BindingContext{ + bindingContext := BindingContext{ Ctx: ctx, Namespace: "test", Profile: camelv1.TraitProfileKubernetes, @@ -119,7 +117,7 @@ func TestStrimziLookup(t *testing.T) { }, } - binding, err := provider.Translate(bindingContext, bindings.EndpointContext{ + binding, err := provider.Translate(bindingContext, EndpointContext{ Type: camelv1.EndpointTypeSink, }, endpoint) require.NoError(t, err) @@ -128,16 +126,6 @@ func TestStrimziLookup(t *testing.T) { assert.Equal(t, camelv1.Traits{}, binding.Traits) } -func asEndpointProperties(props map[string]string) *camelv1.EndpointProperties { - serialized, err := json.Marshal(props) - if err != nil { - panic(err) - } - return &camelv1.EndpointProperties{ - RawMessage: serialized, - } -} - func TestStrimziLookupByTopicName(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -174,11 +162,11 @@ func TestStrimziLookupByTopicName(t *testing.T) { } client := fake.NewSimpleClientset(&cluster, &topic) - provider := BindingProvider{ + provider := StrimziBindingProvider{ Client: client, } - bindingContext := bindings.BindingContext{ + bindingContext := BindingContext{ Ctx: ctx, Namespace: "test", Profile: camelv1.TraitProfileKubernetes, @@ -192,7 +180,7 @@ func TestStrimziLookupByTopicName(t *testing.T) { }, } - binding, err := provider.Translate(bindingContext, bindings.EndpointContext{ + binding, err := provider.Translate(bindingContext, EndpointContext{ Type: camelv1.EndpointTypeSink, }, endpoint) require.NoError(t, err) @@ -224,11 +212,11 @@ func TestStrimziKafkaCR(t *testing.T) { } client := fake.NewSimpleClientset(&cluster) - provider := BindingProvider{ + provider := StrimziBindingProvider{ Client: client, } - bindingContext := bindings.BindingContext{ + bindingContext := BindingContext{ Ctx: ctx, Namespace: "test", Profile: camelv1.TraitProfileKubernetes, @@ -245,7 +233,7 @@ func TestStrimziKafkaCR(t *testing.T) { }), } - binding, err := provider.Translate(bindingContext, bindings.EndpointContext{ + binding, err := provider.Translate(bindingContext, EndpointContext{ Type: camelv1.EndpointTypeSink, }, endpoint) require.NoError(t, err) @@ -277,11 +265,11 @@ func TestStrimziPassThrough(t *testing.T) { } client := fake.NewSimpleClientset(&cluster) - provider := BindingProvider{ + provider := StrimziBindingProvider{ Client: client, } - bindingContext := bindings.BindingContext{ + bindingContext := BindingContext{ Ctx: ctx, Namespace: "test", Profile: camelv1.TraitProfileKubernetes, @@ -295,7 +283,7 @@ func TestStrimziPassThrough(t *testing.T) { }, } - binding, err := provider.Translate(bindingContext, bindings.EndpointContext{ + binding, err := provider.Translate(bindingContext, EndpointContext{ Type: camelv1.EndpointTypeSink, }, endpoint) require.NoError(t, err) diff --git a/script/Makefile b/script/Makefile index 7f6be3c4b..1820541ce 100644 --- a/script/Makefile +++ b/script/Makefile @@ -217,8 +217,6 @@ generate: codegen-tools-install ./script/gen_doc.sh cd pkg/apis/camel && $(CONTROLLER_GEN) paths="./..." object cd addons/keda/duck && $(CONTROLLER_GEN) paths="./..." object - cd addons/strimzi/duck && $(CONTROLLER_GEN) paths="./..." object - ./script/gen_client_strimzi.sh codegen-tools-install: controller-gen @# We must force the installation to make sure we are using the correct version @@ -260,12 +258,6 @@ else @echo "####### Skipping unit test..." endif -# -# Setup the Knative test environment -# -setup-knative: - ./e2e/knative/files/setup.sh - # # Common tests that do not require any customized operator setting. They can leverage a unique namespaced operator installation to reduce # the time to complete (they are used also as smoke test for nightly release) @@ -321,6 +313,14 @@ test-knative: go test -timeout 60m -v ./e2e/knative -tags=integration $(GOTESTFMT) || ((FAILED++)); \ exit $${FAILED} +# +# Kafka tests that require the presence of Strimzi operator configuration +# +test-kafka: + FAILED=0; \ + go test -timeout 20m -v ./e2e/kafka -tags=integration $(GOTESTFMT) || ((FAILED++)); \ + exit $${FAILED} + # # Telemetry tests that require the configuration of telemetry endpoints # diff --git a/script/gen_client.sh b/script/gen_client.sh index 9389ff0b9..3950646a3 100755 --- a/script/gen_client.sh +++ b/script/gen_client.sh @@ -39,6 +39,13 @@ $(go env GOPATH)/bin/client-gen \ --output-base=. \ --output-package=github.com/apache/camel-k/v2/pkg/client/camel/clientset +$(go env GOPATH)/bin/client-gen \ + --input strimzi/v1beta2 \ + --go-header-file=../../../script/headers/default.txt \ + --input-base=github.com/apache/camel-k/v2/pkg/apis/duck \ + --output-base=. \ + --output-package=github.com/apache/camel-k/v2/pkg/client/duck/strimzi/clientset + $(go env GOPATH)/bin/lister-gen \ --input-dirs=github.com/apache/camel-k/v2/pkg/apis/camel/v1,github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1 \ --go-header-file=../../../script/headers/default.txt \ @@ -53,10 +60,10 @@ $(go env GOPATH)/bin/informer-gen \ --output-base=. \ --output-package=github.com/apache/camel-k/v2/pkg/client/camel/informers - # hack to fix non go-module compliance rm -rf ./clientset rm -rf ./informers rm -rf ./listers cp -R ./github.com/apache/camel-k/v2/pkg/client/camel/* . +cp -R ./github.com/apache/camel-k/v2/pkg/client/* .. rm -rf ./github.com