This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.21.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.21.x by this push: new 5e5de7afc90 CAMEL-20864: camel-kafka - Add option to producer for useIterator so you can force send message as a single kafka record. (#14492) 5e5de7afc90 is described below commit 5e5de7afc900fdd027271f68218a3839674c1226 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Jun 12 14:21:53 2024 +0200 CAMEL-20864: camel-kafka - Add option to producer for useIterator so you can force send message as a single kafka record. (#14492) --- .../org/apache/camel/catalog/components/kafka.json | 2 + .../component/kafka/KafkaComponentConfigurer.java | 6 ++ .../component/kafka/KafkaEndpointConfigurer.java | 6 ++ .../component/kafka/KafkaEndpointUriFactory.java | 3 +- .../org/apache/camel/component/kafka/kafka.json | 2 + .../camel/component/kafka/KafkaConfiguration.java | 14 ++++ .../camel/component/kafka/KafkaProducer.java | 5 +- .../KafkaProducerUseIteratorFalseIT.java | 83 ++++++++++++++++++++++ .../integration/KafkaProducerUseIteratorIT.java | 83 ++++++++++++++++++++++ .../dsl/KafkaComponentBuilderFactory.java | 18 +++++ .../endpoint/dsl/KafkaEndpointBuilderFactory.java | 35 +++++++++ 11 files changed, 253 insertions(+), 4 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json index 1b6ac7fce5b..d1b764b5713 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json @@ -96,6 +96,7 @@ "retries": { "kind": "property", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient err [...] "retryBackoffMs": { "kind": "property", "displayName": "Retry Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Before each retry, the producer refreshes the metadata of relevant topics to see if a n [...] "sendBufferBytes": { "kind": "property", "displayName": "Send Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "131072", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Socket write buffer size" }, + "useIterator": { "kind": "property", "displayName": "Use Iterator", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether sending to kafka should send the message body as a single record, or use a java.util.Iterato [...] "valueSerializer": { "kind": "property", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for messages." }, "workerPool": { "kind": "property", "displayName": "Worker Pool", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom worker pool for continue routing Exchange after kafka server has acknowledge the mess [...] "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after [...] @@ -217,6 +218,7 @@ "retries": { "kind": "parameter", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient er [...] "retryBackoffMs": { "kind": "parameter", "displayName": "Retry Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Before each retry, the producer refreshes the metadata of relevant topics to see if a [...] "sendBufferBytes": { "kind": "parameter", "displayName": "Send Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "131072", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Socket write buffer size" }, + "useIterator": { "kind": "parameter", "displayName": "Use Iterator", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether sending to kafka should send the message body as a single record, or use a java.util.Iterat [...] "valueSerializer": { "kind": "parameter", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for messages." }, "workerPool": { "kind": "parameter", "displayName": "Worker Pool", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom worker pool for continue routing Exchange after kafka server has acknowledge the mes [...] "workerPoolCoreSize": { "kind": "parameter", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after [...] diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java index 8fc0bf484fc..86a94e15242 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java @@ -228,6 +228,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "topicIsPattern": getOrCreateConfiguration(target).setTopicIsPattern(property(camelContext, boolean.class, value)); return true; case "useglobalsslcontextparameters": case "useGlobalSslContextParameters": target.setUseGlobalSslContextParameters(property(camelContext, boolean.class, value)); return true; + case "useiterator": + case "useIterator": getOrCreateConfiguration(target).setUseIterator(property(camelContext, boolean.class, value)); return true; case "valuedeserializer": case "valueDeserializer": getOrCreateConfiguration(target).setValueDeserializer(property(camelContext, java.lang.String.class, value)); return true; case "valueserializer": @@ -450,6 +452,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "topicIsPattern": return boolean.class; case "useglobalsslcontextparameters": case "useGlobalSslContextParameters": return boolean.class; + case "useiterator": + case "useIterator": return boolean.class; case "valuedeserializer": case "valueDeserializer": return java.lang.String.class; case "valueserializer": @@ -668,6 +672,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "topicIsPattern": return getOrCreateConfiguration(target).isTopicIsPattern(); case "useglobalsslcontextparameters": case "useGlobalSslContextParameters": return target.isUseGlobalSslContextParameters(); + case "useiterator": + case "useIterator": return getOrCreateConfiguration(target).isUseIterator(); case "valuedeserializer": case "valueDeserializer": return getOrCreateConfiguration(target).getValueDeserializer(); case "valueserializer": diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java index aa76beb09f8..9011564e733 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java @@ -210,6 +210,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "synchronous": target.getConfiguration().setSynchronous(property(camelContext, boolean.class, value)); return true; case "topicispattern": case "topicIsPattern": target.getConfiguration().setTopicIsPattern(property(camelContext, boolean.class, value)); return true; + case "useiterator": + case "useIterator": target.getConfiguration().setUseIterator(property(camelContext, boolean.class, value)); return true; case "valuedeserializer": case "valueDeserializer": target.getConfiguration().setValueDeserializer(property(camelContext, java.lang.String.class, value)); return true; case "valueserializer": @@ -416,6 +418,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "synchronous": return boolean.class; case "topicispattern": case "topicIsPattern": return boolean.class; + case "useiterator": + case "useIterator": return boolean.class; case "valuedeserializer": case "valueDeserializer": return java.lang.String.class; case "valueserializer": @@ -623,6 +627,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "synchronous": return target.getConfiguration().isSynchronous(); case "topicispattern": case "topicIsPattern": return target.getConfiguration().isTopicIsPattern(); + case "useiterator": + case "useIterator": return target.getConfiguration().isUseIterator(); case "valuedeserializer": case "valueDeserializer": return target.getConfiguration().getValueDeserializer(); case "valueserializer": diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java index 6a70d0dc1da..87b0b5ab1cd 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java @@ -21,7 +21,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component. private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(103); + Set<String> props = new HashSet<>(104); props.add("additionalProperties"); props.add("allowManualCommit"); props.add("autoCommitEnable"); @@ -120,6 +120,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component. props.add("synchronous"); props.add("topic"); props.add("topicIsPattern"); + props.add("useIterator"); props.add("valueDeserializer"); props.add("valueSerializer"); props.add("workerPool"); diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index 1b6ac7fce5b..d1b764b5713 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -96,6 +96,7 @@ "retries": { "kind": "property", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient err [...] "retryBackoffMs": { "kind": "property", "displayName": "Retry Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Before each retry, the producer refreshes the metadata of relevant topics to see if a n [...] "sendBufferBytes": { "kind": "property", "displayName": "Send Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "131072", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Socket write buffer size" }, + "useIterator": { "kind": "property", "displayName": "Use Iterator", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether sending to kafka should send the message body as a single record, or use a java.util.Iterato [...] "valueSerializer": { "kind": "property", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for messages." }, "workerPool": { "kind": "property", "displayName": "Worker Pool", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom worker pool for continue routing Exchange after kafka server has acknowledge the mess [...] "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after [...] @@ -217,6 +218,7 @@ "retries": { "kind": "parameter", "displayName": "Retries", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient er [...] "retryBackoffMs": { "kind": "parameter", "displayName": "Retry Backoff Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Before each retry, the producer refreshes the metadata of relevant topics to see if a [...] "sendBufferBytes": { "kind": "parameter", "displayName": "Send Buffer Bytes", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "131072", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Socket write buffer size" }, + "useIterator": { "kind": "parameter", "displayName": "Use Iterator", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets whether sending to kafka should send the message body as a single record, or use a java.util.Iterat [...] "valueSerializer": { "kind": "parameter", "displayName": "Value Serializer", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringSerializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The serializer class for messages." }, "workerPool": { "kind": "parameter", "displayName": "Worker Pool", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom worker pool for continue routing Exchange after kafka server has acknowledge the mes [...] "workerPoolCoreSize": { "kind": "parameter", "displayName": "Worker Pool Core Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Number of core threads for the worker pool for continue routing Exchange after [...] diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 38a4687c2f4..6b84414c7bb 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -174,6 +174,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware private String key; @UriParam(label = "producer") private Integer partitionKey; + @UriParam(label = "producer", defaultValue = "true") + private boolean useIterator = true; @UriParam(label = "producer", enums = "all,-1,0,1", defaultValue = "all") private String requestRequiredAcks = "all"; // buffer.memory @@ -1311,6 +1313,18 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware this.partitionKey = partitionKey; } + public boolean isUseIterator() { + return useIterator; + } + + /** + * Sets whether sending to kafka should send the message body as a single record, or use a java.util.Iterator to + * send multiple records to kafka (if the message body can be iterated). + */ + public void setUseIterator(boolean useIterator) { + this.useIterator = useIterator; + } + public String getRequestRequiredAcks() { return requestRequiredAcks; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index d1dcaa30bbe..524d8e50254 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -391,7 +391,7 @@ public class KafkaProducer extends DefaultAsyncProducer { startKafkaTransaction(exchange); } - if (isIterable(message.getBody())) { + if (endpoint.getConfiguration().isUseIterator() && isIterable(message.getBody())) { processIterableSync(exchange, message); } else { processSingleMessageSync(exchange, message); @@ -470,11 +470,10 @@ public class KafkaProducer extends DefaultAsyncProducer { try { // is the message body a list or something that contains multiple values - if (isIterable(body)) { + if (endpoint.getConfiguration().isUseIterator() && isIterable(body)) { processIterableAsync(exchange, producerCallBack, message); } else { final ProducerRecord<Object, Object> record = createRecord(exchange, message); - doSend(exchange, record, producerCallBack); } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorFalseIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorFalseIT.java new file mode 100644 index 00000000000..20ab059a1ce --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorFalseIT.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.StreamSupport; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class KafkaProducerUseIteratorFalseIT extends BaseEmbeddedKafkaTestSupport { + + private static final String TOPIC = "use-iterator-false"; + + private static final String FROM_URI = "kafka:" + TOPIC + + "?groupId=KafkaProducerUseIteratorFalseIT&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor"; + + @BeforeEach + public void init() { + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @AfterEach + public void after() { + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + } + + @Test + public void testUseIteratorFalse() throws Exception { + List<String> body = new ArrayList<>(); + body.add("first"); + body.add("second"); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived(body.toString()); + + template.sendBody("direct:start", body); + + mock.assertIsSatisfied(5000); + + assertEquals(1, MockConsumerInterceptor.recordsCaptured.stream() + .flatMap(i -> StreamSupport.stream(i.records(TOPIC).spliterator(), false)).count()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("kafka:" + TOPIC + "?groupId=KafkaProducerUseIteratorFalseIT&useIterator=false"); + + from(FROM_URI) + .to("mock:result"); + } + }; + } + +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorIT.java new file mode 100644 index 00000000000..fcd1763eccf --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorIT.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.StreamSupport; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class KafkaProducerUseIteratorIT extends BaseEmbeddedKafkaTestSupport { + + private static final String TOPIC = "use-iterator"; + + private static final String FROM_URI = "kafka:" + TOPIC + + "?groupId=KafkaProducerUseIteratorIT&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor"; + + @BeforeEach + public void init() { + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @AfterEach + public void after() { + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + } + + @Test + public void testUseIteratorTrue() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceivedInAnyOrder("first", "second"); + + List<String> body = new ArrayList<>(); + body.add("first"); + body.add("second"); + + template.sendBody("direct:start", body); + + mock.assertIsSatisfied(5000); + + assertEquals(2, MockConsumerInterceptor.recordsCaptured.stream() + .flatMap(i -> StreamSupport.stream(i.records(TOPIC).spliterator(), false)).count()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("kafka:" + TOPIC + "?groupId=KafkaProducerUseIteratorIT"); + + from(FROM_URI) + .to("mock:result"); + } + }; + } + +} diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java index 81e0bb08f4c..bb9c2cec8ab 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java @@ -1509,6 +1509,23 @@ public interface KafkaComponentBuilderFactory { doSetProperty("sendBufferBytes", sendBufferBytes); return this; } + /** + * Sets whether sending to kafka should send the message body as a + * single record, or use a java.util.Iterator to send multiple records + * to kafka (if the message body can be iterated). + * + * The option is a: <code>boolean</code> type. + * + * Default: true + * Group: producer + * + * @param useIterator the value to set + * @return the dsl builder + */ + default KafkaComponentBuilder useIterator(boolean useIterator) { + doSetProperty("useIterator", useIterator); + return this; + } /** * The serializer class for messages. * @@ -2215,6 +2232,7 @@ public interface KafkaComponentBuilderFactory { case "retries": getOrCreateConfiguration((KafkaComponent) component).setRetries((java.lang.Integer) value); return true; case "retryBackoffMs": getOrCreateConfiguration((KafkaComponent) component).setRetryBackoffMs((java.lang.Integer) value); return true; case "sendBufferBytes": getOrCreateConfiguration((KafkaComponent) component).setSendBufferBytes((java.lang.Integer) value); return true; + case "useIterator": getOrCreateConfiguration((KafkaComponent) component).setUseIterator((boolean) value); return true; case "valueSerializer": getOrCreateConfiguration((KafkaComponent) component).setValueSerializer((java.lang.String) value); return true; case "workerPool": getOrCreateConfiguration((KafkaComponent) component).setWorkerPool((java.util.concurrent.ExecutorService) value); return true; case "workerPoolCoreSize": getOrCreateConfiguration((KafkaComponent) component).setWorkerPoolCoreSize((java.lang.Integer) value); return true; diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java index 57410c412e8..0f045544d4d 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java @@ -3330,6 +3330,41 @@ public interface KafkaEndpointBuilderFactory { doSetProperty("sendBufferBytes", sendBufferBytes); return this; } + /** + * Sets whether sending to kafka should send the message body as a + * single record, or use a java.util.Iterator to send multiple records + * to kafka (if the message body can be iterated). + * + * The option is a: <code>boolean</code> type. + * + * Default: true + * Group: producer + * + * @param useIterator the value to set + * @return the dsl builder + */ + default KafkaEndpointProducerBuilder useIterator(boolean useIterator) { + doSetProperty("useIterator", useIterator); + return this; + } + /** + * Sets whether sending to kafka should send the message body as a + * single record, or use a java.util.Iterator to send multiple records + * to kafka (if the message body can be iterated). + * + * The option will be converted to a <code>boolean</code> + * type. + * + * Default: true + * Group: producer + * + * @param useIterator the value to set + * @return the dsl builder + */ + default KafkaEndpointProducerBuilder useIterator(String useIterator) { + doSetProperty("useIterator", useIterator); + return this; + } /** * The serializer class for messages. *