This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5a599f98ef1e75d6ce1124af4858f3044095070f Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Fri Jan 31 10:43:09 2025 +0100 (chores) camel-core: refactor large methods to help inlining --- .../processor/aggregate/AggregateProcessor.java | 89 +++++++++++----------- 1 file changed, 45 insertions(+), 44 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 7ea0c785188..6e8afe0fecf 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -805,25 +805,10 @@ public class AggregateProcessor extends AsyncProcessorSupport Exchange answer; if (fromTimeout && isDiscardOnCompletionTimeout()) { - // this exchange is discarded - discarded.incrementAndGet(); - // discard due timeout - LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated); - // must confirm the discarded exchange - aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId()); - // and remove redelivery state as well - redeliveryState.remove(aggregated.getExchangeId()); - // the completion was from timeout and we should just discard it + discard(key, aggregated); answer = null; } else if (aggregateFailed && isDiscardOnAggregationFailure()) { - // this exchange is discarded - discarded.incrementAndGet(); - // discard due aggregation failed (or by force) - LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated); - // must confirm the discarded exchange - aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId()); - // and remove redelivery state as well - redeliveryState.remove(aggregated.getExchangeId()); + discard(key, aggregated); // the completion was failed during aggregation and we should just discard it answer = null; } else { @@ -834,6 +819,18 @@ public class AggregateProcessor extends AsyncProcessorSupport return answer; } + private void discard(String key, Exchange aggregated) { + // this exchange is discarded + discarded.incrementAndGet(); + // discard due timeout + LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated); + // must confirm the discarded exchange + aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId()); + // and remove redelivery state as well + redeliveryState.remove(aggregated.getExchangeId()); + // the completion was from timeout and we should just discard it + } + private void onSubmitCompletion(final String key, final Exchange exchange) { LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange); @@ -849,33 +846,7 @@ public class AggregateProcessor extends AsyncProcessorSupport if (getStatistics().isStatisticsEnabled()) { totalCompleted.incrementAndGet(); - String completedBy = exchange.getProperty(ExchangePropertyKey.AGGREGATED_COMPLETED_BY, String.class); - switch (completedBy) { - case COMPLETED_BY_INTERVAL: - completedByInterval.incrementAndGet(); - break; - case COMPLETED_BY_TIMEOUT: - completedByTimeout.incrementAndGet(); - break; - case COMPLETED_BY_FORCE: - completedByForce.incrementAndGet(); - break; - case COMPLETED_BY_CONSUMER: - completedByBatchConsumer.incrementAndGet(); - break; - case COMPLETED_BY_PREDICATE: - completedByPredicate.incrementAndGet(); - break; - case COMPLETED_BY_SIZE: - completedBySize.incrementAndGet(); - break; - case COMPLETED_BY_STRATEGY: - completedByStrategy.incrementAndGet(); - break; - default: - LOG.error("Invalid value of {} property: {}", Exchange.AGGREGATED_COMPLETED_BY, exchange); - break; - } + aggregateCompletionCounter(exchange); } LOG.debug("Processing aggregated exchange: {}", exchange); @@ -902,6 +873,36 @@ public class AggregateProcessor extends AsyncProcessorSupport }); } + private void aggregateCompletionCounter(Exchange exchange) { + String completedBy = exchange.getProperty(ExchangePropertyKey.AGGREGATED_COMPLETED_BY, String.class); + switch (completedBy) { + case COMPLETED_BY_INTERVAL: + completedByInterval.incrementAndGet(); + break; + case COMPLETED_BY_TIMEOUT: + completedByTimeout.incrementAndGet(); + break; + case COMPLETED_BY_FORCE: + completedByForce.incrementAndGet(); + break; + case COMPLETED_BY_CONSUMER: + completedByBatchConsumer.incrementAndGet(); + break; + case COMPLETED_BY_PREDICATE: + completedByPredicate.incrementAndGet(); + break; + case COMPLETED_BY_SIZE: + completedBySize.incrementAndGet(); + break; + case COMPLETED_BY_STRATEGY: + completedByStrategy.incrementAndGet(); + break; + default: + LOG.error("Invalid value of {} property: {}", Exchange.AGGREGATED_COMPLETED_BY, exchange); + break; + } + } + /** * Restores the timeout map with timeout values from the aggregation repository. * <p/>