This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.20.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.20.x by this push: new 54350f22acc CAMEL-19174: camel-jira - Fix duplicate messages created by Jira issues consumer (#9589) (#9596) 54350f22acc is described below commit 54350f22accc3444410913dac0a41ad3ed7f08e0 Author: Christoph Deppisch <cdeppi...@redhat.com> AuthorDate: Wed Mar 22 06:24:35 2023 +0100 CAMEL-19174: camel-jira - Fix duplicate messages created by Jira issues consumer (#9589) (#9596) - Issues returned are already in descending order - do not reverse the order of issues a 2nd time - Make sure to exit polling loop early when no further issues are returned (total # of issues returned is lower than query page size) - Avoid duplicates in returned issue list - Adjust query page size according to the given max results limit - Fix ordering of mocked issue return values in unit tests (mocks should return issues in descending order) - Add more unit tests on new issues consumer (testing filter offset, pagination, duplicates) --- .../jira/consumer/AbstractJiraConsumer.java | 22 +++-- .../jira/consumer/NewCommentsConsumer.java | 2 +- .../component/jira/consumer/NewIssuesConsumer.java | 14 +-- .../jira/consumer/WatchUpdatesConsumer.java | 2 +- .../jira/consumer/NewCommentsConsumerTest.java | 14 +-- .../jira/consumer/NewIssuesConsumerTest.java | 108 ++++++++++++++++++--- .../jira/consumer/WatchUpdatesConsumerTest.java | 36 +++---- 7 files changed, 147 insertions(+), 51 deletions(-) diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java index d19fbd9817f..29b74c9d72b 100644 --- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java +++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/AbstractJiraConsumer.java @@ -17,7 +17,9 @@ package 
org.apache.camel.component.jira.consumer; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import com.atlassian.jira.rest.client.api.JiraRestClient; import com.atlassian.jira.rest.client.api.RestClientException; @@ -33,7 +35,7 @@ import org.slf4j.LoggerFactory; public abstract class AbstractJiraConsumer extends ScheduledPollConsumer { - private static final transient Logger LOG = LoggerFactory.getLogger(AbstractJiraConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractJiraConsumer.class); private final JiraEndpoint endpoint; @@ -79,25 +81,33 @@ public abstract class AbstractJiraConsumer extends ScheduledPollConsumer { protected List<Issue> getIssues(String jql, int start, int maxPerQuery, int maxResults) { LOG.debug("Start indexing current JIRA issues..."); - List<Issue> issues = new ArrayList<>(); + if (maxResults < maxPerQuery) { + maxPerQuery = maxResults; + } + + // Avoid duplicates + Set<Issue> issues = new LinkedHashSet<>(); while (true) { SearchRestClient searchRestClient = endpoint.getClient().getSearchClient(); - SearchResult searchResult = searchRestClient.searchJql(jql, maxResults, start, null).claim(); + SearchResult searchResult = searchRestClient.searchJql(jql, maxPerQuery, start, null).claim(); for (Issue issue : searchResult.getIssues()) { issues.add(issue); } // Note: #getTotal == the total # the query would return *without* pagination, effectively telling us - // we've reached the end. Also exit early if we're limiting the # of results. - if (start >= searchResult.getTotal() || maxResults > 0 && issues.size() >= maxResults) { + // we've reached the end. Also exit early if we're limiting the # of results or + // if total # of returned issues is lower than the actual page size. 
+ if (maxPerQuery >= searchResult.getTotal() || + start >= searchResult.getTotal() || + maxResults > 0 && issues.size() >= maxResults) { break; } start += maxPerQuery; } LOG.debug("End indexing current JIRA issues. {} issues indexed.", issues.size()); - return issues; + return new ArrayList<>(issues); } protected JiraRestClient client() { diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java index a25121902ae..78dd7c865dc 100644 --- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java +++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java @@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory; */ public class NewCommentsConsumer extends AbstractJiraConsumer { - private static final transient Logger LOG = LoggerFactory.getLogger(NewCommentsConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(NewCommentsConsumer.class); private Long lastCommentId = -1L; diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java index ec76103d688..4347da19342 100644 --- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java +++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; */ public class NewIssuesConsumer extends AbstractJiraConsumer { - private static final transient Logger LOG = LoggerFactory.getLogger(NewIssuesConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(NewIssuesConsumer.class); private final String jql; private long latestIssueId = -1; @@ -58,13 +58,14 @@ public 
class NewIssuesConsumer extends AbstractJiraConsumer { // grab only the top try { List<Issue> issues = getIssues(jql, 0, 1, 1); - // in case there aren't any issues... if (!issues.isEmpty()) { + // Issues returned are ordered descendant so this is the newest issue return issues.get(0).getId(); } } catch (Exception e) { // ignore } + // in case there aren't any issues... return -1; } @@ -72,9 +73,8 @@ public class NewIssuesConsumer extends AbstractJiraConsumer { // it may happen the poll() is called while the route is doing the initial load, // this way we need to wait for the latestIssueId being associated to the last indexed issue id List<Issue> newIssues = getNewIssues(); - // In the end, we want only *new* issues oldest to newest. - for (int i = newIssues.size() - 1; i > -1; i--) { - Issue newIssue = newIssues.get(i); + // In the end, we want only *new* issues oldest to newest. New issues returned are ordered descendant already. + for (Issue newIssue : newIssues) { Exchange e = createExchange(true); e.getIn().setBody(newIssue); getProcessor().process(e); @@ -113,8 +113,8 @@ public class NewIssuesConsumer extends AbstractJiraConsumer { if (!issues.isEmpty()) { // remember last id we have processed - int last = issues.size() - 1; - latestIssueId = issues.get(last).getId(); + // issues are ordered descendant so save the first issue in the list as the newest + latestIssueId = issues.get(0).getId(); } return issues; } diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java index f91dc8a2aac..774eea83cf3 100644 --- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java +++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; public class 
WatchUpdatesConsumer extends AbstractJiraConsumer { - private static final transient Logger LOG = LoggerFactory.getLogger(WatchUpdatesConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(WatchUpdatesConsumer.class); HashMap<Long, Issue> watchedIssues; List<String> watchedFieldsList; String watchedIssuesKeys; diff --git a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java index c0accd0f9cf..485c46a2a0b 100644 --- a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java +++ b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewCommentsConsumerTest.java @@ -57,7 +57,7 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class NewCommentsConsumerTest extends CamelTestSupport { - private static List<Issue> issues = new ArrayList<>(); + private static final List<Issue> ISSUES = new ArrayList<>(); @Mock private JiraRestClient jiraClient; @@ -81,13 +81,13 @@ public class NewCommentsConsumerTest extends CamelTestSupport { @BeforeAll public static void beforeAll() { - issues.add(createIssueWithComments(1L, 1)); - issues.add(createIssueWithComments(2L, 1)); - issues.add(createIssueWithComments(3L, 1)); + ISSUES.add(createIssueWithComments(3L, 1)); + ISSUES.add(createIssueWithComments(2L, 1)); + ISSUES.add(createIssueWithComments(1L, 1)); } public void setMocks() { - SearchResult result = new SearchResult(0, 50, 100, issues); + SearchResult result = new SearchResult(0, 50, 100, ISSUES); Promise<SearchResult> promiseSearchResult = Promises.promise(result); Issue issue = createIssueWithComments(4L, 1); Promise<Issue> promiseIssue = Promises.promise(issue); @@ -159,9 +159,9 @@ public class NewCommentsConsumerTest extends CamelTestSupport { Issue issue2 = createIssueWithComments(21L, 
3000); Issue issue3 = createIssueWithComments(22L, 1000); List<Issue> newIssues = new ArrayList<>(); - newIssues.add(issue1); - newIssues.add(issue2); newIssues.add(issue3); + newIssues.add(issue2); + newIssues.add(issue1); Issue issueWithNoComments = createIssue(31L); reset(searchRestClient); diff --git a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java index 76a4cfcb105..c3931b8003a 100644 --- a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java +++ b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/NewIssuesConsumerTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.jira.consumer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +35,7 @@ import org.apache.camel.component.jira.JiraComponent; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.spi.Registry; import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -52,7 +54,7 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class NewIssuesConsumerTest extends CamelTestSupport { - private static List<Issue> issues = new ArrayList<>(); + private static final List<Issue> ISSUES = new ArrayList<>(); @Mock private JiraRestClient jiraClient; @@ -73,13 +75,13 @@ public class NewIssuesConsumerTest extends CamelTestSupport { @BeforeAll public static void beforeAll() { - issues.add(createIssue(1L)); - issues.add(createIssue(2L)); - issues.add(createIssue(3L)); + ISSUES.add(createIssue(3L)); + ISSUES.add(createIssue(2L)); + ISSUES.add(createIssue(1L)); } public 
void setMocks() { - SearchResult result = new SearchResult(0, 50, 100, issues); + SearchResult result = new SearchResult(0, 50, 3, ISSUES); Promise<SearchResult> promiseSearchResult = Promises.promise(result); when(jiraClient.getSearchClient()).thenReturn(searchRestClient); @@ -102,7 +104,7 @@ public class NewIssuesConsumerTest extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() { - from("jira://newIssues?jiraUrl=" + JIRA_CREDENTIALS + "&jql=project=" + PROJECT + "&delay=5000") + from("jira://newIssues?jiraUrl=" + JIRA_CREDENTIALS + "&jql=project=" + PROJECT + "&delay=1000") .to(mockResult); } }; @@ -121,12 +123,12 @@ public class NewIssuesConsumerTest extends CamelTestSupport { reset(searchRestClient); AtomicBoolean searched = new AtomicBoolean(); when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> { - List<Issue> newIissues = new ArrayList<>(); + List<Issue> newIssues = new ArrayList<>(); if (!searched.get()) { - newIissues.add(issue); + newIssues.add(issue); searched.set(true); } - SearchResult result = new SearchResult(0, 50, 100, newIissues); + SearchResult result = new SearchResult(0, 50, 100, newIssues); return Promises.promise(result); }); mockResult.expectedBodiesReceived(issue); @@ -144,9 +146,9 @@ public class NewIssuesConsumerTest extends CamelTestSupport { when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> { List<Issue> newIssues = new ArrayList<>(); if (!searched.get()) { - newIssues.add(issue1); - newIssues.add(issue2); newIssues.add(issue3); + newIssues.add(issue2); + newIssues.add(issue1); searched.set(true); } SearchResult result = new SearchResult(0, 50, 3, newIssues); @@ -157,4 +159,88 @@ public class NewIssuesConsumerTest extends CamelTestSupport { mockResult.assertIsSatisfied(); } + @Test + public void multipleIssuesPaginationTest() throws Exception { + Issue issue1 = createIssue(31); + Issue issue2 = createIssue(32); + Issue issue3 = createIssue(33); 
+ Issue issue4 = createIssue(34); + Issue issue5 = createIssue(35); + + reset(searchRestClient); + when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> { + int startAt = invocation.getArgument(2); + Assertions.assertEquals(0, startAt); + + // return getTotal=100 to force next page query + SearchResult result = new SearchResult(0, 50, 100, List.of(issue5, issue4, issue3)); + return Promises.promise(result); + }).then(invocation -> { + int startAt = invocation.getArgument(2); + Assertions.assertEquals(50, startAt); + SearchResult result = new SearchResult(0, 50, 100, List.of(issue2, issue1)); + return Promises.promise(result); + }).then(invocation -> { + int startAt = invocation.getArgument(2); + Assertions.assertEquals(100, startAt); + SearchResult result = new SearchResult(0, 50, 0, Collections.emptyList()); + return Promises.promise(result); + }); + + mockResult.expectedBodiesReceived(issue5, issue4, issue3, issue2, issue1); + mockResult.assertIsSatisfied(); + } + + @Test + public void multipleIssuesAvoidDuplicatesTest() throws Exception { + Issue issue1 = createIssue(41); + Issue issue2 = createIssue(42); + Issue issue3 = createIssue(43); + + reset(searchRestClient); + when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> { + // return getTotal=100 to force next page query + SearchResult result = new SearchResult(0, 50, 100, List.of(issue3, issue2)); + return Promises.promise(result); + }).then(invocation -> { + SearchResult result = new SearchResult(0, 50, 100, List.of(issue3, issue2, issue1)); + return Promises.promise(result); + }).then(invocation -> { + SearchResult result = new SearchResult(0, 50, 0, Collections.emptyList()); + return Promises.promise(result); + }); + + mockResult.expectedBodiesReceived(issue3, issue2, issue1); + mockResult.assertIsSatisfied(); + } + + @Test + public void multipleQueriesOffsetFilterTest() throws Exception { + Issue issue1 = createIssue(51); + Issue issue2 = 
createIssue(52); + Issue issue3 = createIssue(53); + Issue issue4 = createIssue(54); + + reset(searchRestClient); + when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> { + SearchResult result = new SearchResult(0, 50, 3, List.of(issue3, issue2, issue1)); + return Promises.promise(result); + }).then(invocation -> { + int startAt = invocation.getArgument(2); + Assertions.assertEquals(0, startAt); + + String jqlFilter = invocation.getArgument(0); + Assertions.assertTrue(jqlFilter.startsWith("id > 53")); + SearchResult result = new SearchResult(0, 50, 1, Collections.singletonList(issue4)); + return Promises.promise(result); + }); + + mockResult.expectedBodiesReceived(issue3, issue2, issue1); + mockResult.assertIsSatisfied(); + + mockResult.reset(); + + mockResult.expectedBodiesReceived(issue4); + mockResult.assertIsSatisfied(); + } } diff --git a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java index 369393fae28..b74cb5d3bae 100644 --- a/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java +++ b/components/camel-jira/src/test/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumerTest.java @@ -52,7 +52,7 @@ import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) public class WatchUpdatesConsumerTest extends CamelTestSupport { - private static List<Issue> issues = new ArrayList<>(); + private static final List<Issue> ISSUES = new ArrayList<>(); @Mock private JiraRestClient jiraClient; @@ -73,14 +73,14 @@ public class WatchUpdatesConsumerTest extends CamelTestSupport { @BeforeAll public static void beforeAll() { - issues.clear(); - issues.add(createIssue(1L)); - issues.add(createIssue(2L)); - issues.add(createIssue(3L)); + ISSUES.clear(); + ISSUES.add(createIssue(1L)); + ISSUES.add(createIssue(2L)); 
+ ISSUES.add(createIssue(3L)); } public void setMocks() { - SearchResult result = new SearchResult(0, 50, 100, issues); + SearchResult result = new SearchResult(0, 50, 100, ISSUES); Promise<SearchResult> promiseSearchResult = Promises.promise(result); when(jiraClient.getSearchClient()).thenReturn(searchRestClient); @@ -118,17 +118,17 @@ public class WatchUpdatesConsumerTest extends CamelTestSupport { @Test public void singleChangeTest() throws Exception { - Issue issue = setPriority(issues.get(0), new Priority( + Issue issue = setPriority(ISSUES.get(0), new Priority( null, 4L, "High", null, null, null)); reset(searchRestClient); AtomicBoolean searched = new AtomicBoolean(); when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> { if (!searched.get()) { - issues.remove(0); - issues.add(0, issue); + ISSUES.remove(0); + ISSUES.add(0, issue); } - SearchResult result = new SearchResult(0, 50, 100, issues); + SearchResult result = new SearchResult(0, 50, 100, ISSUES); return Promises.promise(result); }); @@ -141,23 +141,23 @@ public class WatchUpdatesConsumerTest extends CamelTestSupport { @Test public void multipleChangesWithAddedNewIssueTest() throws Exception { - final Issue issue = transitionIssueDone(issues.get(1)); - final Issue issue2 = setPriority(issues.get(2), new Priority( + final Issue issue = transitionIssueDone(ISSUES.get(1)); + final Issue issue2 = setPriority(ISSUES.get(2), new Priority( null, 4L, "High", null, null, null)); reset(searchRestClient); AtomicBoolean searched = new AtomicBoolean(); when(searchRestClient.searchJql(any(), any(), any(), any())).then(invocation -> { if (!searched.get()) { - issues.add(createIssue(4L)); - issues.remove(1); - issues.add(1, issue); - issues.remove(2); - issues.add(2, issue2); + ISSUES.add(createIssue(4L)); + ISSUES.remove(1); + ISSUES.add(1, issue); + ISSUES.remove(2); + ISSUES.add(2, issue2); searched.set(true); } - SearchResult result = new SearchResult(0, 50, 3, issues); + SearchResult 
result = new SearchResult(0, 50, 3, ISSUES); return Promises.promise(result); });