This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new a782cf49d39 CAMEL-19665: camel-github should be batch poll consumer a782cf49d39 is described below commit a782cf49d3968ce79f52fc73a663d0ac559cf863 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Dec 27 13:08:04 2024 +0100 CAMEL-19665: camel-github should be batch poll consumer --- .../github/consumer/AbstractGitHubConsumer.java | 30 ++++++++++++++++++-- .../component/github/consumer/CommitConsumer.java | 32 ++++++++++++++++++---- .../component/github/consumer/EventsConsumer.java | 20 ++++++++------ .../consumer/PullRequestCommentConsumer.java | 10 +++---- .../github/consumer/PullRequestConsumer.java | 10 +++---- .../component/github/consumer/TagConsumer.java | 13 +++++---- 6 files changed, 83 insertions(+), 32 deletions(-) diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/AbstractGitHubConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/AbstractGitHubConsumer.java index cf6d9fb872d..596c6ebe605 100644 --- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/AbstractGitHubConsumer.java +++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/AbstractGitHubConsumer.java @@ -16,18 +16,22 @@ */ package org.apache.camel.component.github.consumer; +import java.util.Queue; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Processor; import org.apache.camel.component.github.GitHubConstants; import org.apache.camel.component.github.GitHubEndpoint; import org.apache.camel.spi.Registry; -import org.apache.camel.support.ScheduledPollConsumer; +import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.eclipse.egit.github.core.Repository; import org.eclipse.egit.github.core.service.GitHubService; import org.eclipse.egit.github.core.service.RepositoryService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractGitHubConsumer extends ScheduledPollConsumer { +public abstract class AbstractGitHubConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(AbstractGitHubConsumer.class); private final GitHubEndpoint endpoint; @@ -66,5 +70,25 @@ public abstract class AbstractGitHubConsumer extends ScheduledPollConsumer { } @Override - protected abstract int poll() throws Exception; + public int processBatch(Queue<Object> exchanges) throws Exception { + int total = exchanges.size(); + int answer = total; + if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) { + LOG.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", + this.maxMessagesPerPoll, total); + total = this.maxMessagesPerPoll; + } + + for (int index = 0; index < total && this.isBatchAllowed(); ++index) { + Exchange exchange = (Exchange) exchanges.poll(); + exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index); + exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total); + exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1); + this.pendingExchanges = total - index - 1; + getProcessor().process(exchange); + } + + return answer; + } + } diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java index ec03b96ee36..55f2ffc19fd 100644 --- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java +++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java @@ -22,6 +22,7 @@ import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Processor; import org.apache.camel.component.github.GitHubConstants; import org.apache.camel.component.github.GitHubEndpoint; @@ -32,8 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CommitConsumer extends AbstractGitHubConsumer { - private static final transient Logger LOG = LoggerFactory.getLogger(CommitConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(CommitConsumer.class); private static final int CAPACITY = 1000; // in case there is a lot of commits and this runs not very frequently private CommitService commitService; @@ -156,10 +157,9 @@ public class CommitConsumer extends AbstractGitHubConsumer { } } - int counter = 0; + Queue<Object> exchanges = new ArrayDeque<>(); while (!newCommits.isEmpty()) { RepositoryCommit newCommit = newCommits.pop(); - lastSha = newCommit.getSha(); Exchange e = createExchange(true); if (newCommit.getAuthor() != null) { e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, newCommit.getAuthor().getName()); @@ -170,9 +170,9 @@ public class CommitConsumer extends AbstractGitHubConsumer { e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA, newCommit.getSha()); e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL, newCommit.getUrl()); e.getMessage().setBody(newCommit.getCommit().getMessage()); - getProcessor().process(e); - counter++; + exchanges.add(e); } + int counter = processBatch(exchanges); LOG.debug("Last sha: {}", lastSha); return counter; } finally { @@ -180,4 +180,26 @@ public class CommitConsumer extends AbstractGitHubConsumer { } } + @Override + public int processBatch(Queue<Object> exchanges) throws Exception { + int total = exchanges.size(); + int answer = total; + if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) { + LOG.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", + this.maxMessagesPerPoll, total); + total = this.maxMessagesPerPoll; + } + + for (int index = 0; index < total && this.isBatchAllowed(); ++index) { + Exchange exchange = (Exchange) exchanges.poll(); + exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index); + exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total); + exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1); + this.pendingExchanges = total - index - 1; + this.lastSha = exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_SHA, String.class); + getProcessor().process(exchange); + } + + return answer; + } } diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java index e1b77776626..66a2f7e557f 100644 --- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java +++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java @@ -16,9 +16,11 @@ */ package org.apache.camel.component.github.consumer; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Queue; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -30,9 +32,13 @@ import org.eclipse.egit.github.core.Repository; import org.eclipse.egit.github.core.client.PageIterator; import org.eclipse.egit.github.core.event.Event; import org.eclipse.egit.github.core.service.EventService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EventsConsumer extends AbstractGitHubConsumer { + private static final Logger LOG = LoggerFactory.getLogger(EventsConsumer.class); + private final EventService eventService; private final GitHubEventFetchStrategy eventFetchStrategy; private long lastEventId; @@ -72,22 +78,20 @@ public class EventsConsumer extends AbstractGitHubConsumer { } } - int counter = 0; + Queue<Object> exchanges = new ArrayDeque<>(); if (!newEvents.isEmpty()) { newEvents.sort((e1, e2) -> Long.valueOf(e1.getId()).compareTo(Long.parseLong(e2.getId()))); Event latestEvent = newEvents.get(newEvents.size() - 1); lastEventId = Long.parseLong(latestEvent.getId()); for (Event event : newEvents) { - Exchange exchange = createExchange(true); - exchange.getMessage().setBody(event.getType()); - exchange.getMessage().setHeader(GitHubConstants.GITHUB_EVENT_PAYLOAD, event.getPayload()); - getProcessor().process(exchange); - counter++; + Exchange e = createExchange(true); + e.getMessage().setBody(event.getType()); + e.getMessage().setHeader(GitHubConstants.GITHUB_EVENT_PAYLOAD, event.getPayload()); + exchanges.add(e); } } - - return counter; + return processBatch(exchanges); } /** diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java index e6a4f16f24e..b0c888c092c 100644 --- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java +++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Queue; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -108,7 +109,7 @@ public class PullRequestCommentConsumer extends AbstractGitHubConsumer { } } - int counter = 0; + Queue<Object> exchanges = new ArrayDeque<>(); while (!newComments.isEmpty()) { Comment newComment = newComments.pop(); Exchange e = createExchange(true); @@ -116,10 +117,9 @@ public class PullRequestCommentConsumer extends AbstractGitHubConsumer { // Required by the producers. Set it here for convenience. e.getIn().setHeader(GitHubConstants.GITHUB_PULLREQUEST, commentIdToPullRequest.get(newComment.getId())); - - getProcessor().process(e); - counter++; + exchanges.add(e); } - return counter; + return processBatch(exchanges); } + } diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java index a78aaf221c7..965a88ee436 100644 --- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java +++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.github.consumer; import java.util.ArrayDeque; import java.util.List; +import java.util.Queue; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -74,7 +75,7 @@ public class PullRequestConsumer extends AbstractGitHubConsumer { lastOpenPullRequest = openPullRequests.get(0).getNumber(); } - int counter = 0; + Queue<Object> exchanges = new ArrayDeque<>(); while (!newPullRequests.isEmpty()) { PullRequest newPullRequest = newPullRequests.pop(); Exchange e = createExchange(true); @@ -86,10 +87,9 @@ public class PullRequestConsumer extends AbstractGitHubConsumer { if (newPullRequest.getHead() != null) { e.getIn().setHeader(GitHubConstants.GITHUB_PULLREQUEST_HEAD_COMMIT_SHA, newPullRequest.getHead().getSha()); } - - getProcessor().process(e); - counter++; + exchanges.add(e); } - return counter; + return processBatch(exchanges); } + } diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java index 0d57505696c..98429a49c2b 100644 --- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java +++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.github.consumer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -28,9 +29,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TagConsumer extends AbstractGitHubConsumer { - private static final transient Logger LOG = LoggerFactory.getLogger(TagConsumer.class); - private List<String> tagNames = new ArrayList<>(); + private static final Logger LOG = LoggerFactory.getLogger(TagConsumer.class); + + private final List<String> tagNames = new ArrayList<>(); public TagConsumer(GitHubEndpoint endpoint, Processor processor) throws Exception { super(endpoint, processor); @@ -54,14 +56,13 @@ public class TagConsumer extends AbstractGitHubConsumer { } } - int counter = 0; + Queue<Object> exchanges = new ArrayDeque<>(); while (!newTags.isEmpty()) { RepositoryTag newTag = newTags.pop(); Exchange e = createExchange(true); e.getIn().setBody(newTag); - getProcessor().process(e); - counter++; + exchanges.add(e); } - return counter; + return processBatch(exchanges); } }