This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a782cf49d39 CAMEL-19665: camel-github should be batch poll consumer
a782cf49d39 is described below

commit a782cf49d3968ce79f52fc73a663d0ac559cf863
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Dec 27 13:08:04 2024 +0100

    CAMEL-19665: camel-github should be batch poll consumer
---
 .../github/consumer/AbstractGitHubConsumer.java    | 30 ++++++++++++++++++--
 .../component/github/consumer/CommitConsumer.java  | 32 ++++++++++++++++++----
 .../component/github/consumer/EventsConsumer.java  | 20 ++++++++------
 .../consumer/PullRequestCommentConsumer.java       | 10 +++----
 .../github/consumer/PullRequestConsumer.java       | 10 +++----
 .../component/github/consumer/TagConsumer.java     | 13 +++++----
 6 files changed, 83 insertions(+), 32 deletions(-)

diff --git 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/AbstractGitHubConsumer.java
 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/AbstractGitHubConsumer.java
index cf6d9fb872d..596c6ebe605 100644
--- 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/AbstractGitHubConsumer.java
+++ 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/AbstractGitHubConsumer.java
@@ -16,18 +16,22 @@
  */
 package org.apache.camel.component.github.consumer;
 
+import java.util.Queue;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Processor;
 import org.apache.camel.component.github.GitHubConstants;
 import org.apache.camel.component.github.GitHubEndpoint;
 import org.apache.camel.spi.Registry;
-import org.apache.camel.support.ScheduledPollConsumer;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.eclipse.egit.github.core.Repository;
 import org.eclipse.egit.github.core.service.GitHubService;
 import org.eclipse.egit.github.core.service.RepositoryService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractGitHubConsumer extends ScheduledPollConsumer {
+public abstract class AbstractGitHubConsumer extends 
ScheduledBatchPollingConsumer {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractGitHubConsumer.class);
 
     private final GitHubEndpoint endpoint;
@@ -66,5 +70,25 @@ public abstract class AbstractGitHubConsumer extends 
ScheduledPollConsumer {
     }
 
     @Override
-    protected abstract int poll() throws Exception;
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int total = exchanges.size();
+        int answer = total;
+        if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) {
+            LOG.debug("Limiting to maximum messages to poll {} as there were 
{} messages in this poll.",
+                    this.maxMessagesPerPoll, total);
+            total = this.maxMessagesPerPoll;
+        }
+
+        for (int index = 0; index < total && this.isBatchAllowed(); ++index) {
+            Exchange exchange = (Exchange) exchanges.poll();
+            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
+            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
+            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == 
total - 1);
+            this.pendingExchanges = total - index - 1;
+            getProcessor().process(exchange);
+        }
+
+        return answer;
+    }
+
 }
diff --git 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
index ec03b96ee36..55f2ffc19fd 100644
--- 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
+++ 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
@@ -22,6 +22,7 @@ import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Processor;
 import org.apache.camel.component.github.GitHubConstants;
 import org.apache.camel.component.github.GitHubEndpoint;
@@ -32,8 +33,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CommitConsumer extends AbstractGitHubConsumer {
-    private static final transient Logger LOG = 
LoggerFactory.getLogger(CommitConsumer.class);
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(CommitConsumer.class);
     private static final int CAPACITY = 1000; // in case there is a lot of 
commits and this runs not very frequently
 
     private CommitService commitService;
@@ -156,10 +157,9 @@ public class CommitConsumer extends AbstractGitHubConsumer 
{
                 }
             }
 
-            int counter = 0;
+            Queue<Object> exchanges = new ArrayDeque<>();
             while (!newCommits.isEmpty()) {
                 RepositoryCommit newCommit = newCommits.pop();
-                lastSha = newCommit.getSha();
                 Exchange e = createExchange(true);
                 if (newCommit.getAuthor() != null) {
                     
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, 
newCommit.getAuthor().getName());
@@ -170,9 +170,9 @@ public class CommitConsumer extends AbstractGitHubConsumer {
                 e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA, 
newCommit.getSha());
                 e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL, 
newCommit.getUrl());
                 e.getMessage().setBody(newCommit.getCommit().getMessage());
-                getProcessor().process(e);
-                counter++;
+                exchanges.add(e);
             }
+            int counter = processBatch(exchanges);
             LOG.debug("Last sha: {}", lastSha);
             return counter;
         } finally {
@@ -180,4 +180,26 @@ public class CommitConsumer extends AbstractGitHubConsumer 
{
         }
     }
 
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int total = exchanges.size();
+        int answer = total;
+        if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) {
+            LOG.debug("Limiting to maximum messages to poll {} as there were 
{} messages in this poll.",
+                    this.maxMessagesPerPoll, total);
+            total = this.maxMessagesPerPoll;
+        }
+
+        for (int index = 0; index < total && this.isBatchAllowed(); ++index) {
+            Exchange exchange = (Exchange) exchanges.poll();
+            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
+            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
+            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == 
total - 1);
+            this.pendingExchanges = total - index - 1;
+            this.lastSha = 
exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_SHA, 
String.class);
+            getProcessor().process(exchange);
+        }
+
+        return answer;
+    }
 }
diff --git 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
index e1b77776626..66a2f7e557f 100644
--- 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
+++ 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
@@ -16,9 +16,11 @@
  */
 package org.apache.camel.component.github.consumer;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Queue;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -30,9 +32,13 @@ import org.eclipse.egit.github.core.Repository;
 import org.eclipse.egit.github.core.client.PageIterator;
 import org.eclipse.egit.github.core.event.Event;
 import org.eclipse.egit.github.core.service.EventService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class EventsConsumer extends AbstractGitHubConsumer {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(EventsConsumer.class);
+
     private final EventService eventService;
     private final GitHubEventFetchStrategy eventFetchStrategy;
     private long lastEventId;
@@ -72,22 +78,20 @@ public class EventsConsumer extends AbstractGitHubConsumer {
             }
         }
 
-        int counter = 0;
+        Queue<Object> exchanges = new ArrayDeque<>();
         if (!newEvents.isEmpty()) {
             newEvents.sort((e1, e2) -> 
Long.valueOf(e1.getId()).compareTo(Long.parseLong(e2.getId())));
             Event latestEvent = newEvents.get(newEvents.size() - 1);
             lastEventId = Long.parseLong(latestEvent.getId());
 
             for (Event event : newEvents) {
-                Exchange exchange = createExchange(true);
-                exchange.getMessage().setBody(event.getType());
-                
exchange.getMessage().setHeader(GitHubConstants.GITHUB_EVENT_PAYLOAD, 
event.getPayload());
-                getProcessor().process(exchange);
-                counter++;
+                Exchange e = createExchange(true);
+                e.getMessage().setBody(event.getType());
+                e.getMessage().setHeader(GitHubConstants.GITHUB_EVENT_PAYLOAD, 
event.getPayload());
+                exchanges.add(e);
             }
         }
-
-        return counter;
+        return processBatch(exchanges);
     }
 
     /**
diff --git 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
index e6a4f16f24e..b0c888c092c 100644
--- 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
+++ 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -108,7 +109,7 @@ public class PullRequestCommentConsumer extends 
AbstractGitHubConsumer {
             }
         }
 
-        int counter = 0;
+        Queue<Object> exchanges = new ArrayDeque<>();
         while (!newComments.isEmpty()) {
             Comment newComment = newComments.pop();
             Exchange e = createExchange(true);
@@ -116,10 +117,9 @@ public class PullRequestCommentConsumer extends 
AbstractGitHubConsumer {
 
             // Required by the producers.  Set it here for convenience.
             e.getIn().setHeader(GitHubConstants.GITHUB_PULLREQUEST, 
commentIdToPullRequest.get(newComment.getId()));
-
-            getProcessor().process(e);
-            counter++;
+            exchanges.add(e);
         }
-        return counter;
+        return processBatch(exchanges);
     }
+
 }
diff --git 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
index a78aaf221c7..965a88ee436 100644
--- 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
+++ 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.github.consumer;
 
 import java.util.ArrayDeque;
 import java.util.List;
+import java.util.Queue;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -74,7 +75,7 @@ public class PullRequestConsumer extends 
AbstractGitHubConsumer {
             lastOpenPullRequest = openPullRequests.get(0).getNumber();
         }
 
-        int counter = 0;
+        Queue<Object> exchanges = new ArrayDeque<>();
         while (!newPullRequests.isEmpty()) {
             PullRequest newPullRequest = newPullRequests.pop();
             Exchange e = createExchange(true);
@@ -86,10 +87,9 @@ public class PullRequestConsumer extends 
AbstractGitHubConsumer {
             if (newPullRequest.getHead() != null) {
                 
e.getIn().setHeader(GitHubConstants.GITHUB_PULLREQUEST_HEAD_COMMIT_SHA, 
newPullRequest.getHead().getSha());
             }
-
-            getProcessor().process(e);
-            counter++;
+            exchanges.add(e);
         }
-        return counter;
+        return processBatch(exchanges);
     }
+
 }
diff --git 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
index 0d57505696c..98429a49c2b 100644
--- 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
+++ 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.github.consumer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -28,9 +29,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TagConsumer extends AbstractGitHubConsumer {
-    private static final transient Logger LOG = 
LoggerFactory.getLogger(TagConsumer.class);
 
-    private List<String> tagNames = new ArrayList<>();
+    private static final Logger LOG = 
LoggerFactory.getLogger(TagConsumer.class);
+
+    private final List<String> tagNames = new ArrayList<>();
 
     public TagConsumer(GitHubEndpoint endpoint, Processor processor) throws 
Exception {
         super(endpoint, processor);
@@ -54,14 +56,13 @@ public class TagConsumer extends AbstractGitHubConsumer {
             }
         }
 
-        int counter = 0;
+        Queue<Object> exchanges = new ArrayDeque<>();
         while (!newTags.isEmpty()) {
             RepositoryTag newTag = newTags.pop();
             Exchange e = createExchange(true);
             e.getIn().setBody(newTag);
-            getProcessor().process(e);
-            counter++;
+            exchanges.add(e);
         }
-        return counter;
+        return processBatch(exchanges);
     }
 }

Reply via email to