This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d8978186f54dbd380a28690ec7df7ced5c5c98c4
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Fri Jan 19 16:08:48 2024 +0100

    CAMEL-16044: added documentation
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 40 ++++++++++++++++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_4.adoc     |  4 +++
 2 files changed, 44 insertions(+)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 2f44d00f73a..d8d8879bb4a 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -493,4 +493,44 @@ static {
     KafkaComponent.setKerberosConfigLocation("path/to/config/file");
 }
 ----
+
+=== Batching Consumer
+
+To use a Kafka batching consumer with Camel, an application must set the `batching` configuration option to `true` and use manual commits.
+
+The received records are stored in a list in the exchange used in the pipeline. As such, it is possible to commit every record individually
+or the whole batch at once by committing the last exchange in the list.
+
+When working with batch processing, it is up to the application to commit the records and to handle the outcome of potentially invalid records.
+
+The size of the batch is controlled by the option `maxPollRecords`.
+
+To avoid blocking for too long while waiting for a whole set of records to fill the batch, it is possible to use the `pollTimeoutMs` option
+to set a timeout for the polling. In this case, the batch may contain fewer messages than set in `maxPollRecords`.
+
+[source,java]
+----
+from("kafka:topic?batching=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+    .process(e -> {
+        // The received records are stored as exchanges in a list. This gets the list of those exchanges
+        final List<?> exchanges = e.getMessage().getBody(List.class);
+
+        // Ensure we are actually receiving what we are asking for
+        if (exchanges == null || exchanges.isEmpty()) {
+            return;
+        }
+
+        /*
+        Every exchange in that list should contain a reference to the manual commit object. We use the reference
+        for the last exchange in the list to commit the whole batch
+         */
+        final Object tmp = exchanges.getLast();
+        if (tmp instanceof Exchange exchange) {
+            KafkaManualCommit manual =
+                    exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            LOG.debug("Performing manual commit");
+            manual.commit();
+            LOG.debug("Done performing manual commit");
+        }
+    });
+----
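+
+As an alternative to committing the whole batch at once, each record can be committed on its own. The following is a sketch, not part of the original patch: it assumes the same route configuration as above and iterates over the list, invoking the manual commit carried by every exchange:
+
+[source,java]
+----
+from("kafka:topic?batching=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+    .process(e -> {
+        final List<?> exchanges = e.getMessage().getBody(List.class);
+        if (exchanges == null || exchanges.isEmpty()) {
+            return;
+        }
+
+        // Commit every record individually instead of committing only the last exchange
+        for (Object tmp : exchanges) {
+            if (tmp instanceof Exchange exchange) {
+                KafkaManualCommit manual =
+                        exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                manual.commit();
+            }
+        }
+    });
+----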
 include::spring-boot:partial$starter.adoc[]
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
index 2384b73e98b..9bbb3a90cbe 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
@@ -134,6 +134,10 @@ The JdbcAggregationRepository now provides a deserializationFilter parameter. Th
 
 The component was removed without deprecation. The library supporting this component has been unmaintained for a long time. We found no indication that either the library itself or the component works with modern Facebook; this, along with the absence of community interest, led us to decide to remove the component without deprecation.
 
+=== camel-kafka
+
+The component now has support for batch processing.
+
 == Camel Spring Boot
 
 === Auto Configuration
