This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 3a49782a95e [CAMEL-20139]: fix for incorrect correlation key in first aggregated message (#12121) 3a49782a95e is described below commit 3a49782a95e560ca99d503ff9f372d6e0623adbb Author: dinurp <din...@yahoo.com> AuthorDate: Wed Nov 22 12:11:27 2023 +0400 [CAMEL-20139]: fix for incorrect correlation key in first aggregated message (#12121) * [CAMEL-20139]: fix for incorrect correlation key in first aggregated message * [CAMEL-20139]: remove SNIPPET tags --- .../processor/aggregate/AggregateProcessor.java | 5 +- .../AggregateCompletionByBatchConsumerTest.java | 113 +++++++++++++++++++++ 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index f5bfb2ebeeb..cf3a29a31ff 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -619,16 +619,19 @@ public class AggregateProcessor extends AsyncProcessorSupport if (COMPLETED_BY_CONSUMER.equals(complete)) { for (String batchKey : batchConsumerCorrelationKeys) { Exchange batchAnswer; + Exchange batchOriginalExchange; if (batchKey.equals(key)) { // skip the current aggregated key as we have already aggregated it and have the answer batchAnswer = answer; + batchOriginalExchange = originalExchange; } else { batchAnswer = aggregationRepository.get(camelContext, batchKey); + batchOriginalExchange = batchAnswer; } if (batchAnswer != null) { batchAnswer.setProperty(ExchangePropertyKey.AGGREGATED_COMPLETED_BY, complete); - onCompletion(batchKey, originalExchange, batchAnswer, false, aggregateFailed); + onCompletion(batchKey, batchOriginalExchange, batchAnswer, false, aggregateFailed); list.add(batchAnswer); } } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionByBatchConsumerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionByBatchConsumerTest.java new file mode 100644 index 00000000000..2233e902231 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionByBatchConsumerTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class AggregateCompletionByBatchConsumerTest extends ContextTestSupport { + + @SuppressWarnings("unchecked") + @Test + public void testCorrelationKey() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + + // we expect 4 messages since we group 4 batches + result.expectedMessageCount(4); + + //BATCH_SIZE and not BATCH_COMPLETE is used by aggregate to test for batch completion + final Integer batch_size = Integer.valueOf(8); + + // then we sent the batch of message + template.sendBodyAndProperty("direct:start", "batch-4", Exchange.BATCH_SIZE, batch_size); + template.sendBodyAndProperty("direct:start", "batch-4", Exchange.BATCH_SIZE, batch_size); + template.sendBodyAndProperty("direct:start", "batch-3", Exchange.BATCH_SIZE, batch_size); + template.sendBodyAndProperty("direct:start", "batch-3", Exchange.BATCH_SIZE, batch_size); + template.sendBodyAndProperty("direct:start", "batch-2", Exchange.BATCH_SIZE, batch_size); + template.sendBodyAndProperty("direct:start", "batch-2", Exchange.BATCH_SIZE, batch_size); + template.sendBodyAndProperty("direct:start", "batch-1", Exchange.BATCH_SIZE, batch_size); + template.sendBodyAndProperty("direct:start", "batch-1", Exchange.BATCH_SIZE, batch_size); + + assertMockEndpointsSatisfied(); + + Exchange out; + List<Message> grouped; + + out = result.getExchanges().get(1); + grouped = out.getIn().getBody(List.class); + + assertEquals(2, grouped.size()); + + assertEquals("batch-2", grouped.get(0).getBody(String.class)); + assertEquals("batch-2", grouped.get(1).getBody(String.class)); + assertEquals("batch-2", out.getProperty(Exchange.AGGREGATED_CORRELATION_KEY)); + + out = result.getExchanges().get(2); + grouped = out.getIn().getBody(List.class); + + assertEquals(2, grouped.size()); + + assertEquals("batch-3", grouped.get(0).getBody(String.class)); + assertEquals("batch-3", grouped.get(1).getBody(String.class)); + assertEquals("batch-3", out.getProperty(Exchange.AGGREGATED_CORRELATION_KEY)); + + out = result.getExchanges().get(3); + grouped = out.getIn().getBody(List.class); + + assertEquals(2, grouped.size()); + + assertEquals("batch-4", grouped.get(0).getBody(String.class)); + assertEquals("batch-4", grouped.get(1).getBody(String.class)); + assertEquals("batch-4", out.getProperty(Exchange.AGGREGATED_CORRELATION_KEY)); + + out = result.getExchanges().get(0); + grouped = out.getIn().getBody(List.class); + + assertEquals(2, grouped.size()); + + assertEquals("batch-1", grouped.get(0).getBody(String.class)); + assertEquals("batch-1", grouped.get(1).getBody(String.class)); + assertEquals("batch-1", out.getProperty(Exchange.AGGREGATED_CORRELATION_KEY)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // our route is aggregating from the direct queue and sending + // the response to the mock + from("direct:start") + // aggregate all using body and group the + // exchanges so we get one single exchange containing all + .aggregate(body(), new GroupedMessageAggregationStrategy()) + // we are simulating a batch consumer + .completionFromBatchConsumer() + .eagerCheckCompletion() + .to("mock:result"); + } + }; + } +}