This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/main by this push: new 1d89a636653 Remove test that is pain 1d89a636653 is described below commit 1d89a63665360069533a1e20d183201b5b16bbfe Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jun 2 20:06:09 2025 +0200 Remove test that is pain --- .../integration/KafkaConsumerIdempotentIT.java | 95 ---------------------- .../KafkaConsumerIdempotentTestSupport.java | 66 --------------- ...kaConsumerIdempotentWithCustomSerializerIT.java | 91 --------------------- .../KafkaConsumerIdempotentWithProcessorIT.java | 95 ---------------------- 4 files changed, 347 deletions(-) diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java deleted file mode 100644 index 92b1d73a392..00000000000 --- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kafka.integration; - -import java.util.Arrays; -import org.apache.camel.EndpointInject; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; -import org.apache.camel.spring.boot.CamelAutoConfiguration; -import org.apache.camel.test.spring.junit5.CamelSpringBootTest; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.test.annotation.DirtiesContext; - -import static org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHeader; - -@EnabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "true") -@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) -@CamelSpringBootTest -@SpringBootTest(classes = { CamelAutoConfiguration.class, BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class, - KafkaConsumerIdempotentIT.class, KafkaConsumerIdempotentIT.TestConfiguration.class, }) -@DisabledIfSystemProperty(named = "ci.env.name", matches = "github.com", disabledReason = "Disabled on GH Action due to Docker limit") -public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSupport { - - public static final String TOPIC = "idempt"; - - private final String from = "kafka:" + TOPIC + "?groupId=group2&autoOffsetReset=earliest" - + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true" - + "&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor"; - - @EndpointInject("mock:result") - private MockEndpoint to; - - private int size = 200; - - @BeforeEach - public void before() { - kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); - doSend(size, TOPIC); - } - - @AfterEach - public void after() { - kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); - } - - @Test - @DisplayName("Numeric headers is consumable when using idempotent (CAMEL-16914)") - public void kafkaIdempotentMessageIsConsumedByCamel() throws InterruptedException { - doRun(to, size); - } - - @Configuration - public class TestConfiguration { - @Bean - public RouteBuilder routeBuilder() { - return new RouteBuilder() { - @Override - public void configure() { - from(from).routeId("foo").idempotentConsumer(numericHeader("id")) - .idempotentRepository("kafkaIdempotentRepository").to(to); - } - }; - } - - @Bean("kafkaIdempotentRepository") - public KafkaIdempotentRepository createKafkaIdempotentRepository() { - return new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers()); - } - } -} diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java deleted file mode 100644 index 60beb0ab897..00000000000 --- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.kafka.integration; - -import java.math.BigInteger; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import org.apache.camel.Exchange; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.internals.RecordHeader; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKafkaTestSupport { - - protected void doSend(int size, String topic) { - Properties props = getDefaultProperties(); - org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>( - props); - - try { - for (int k = 0; k < size; k++) { - String msg = "message-" + k; - ProducerRecord<String, String> data = new ProducerRecord<>(topic, String.valueOf(k), msg); - - data.headers().add(new RecordHeader("id", BigInteger.valueOf(k).toByteArray())); - producer.send(data); - } - } finally { - if (producer != null) { - producer.close(); - } - } - } - - protected void doRun(MockEndpoint mockEndpoint, int size) throws InterruptedException { - mockEndpoint.expectedMessageCount(size); - - List<Exchange> exchangeList = mockEndpoint.getReceivedExchanges(); - - mockEndpoint.assertIsSatisfied(10000); - mockEndpoint.setResultWaitTime(20_000); - assertEquals(size, exchangeList.size()); - - Map<String, Object> headers = mockEndpoint.getExchanges().get(0).getIn().getHeaders(); - assertTrue(headers.containsKey("id"), "0"); - } -} diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java deleted file mode 100644 index c09b1e56364..00000000000 --- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kafka.integration; - -import java.util.Arrays; -import org.apache.camel.EndpointInject; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; -import org.apache.camel.spring.boot.CamelAutoConfiguration; -import org.apache.camel.test.spring.junit5.CamelSpringBootTest; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.test.annotation.DirtiesContext; - -@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) -@CamelSpringBootTest -@SpringBootTest(classes = { CamelAutoConfiguration.class, BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class, - KafkaConsumerIdempotentWithCustomSerializerIT.class, - KafkaConsumerIdempotentWithCustomSerializerIT.TestConfiguration.class, }) -@DisabledIfSystemProperty(named = "ci.env.name", matches = "github.com", disabledReason = "Disabled on GH Action due to Docker limit") -public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumerIdempotentTestSupport { - - public static final String TOPIC = "idempt2"; - - private final String from = "kafka:" + TOPIC + "?groupId=group2&autoOffsetReset=earliest" - + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&headerDeserializer=#class:org.apache.camel.component.kafka.integration.CustomHeaderDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true" - + "&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor"; - - @EndpointInject("mock:result") - private MockEndpoint to; - - private int size = 200; - - @BeforeEach - public void before() { - kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); - doSend(size, TOPIC); - } - - @AfterEach - public void after() { - kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); - } - - @Test - public void kafkaMessageIsConsumedByCamel() throws InterruptedException { - doRun(to, size); - } - - @Configuration - public class TestConfiguration { - @Bean - public RouteBuilder routeBuilder() { - return new RouteBuilder() { - @Override - public void configure() { - from(from).routeId("foo").idempotentConsumer(header("id")) - .idempotentRepository("kafkaIdempotentRepository").to(to); - } - }; - } - - @Bean("kafkaIdempotentRepository") - public KafkaIdempotentRepository createKafkaIdempotentRepository() { - return new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers()); - } - } -} diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java deleted file mode 100644 index 76de624fa5e..00000000000 --- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kafka.integration; - -import java.math.BigInteger; -import java.util.Arrays; -import org.apache.camel.EndpointInject; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; -import org.apache.camel.spring.boot.CamelAutoConfiguration; -import org.apache.camel.test.spring.junit5.CamelSpringBootTest; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.test.annotation.DirtiesContext; - -@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) -@CamelSpringBootTest -@SpringBootTest(classes = { CamelAutoConfiguration.class, BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class, - KafkaConsumerIdempotentWithProcessorIT.class, KafkaConsumerIdempotentWithProcessorIT.TestConfiguration.class, }) -@DisabledIfSystemProperty(named = "ci.env.name", matches = "github.com", disabledReason = "Disabled on GH Action due to Docker limit") -public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempotentTestSupport { - public static final String TOPIC = "testidemp3"; - - private final String from = "kafka:" + TOPIC + "?groupId=group2&autoOffsetReset=earliest" - + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true" - + "&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor"; - - @EndpointInject("mock:resulti") - private MockEndpoint to; - - private int size = 200; - - @BeforeEach - public void before() { - kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); - doSend(size, TOPIC); - } - - @AfterEach - public void after() { - // clean all test topics - kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC, "TEST_IDEMPOTENT")).all(); - } - - @Test - public void kafkaMessageIsConsumedByCamel() throws InterruptedException { - doRun(to, size); - } - - @Configuration - public class TestConfiguration { - @Bean - public RouteBuilder routeBuilder() { - return new RouteBuilder() { - @Override - public void configure() { - from(from).routeId("idemp-with-prop").process(exchange -> { - byte[] id = exchange.getIn().getHeader("id", byte[].class); - - BigInteger bi = new BigInteger(id); - - exchange.getIn().setHeader("id", String.valueOf(bi.longValue())); - }).idempotentConsumer(header("id")).idempotentRepository("kafkaIdempotentRepository").to(to); - } - }; - } - - @Bean("kafkaIdempotentRepository") - public KafkaIdempotentRepository createKafkaIdempotentRepository() { - return new KafkaIdempotentRepository("TEST_IDEMPOTENT", getBootstrapServers()); - } - } -}