This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.21.x by this push:
     new 5e5de7afc90 CAMEL-20864: camel-kafka - Add option to producer for 
useIterator so you can force send message as a single kafka record. (#14492)
5e5de7afc90 is described below

commit 5e5de7afc900fdd027271f68218a3839674c1226
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Jun 12 14:21:53 2024 +0200

    CAMEL-20864: camel-kafka - Add option to producer for useIterator so you 
can force send message as a single kafka record. (#14492)
---
 .../org/apache/camel/catalog/components/kafka.json |  2 +
 .../component/kafka/KafkaComponentConfigurer.java  |  6 ++
 .../component/kafka/KafkaEndpointConfigurer.java   |  6 ++
 .../component/kafka/KafkaEndpointUriFactory.java   |  3 +-
 .../org/apache/camel/component/kafka/kafka.json    |  2 +
 .../camel/component/kafka/KafkaConfiguration.java  | 14 ++++
 .../camel/component/kafka/KafkaProducer.java       |  5 +-
 .../KafkaProducerUseIteratorFalseIT.java           | 83 ++++++++++++++++++++++
 .../integration/KafkaProducerUseIteratorIT.java    | 83 ++++++++++++++++++++++
 .../dsl/KafkaComponentBuilderFactory.java          | 18 +++++
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  | 35 +++++++++
 11 files changed, 253 insertions(+), 4 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 1b6ac7fce5b..d1b764b5713 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -96,6 +96,7 @@
     "retries": { "kind": "property", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a potentially transient 
err [...]
     "retryBackoffMs": { "kind": "property", "displayName": "Retry Backoff Ms", 
"group": "producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "100", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Before each retry, the producer refreshes the 
metadata of relevant topics to see if a n [...]
     "sendBufferBytes": { "kind": "property", "displayName": "Send Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "131072", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Socket write buffer size" },
+    "useIterator": { "kind": "property", "displayName": "Use Iterator", 
"group": "producer", "label": "producer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Sets whether sending to kafka should send the 
message body as a single record, or use a java.util.Iterato [...]
     "valueSerializer": { "kind": "property", "displayName": "Value 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for messages." },
     "workerPool": { "kind": "property", "displayName": "Worker Pool", "group": 
"producer", "label": "producer", "required": false, "type": "object", 
"javaType": "java.util.concurrent.ExecutorService", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom worker pool for continue 
routing Exchange after kafka server has acknowledge the mess [...]
     "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool 
Core Size", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "10", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Number of core threads 
for the worker pool for continue routing Exchange after  [...]
@@ -217,6 +218,7 @@
     "retries": { "kind": "parameter", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a potentially transient 
er [...]
     "retryBackoffMs": { "kind": "parameter", "displayName": "Retry Backoff 
Ms", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "100", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Before each retry, the producer refreshes the 
metadata of relevant topics to see if a  [...]
     "sendBufferBytes": { "kind": "parameter", "displayName": "Send Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "131072", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Socket write buffer size" },
+    "useIterator": { "kind": "parameter", "displayName": "Use Iterator", 
"group": "producer", "label": "producer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Sets whether sending to kafka should send the 
message body as a single record, or use a java.util.Iterat [...]
     "valueSerializer": { "kind": "parameter", "displayName": "Value 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for messages." },
     "workerPool": { "kind": "parameter", "displayName": "Worker Pool", 
"group": "producer", "label": "producer", "required": false, "type": "object", 
"javaType": "java.util.concurrent.ExecutorService", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom worker pool for continue 
routing Exchange after kafka server has acknowledge the mes [...]
     "workerPoolCoreSize": { "kind": "parameter", "displayName": "Worker Pool 
Core Size", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "10", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Number of core threads 
for the worker pool for continue routing Exchange after [...]
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 8fc0bf484fc..86a94e15242 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -228,6 +228,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "topicIsPattern": 
getOrCreateConfiguration(target).setTopicIsPattern(property(camelContext, 
boolean.class, value)); return true;
         case "useglobalsslcontextparameters":
         case "useGlobalSslContextParameters": 
target.setUseGlobalSslContextParameters(property(camelContext, boolean.class, 
value)); return true;
+        case "useiterator":
+        case "useIterator": 
getOrCreateConfiguration(target).setUseIterator(property(camelContext, 
boolean.class, value)); return true;
         case "valuedeserializer":
         case "valueDeserializer": 
getOrCreateConfiguration(target).setValueDeserializer(property(camelContext, 
java.lang.String.class, value)); return true;
         case "valueserializer":
@@ -450,6 +452,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "topicIsPattern": return boolean.class;
         case "useglobalsslcontextparameters":
         case "useGlobalSslContextParameters": return boolean.class;
+        case "useiterator":
+        case "useIterator": return boolean.class;
         case "valuedeserializer":
         case "valueDeserializer": return java.lang.String.class;
         case "valueserializer":
@@ -668,6 +672,8 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "topicIsPattern": return 
getOrCreateConfiguration(target).isTopicIsPattern();
         case "useglobalsslcontextparameters":
         case "useGlobalSslContextParameters": return 
target.isUseGlobalSslContextParameters();
+        case "useiterator":
+        case "useIterator": return 
getOrCreateConfiguration(target).isUseIterator();
         case "valuedeserializer":
         case "valueDeserializer": return 
getOrCreateConfiguration(target).getValueDeserializer();
         case "valueserializer":
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index aa76beb09f8..9011564e733 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -210,6 +210,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "synchronous": 
target.getConfiguration().setSynchronous(property(camelContext, boolean.class, 
value)); return true;
         case "topicispattern":
         case "topicIsPattern": 
target.getConfiguration().setTopicIsPattern(property(camelContext, 
boolean.class, value)); return true;
+        case "useiterator":
+        case "useIterator": 
target.getConfiguration().setUseIterator(property(camelContext, boolean.class, 
value)); return true;
         case "valuedeserializer":
         case "valueDeserializer": 
target.getConfiguration().setValueDeserializer(property(camelContext, 
java.lang.String.class, value)); return true;
         case "valueserializer":
@@ -416,6 +418,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "synchronous": return boolean.class;
         case "topicispattern":
         case "topicIsPattern": return boolean.class;
+        case "useiterator":
+        case "useIterator": return boolean.class;
         case "valuedeserializer":
         case "valueDeserializer": return java.lang.String.class;
         case "valueserializer":
@@ -623,6 +627,8 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "synchronous": return target.getConfiguration().isSynchronous();
         case "topicispattern":
         case "topicIsPattern": return 
target.getConfiguration().isTopicIsPattern();
+        case "useiterator":
+        case "useIterator": return target.getConfiguration().isUseIterator();
         case "valuedeserializer":
         case "valueDeserializer": return 
target.getConfiguration().getValueDeserializer();
         case "valueserializer":
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index 6a70d0dc1da..87b0b5ab1cd 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(103);
+        Set<String> props = new HashSet<>(104);
         props.add("additionalProperties");
         props.add("allowManualCommit");
         props.add("autoCommitEnable");
@@ -120,6 +120,7 @@ public class KafkaEndpointUriFactory extends 
org.apache.camel.support.component.
         props.add("synchronous");
         props.add("topic");
         props.add("topicIsPattern");
+        props.add("useIterator");
         props.add("valueDeserializer");
         props.add("valueSerializer");
         props.add("workerPool");
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 1b6ac7fce5b..d1b764b5713 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -96,6 +96,7 @@
     "retries": { "kind": "property", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a potentially transient 
err [...]
     "retryBackoffMs": { "kind": "property", "displayName": "Retry Backoff Ms", 
"group": "producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "100", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Before each retry, the producer refreshes the 
metadata of relevant topics to see if a n [...]
     "sendBufferBytes": { "kind": "property", "displayName": "Send Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "131072", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Socket write buffer size" },
+    "useIterator": { "kind": "property", "displayName": "Use Iterator", 
"group": "producer", "label": "producer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Sets whether sending to kafka should send the 
message body as a single record, or use a java.util.Iterato [...]
     "valueSerializer": { "kind": "property", "displayName": "Value 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for messages." },
     "workerPool": { "kind": "property", "displayName": "Worker Pool", "group": 
"producer", "label": "producer", "required": false, "type": "object", 
"javaType": "java.util.concurrent.ExecutorService", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom worker pool for continue 
routing Exchange after kafka server has acknowledge the mess [...]
     "workerPoolCoreSize": { "kind": "property", "displayName": "Worker Pool 
Core Size", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "10", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Number of core threads 
for the worker pool for continue routing Exchange after  [...]
@@ -217,6 +218,7 @@
     "retries": { "kind": "parameter", "displayName": "Retries", "group": 
"producer", "label": "producer", "required": false, "type": "integer", 
"javaType": "java.lang.Integer", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Setting a value greater than zero will cause 
the client to resend any record whose send fails with a potentially transient 
er [...]
     "retryBackoffMs": { "kind": "parameter", "displayName": "Retry Backoff 
Ms", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "100", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Before each retry, the producer refreshes the 
metadata of relevant topics to see if a  [...]
     "sendBufferBytes": { "kind": "parameter", "displayName": "Send Buffer 
Bytes", "group": "producer", "label": "producer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "131072", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Socket write buffer size" },
+    "useIterator": { "kind": "parameter", "displayName": "Use Iterator", 
"group": "producer", "label": "producer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Sets whether sending to kafka should send the 
message body as a single record, or use a java.util.Iterat [...]
     "valueSerializer": { "kind": "parameter", "displayName": "Value 
Serializer", "group": "producer", "label": "producer", "required": false, 
"type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": 
"org.apache.kafka.common.serialization.StringSerializer", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The serializer class for messages." },
     "workerPool": { "kind": "parameter", "displayName": "Worker Pool", 
"group": "producer", "label": "producer", "required": false, "type": "object", 
"javaType": "java.util.concurrent.ExecutorService", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "To use a custom worker pool for continue 
routing Exchange after kafka server has acknowledge the mes [...]
     "workerPoolCoreSize": { "kind": "parameter", "displayName": "Worker Pool 
Core Size", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": "10", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "Number of core threads 
for the worker pool for continue routing Exchange after [...]
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 38a4687c2f4..6b84414c7bb 100755
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -174,6 +174,8 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     private String key;
     @UriParam(label = "producer")
     private Integer partitionKey;
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean useIterator = true;
     @UriParam(label = "producer", enums = "all,-1,0,1", defaultValue = "all")
     private String requestRequiredAcks = "all";
     // buffer.memory
@@ -1311,6 +1313,18 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
         this.partitionKey = partitionKey;
     }
 
+    public boolean isUseIterator() {
+        return useIterator;
+    }
+
+    /**
+     * Sets whether sending to kafka should send the message body as a single 
record, or use a java.util.Iterator to
+     * send multiple records to kafka (if the message body can be iterated).
+     */
+    public void setUseIterator(boolean useIterator) {
+        this.useIterator = useIterator;
+    }
+
     public String getRequestRequiredAcks() {
         return requestRequiredAcks;
     }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index d1dcaa30bbe..524d8e50254 100755
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -391,7 +391,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
             startKafkaTransaction(exchange);
         }
 
-        if (isIterable(message.getBody())) {
+        if (endpoint.getConfiguration().isUseIterator() && 
isIterable(message.getBody())) {
             processIterableSync(exchange, message);
         } else {
             processSingleMessageSync(exchange, message);
@@ -470,11 +470,10 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         try {
             // is the message body a list or something that contains multiple 
values
-            if (isIterable(body)) {
+            if (endpoint.getConfiguration().isUseIterator() && 
isIterable(body)) {
                 processIterableAsync(exchange, producerCallBack, message);
             } else {
                 final ProducerRecord<Object, Object> record = 
createRecord(exchange, message);
-
                 doSend(exchange, record, producerCallBack);
             }
 
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorFalseIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorFalseIT.java
new file mode 100644
index 00000000000..20ab059a1ce
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorFalseIT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.StreamSupport;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class KafkaProducerUseIteratorFalseIT extends 
BaseEmbeddedKafkaTestSupport {
+
+    private static final String TOPIC = "use-iterator-false";
+
+    private static final String FROM_URI = "kafka:" + TOPIC
+                                           + 
"?groupId=KafkaProducerUseIteratorFalseIT&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+                                           + 
"valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                                           + 
"&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor";
+
+    @BeforeEach
+    public void init() {
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    @Test
+    public void testUseIteratorFalse() throws Exception {
+        List<String> body = new ArrayList<>();
+        body.add("first");
+        body.add("second");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived(body.toString());
+
+        template.sendBody("direct:start", body);
+
+        mock.assertIsSatisfied(5000);
+
+        assertEquals(1, MockConsumerInterceptor.recordsCaptured.stream()
+                .flatMap(i -> 
StreamSupport.stream(i.records(TOPIC).spliterator(), false)).count());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("kafka:" + TOPIC + 
"?groupId=KafkaProducerUseIteratorFalseIT&useIterator=false");
+
+                from(FROM_URI)
+                        .to("mock:result");
+            }
+        };
+    }
+
+}
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorIT.java
new file mode 100644
index 00000000000..fcd1763eccf
--- /dev/null
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaProducerUseIteratorIT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.StreamSupport;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class KafkaProducerUseIteratorIT extends BaseEmbeddedKafkaTestSupport {
+
+    private static final String TOPIC = "use-iterator";
+
+    private static final String FROM_URI = "kafka:" + TOPIC
+                                           + 
"?groupId=KafkaProducerUseIteratorIT&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
+                                           + 
"valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                                           + 
"&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor";
+
+    @BeforeEach
+    public void init() {
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    @Test
+    public void testUseIteratorTrue() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("first", "second");
+
+        List<String> body = new ArrayList<>();
+        body.add("first");
+        body.add("second");
+
+        template.sendBody("direct:start", body);
+
+        mock.assertIsSatisfied(5000);
+
+        assertEquals(2, MockConsumerInterceptor.recordsCaptured.stream()
+                .flatMap(i -> 
StreamSupport.stream(i.records(TOPIC).spliterator(), false)).count());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("kafka:" + TOPIC + 
"?groupId=KafkaProducerUseIteratorIT");
+
+                from(FROM_URI)
+                        .to("mock:result");
+            }
+        };
+    }
+
+}
diff --git 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 81e0bb08f4c..bb9c2cec8ab 100644
--- 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -1509,6 +1509,23 @@ public interface KafkaComponentBuilderFactory {
             doSetProperty("sendBufferBytes", sendBufferBytes);
             return this;
         }
+        /**
+         * Sets whether sending to kafka should send the message body as a
+         * single record, or use a java.util.Iterator to send multiple records
+         * to kafka (if the message body can be iterated).
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: true
+         * Group: producer
+         * 
+         * @param useIterator the value to set
+         * @return the dsl builder
+         */
+        default KafkaComponentBuilder useIterator(boolean useIterator) {
+            doSetProperty("useIterator", useIterator);
+            return this;
+        }
         /**
          * The serializer class for messages.
          * 
@@ -2215,6 +2232,7 @@ public interface KafkaComponentBuilderFactory {
             case "retries": getOrCreateConfiguration((KafkaComponent) 
component).setRetries((java.lang.Integer) value); return true;
             case "retryBackoffMs": getOrCreateConfiguration((KafkaComponent) 
component).setRetryBackoffMs((java.lang.Integer) value); return true;
             case "sendBufferBytes": getOrCreateConfiguration((KafkaComponent) 
component).setSendBufferBytes((java.lang.Integer) value); return true;
+            case "useIterator": getOrCreateConfiguration((KafkaComponent) 
component).setUseIterator((boolean) value); return true;
             case "valueSerializer": getOrCreateConfiguration((KafkaComponent) 
component).setValueSerializer((java.lang.String) value); return true;
             case "workerPool": getOrCreateConfiguration((KafkaComponent) 
component).setWorkerPool((java.util.concurrent.ExecutorService) value); return 
true;
             case "workerPoolCoreSize": 
getOrCreateConfiguration((KafkaComponent) 
component).setWorkerPoolCoreSize((java.lang.Integer) value); return true;
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 57410c412e8..0f045544d4d 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -3330,6 +3330,41 @@ public interface KafkaEndpointBuilderFactory {
             doSetProperty("sendBufferBytes", sendBufferBytes);
             return this;
         }
+        /**
+         * Sets whether sending to kafka should send the message body as a
+         * single record, or use a java.util.Iterator to send multiple records
+         * to kafka (if the message body can be iterated).
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: true
+         * Group: producer
+         * 
+         * @param useIterator the value to set
+         * @return the dsl builder
+         */
+        default KafkaEndpointProducerBuilder useIterator(boolean useIterator) {
+            doSetProperty("useIterator", useIterator);
+            return this;
+        }
+        /**
+         * Sets whether sending to kafka should send the message body as a
+         * single record, or use a java.util.Iterator to send multiple records
+         * to kafka (if the message body can be iterated).
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: true
+         * Group: producer
+         * 
+         * @param useIterator the value to set
+         * @return the dsl builder
+         */
+        default KafkaEndpointProducerBuilder useIterator(String useIterator) {
+            doSetProperty("useIterator", useIterator);
+            return this;
+        }
         /**
          * The serializer class for messages.
          * 


Reply via email to