This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new d72465a3ed6 CAMEL-19665: camel-couchbase should be batch poll consumer
d72465a3ed6 is described below

commit d72465a3ed6e6991a668c420b271c61648e0e874
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Dec 27 13:49:03 2024 +0100

    CAMEL-19665: camel-couchbase should be batch poll consumer
---
 .../component/couchbase/CouchbaseConsumer.java     | 104 ++++++++++++---------
 .../integration/CouchbaseIntegrationTestBase.java  |   2 +-
 2 files changed, 63 insertions(+), 43 deletions(-)

diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index ed171b5fffc..fde0271f3ad 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.couchbase;
 
+import java.util.ArrayDeque;
+import java.util.Queue;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -28,10 +30,11 @@ import com.couchbase.client.java.view.ViewOrdering;
 import com.couchbase.client.java.view.ViewResult;
 import com.couchbase.client.java.view.ViewRow;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Processor;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
-import org.apache.camel.support.DefaultScheduledPollConsumer;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.support.resume.ResumeStrategyHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,14 +45,14 @@ import static 
org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_ID;
 import static 
org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_KEY;
 import static 
org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_VIEWNAME;
 
-public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements 
ResumeAware<ResumeStrategy> {
+public class CouchbaseConsumer extends ScheduledBatchPollingConsumer 
implements ResumeAware<ResumeStrategy> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CouchbaseConsumer.class);
 
     private final Lock lock = new ReentrantLock();
     private final CouchbaseEndpoint endpoint;
-    private final Bucket bucket;
-    private final Collection collection;
+    private Bucket bucket;
+    private Collection collection;
     private ViewOptions viewOptions;
 
     private ResumeStrategy resumeStrategy;
@@ -58,22 +61,23 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer implements R
         super(endpoint, processor);
         this.bucket = client;
         this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doInit() {
         Scope scope;
         if (endpoint.getScope() != null) {
-            scope = client.scope(endpoint.getScope());
+            scope = bucket.scope(endpoint.getScope());
         } else {
-            scope = client.defaultScope();
+            scope = bucket.defaultScope();
         }
 
         if (endpoint.getCollection() != null) {
             this.collection = scope.collection(endpoint.getCollection());
         } else {
-            this.collection = client.defaultCollection();
+            this.collection = bucket.defaultCollection();
         }
-    }
 
-    @Override
-    protected void doInit() {
         this.viewOptions = ViewOptions.viewOptions();
         int limit = endpoint.getLimit();
         if (limit > 0) {
@@ -122,10 +126,12 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer implements R
             forceConsumerAsReady();
 
             if (LOG.isTraceEnabled()) {
-                LOG.trace("ViewResponse =  {}", result);
+                LOG.trace("ViewResponse: {}", result);
             }
 
             String consumerProcessedStrategy = 
endpoint.getConsumerProcessedStrategy();
+
+            Queue<Object> exchanges = new ArrayDeque<>();
             for (ViewRow row : result.rows()) {
                 Object doc;
                 String id = row.id().get();
@@ -139,45 +145,60 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer implements R
                 String designDocumentName = endpoint.getDesignDocumentName();
                 String viewName = endpoint.getViewName();
 
-                Exchange exchange = createExchange(false);
-                try {
-                    exchange.getIn().setBody(doc);
-                    exchange.getIn().setHeader(HEADER_ID, id);
-                    exchange.getIn().setHeader(HEADER_KEY, key);
-                    exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, 
designDocumentName);
-                    exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
-
-                    if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Deleting doc with ID {}", id);
-                        }
-                        
CouchbaseCollectionOperation.removeDocument(collection, id, 
endpoint.getWriteQueryTimeout(),
-                                endpoint.getProducerRetryPause());
-                    } else if 
("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Filtering out ID {}", id);
-                        }
-                        // add filter for already processed docs
-                    } else {
-                        LOG.trace("No strategy set for already processed docs, 
beware of duplicates!");
-                    }
+                Exchange exchange = createExchange(true);
+                exchange.getIn().setBody(doc);
+                exchange.getIn().setHeader(HEADER_ID, id);
+                exchange.getIn().setHeader(HEADER_KEY, key);
+                exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, 
designDocumentName);
+                exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
 
-                    logDetails(id, doc, key, designDocumentName, viewName, 
exchange);
-
-                    getProcessor().process(exchange);
-                } catch (Exception e) {
-                    this.getExceptionHandler().handleException("Error 
processing exchange.", exchange, e);
-                } finally {
-                    releaseExchange(exchange, false);
+                if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Deleting doc with ID {}", id);
+                    }
+                    CouchbaseCollectionOperation.removeDocument(collection, 
id, endpoint.getWriteQueryTimeout(),
+                            endpoint.getProducerRetryPause());
+                } else if 
("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Filtering out ID {}", id);
+                    }
+                    // add filter for already processed docs
+                } else {
+                    LOG.trace("No strategy set for already processed docs, 
beware of duplicates!");
                 }
+
+                logDetails(id, doc, key, designDocumentName, viewName, 
exchange);
+                exchanges.add(exchange);
             }
 
-            return result.rows().size();
+            return processBatch(exchanges);
         } finally {
             lock.unlock();
         }
     }
 
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int total = exchanges.size();
+        int answer = total;
+        if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) {
+            LOG.debug("Limiting to maximum messages to poll {} as there were 
{} messages in this poll.",
+                    this.maxMessagesPerPoll, total);
+            total = this.maxMessagesPerPoll;
+        }
+
+        for (int index = 0; index < total && this.isBatchAllowed(); ++index) {
+            Exchange exchange = (Exchange) exchanges.poll();
+            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
+            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
+            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == 
total - 1);
+            this.pendingExchanges = total - index - 1;
+            getProcessor().process(exchange);
+        }
+
+        return answer;
+    }
+
     private void logDetails(String id, Object doc, String key, String 
designDocumentName, String viewName, Exchange exchange) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Created exchange = {}", exchange);
@@ -188,7 +209,6 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer implements R
             LOG.trace("Design Document Name = {}", designDocumentName);
             LOG.trace("View Name = {}", viewName);
         }
-
     }
 
     @Override
diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/CouchbaseIntegrationTestBase.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/CouchbaseIntegrationTestBase.java
index 4d9e3d80282..851cc87ade8 100644
--- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/CouchbaseIntegrationTestBase.java
+++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/CouchbaseIntegrationTestBase.java
@@ -51,7 +51,7 @@ public class CouchbaseIntegrationTestBase extends 
CamelTestSupport {
         cluster.buckets().createBucket(
                 
BucketSettings.create(bucketName).bucketType(BucketType.COUCHBASE).flushEnabled(true));
 
-        Bucket bucket = cluster.bucket(bucketName);
+        cluster.bucket(bucketName);
         DesignDocument designDoc = new DesignDocument(
                 bucketName,
                 Collections.singletonMap(bucketName, new View("function (doc, 
meta) {  emit(meta.id, doc);}")));

Reply via email to