This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to the branch "batch" in the repository https://gitbox.apache.org/repos/asf/camel.git
commit 8a5db34e166c4f337f11f52b6a3420f8518bd460 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jul 24 20:25:30 2023 +0200 CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed. --- .../aws2/ddbstream/Ddb2StreamConsumer.java | 28 ++++++++++++++++------ .../aws2/ddbstream/Ddb2StreamEndpoint.java | 1 - 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java index 7cc4714b580..d6b0b1b8da1 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java @@ -25,12 +25,12 @@ import java.util.Queue; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Processor; import org.apache.camel.health.HealthCheckHelper; import org.apache.camel.health.WritableHealthCheckRepository; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; -import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException; @@ -99,16 +99,29 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer { @Override public int processBatch(Queue<Object> exchanges) throws Exception { - int processedExchanges = 0; - while (!exchanges.isEmpty()) { - final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); + int total = exchanges.size(); + int answer = 0; + + for (int index = 0; index < total && isBatchAllowed(); index++) { + // only loop if we are started (allowed 
to run) + // use poll to remove the head so it does not consume memory even + // after we have processed it + Exchange exchange = (Exchange) exchanges.poll(); + // add current index and total as properties + exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index); + exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total); + exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1); + + // update pending number of exchanges + pendingExchanges = total - index - 1; // use default consumer callback AsyncCallback cb = defaultConsumerCallback(exchange, true); getAsyncProcessor().process(exchange, cb); - processedExchanges++; + answer++; } - return processedExchanges; + + return answer; } @Override @@ -122,7 +135,8 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer { if (healthCheckRepository != null) { consumerHealthCheck = new Ddb2StreamConsumerHealthCheck(this, getRouteId()); - consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckEnabled() && getEndpoint().getComponent().isHealthCheckConsumerEnabled()); + consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckEnabled() + && getEndpoint().getComponent().isHealthCheckConsumerEnabled()); healthCheckRepository.addHealthCheck(consumerHealthCheck); } } diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java index 261ab43c344..715234651ed 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java @@ -19,7 +19,6 @@ package org.apache.camel.component.aws2.ddbstream; import java.net.URI; import org.apache.camel.Category; -import org.apache.camel.Component; import 
org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer;