This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5a599f98ef1e75d6ce1124af4858f3044095070f
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Fri Jan 31 10:43:09 2025 +0100

    (chores) camel-core: refactor large methods to help inlining
---
 .../processor/aggregate/AggregateProcessor.java    | 89 +++++++++++-----------
 1 file changed, 45 insertions(+), 44 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 7ea0c785188..6e8afe0fecf 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -805,25 +805,10 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
 
         Exchange answer;
         if (fromTimeout && isDiscardOnCompletionTimeout()) {
-            // this exchange is discarded
-            discarded.incrementAndGet();
-            // discard due timeout
-            LOG.debug("Aggregation for correlation key {} discarding 
aggregated exchange: {}", key, aggregated);
-            // must confirm the discarded exchange
-            aggregationRepository.confirm(aggregated.getContext(), 
aggregated.getExchangeId());
-            // and remove redelivery state as well
-            redeliveryState.remove(aggregated.getExchangeId());
-            // the completion was from timeout and we should just discard it
+            discard(key, aggregated);
             answer = null;
         } else if (aggregateFailed && isDiscardOnAggregationFailure()) {
-            // this exchange is discarded
-            discarded.incrementAndGet();
-            // discard due aggregation failed (or by force)
-            LOG.debug("Aggregation for correlation key {} discarding 
aggregated exchange: {}", key, aggregated);
-            // must confirm the discarded exchange
-            aggregationRepository.confirm(aggregated.getContext(), 
aggregated.getExchangeId());
-            // and remove redelivery state as well
-            redeliveryState.remove(aggregated.getExchangeId());
+            discard(key, aggregated);
             // the completion was failed during aggregation and we should just 
discard it
             answer = null;
         } else {
@@ -834,6 +819,18 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
         return answer;
     }
 
+    /**
+     * Performs the bookkeeping for an aggregated exchange that is being discarded.
+     * <p/>
+     * Increments the discarded counter, logs the discard at debug level, confirms the exchange with the
+     * aggregation repository and removes its exchange id from the redelivery state. The call sites visible in
+     * this change use it for both the discard-on-completion-timeout and the discard-on-aggregation-failure
+     * branches.
+     *
+     * @param key        the correlation key, used here only in the log message
+     * @param aggregated the aggregated exchange being discarded
+     */
+    private void discard(String key, Exchange aggregated) {
+        // this exchange is discarded
+        discarded.incrementAndGet();
+        // log the discard (the reason may be a completion timeout or a failed aggregation)
+        LOG.debug("Aggregation for correlation key {} discarding aggregated 
exchange: {}", key, aggregated);
+        // must confirm the discarded exchange
+        aggregationRepository.confirm(aggregated.getContext(), 
aggregated.getExchangeId());
+        // and remove redelivery state as well
+        redeliveryState.remove(aggregated.getExchangeId());
+        // nothing is returned; the callers visible in this change set their answer to null afterwards
+    }
+
     private void onSubmitCompletion(final String key, final Exchange exchange) 
{
         LOG.debug("Aggregation complete for correlation key {} sending 
aggregated exchange: {}", key, exchange);
 
@@ -849,33 +846,7 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
         if (getStatistics().isStatisticsEnabled()) {
             totalCompleted.incrementAndGet();
 
-            String completedBy = 
exchange.getProperty(ExchangePropertyKey.AGGREGATED_COMPLETED_BY, String.class);
-            switch (completedBy) {
-                case COMPLETED_BY_INTERVAL:
-                    completedByInterval.incrementAndGet();
-                    break;
-                case COMPLETED_BY_TIMEOUT:
-                    completedByTimeout.incrementAndGet();
-                    break;
-                case COMPLETED_BY_FORCE:
-                    completedByForce.incrementAndGet();
-                    break;
-                case COMPLETED_BY_CONSUMER:
-                    completedByBatchConsumer.incrementAndGet();
-                    break;
-                case COMPLETED_BY_PREDICATE:
-                    completedByPredicate.incrementAndGet();
-                    break;
-                case COMPLETED_BY_SIZE:
-                    completedBySize.incrementAndGet();
-                    break;
-                case COMPLETED_BY_STRATEGY:
-                    completedByStrategy.incrementAndGet();
-                    break;
-                default:
-                    LOG.error("Invalid value of {} property: {}", 
Exchange.AGGREGATED_COMPLETED_BY, exchange);
-                    break;
-            }
+            aggregateCompletionCounter(exchange);
         }
 
         LOG.debug("Processing aggregated exchange: {}", exchange);
@@ -902,6 +873,36 @@ public class AggregateProcessor extends 
AsyncProcessorSupport
         });
     }
 
+    /**
+     * Increments the completion counter that matches the exchange's
+     * {@link ExchangePropertyKey#AGGREGATED_COMPLETED_BY} property.
+     * <p/>
+     * The call site visible in this change invokes this method only when statistics are enabled. A value that
+     * matches none of the known completion triggers is logged at error level and no counter is changed.
+     *
+     * @param exchange the completed aggregated exchange whose completion trigger is counted
+     */
+    private void aggregateCompletionCounter(Exchange exchange) {
+        // NOTE(review): switching on a null String throws NullPointerException, so a missing property would
+        // not reach the default branch below - confirm the property is always set before this is called
+        String completedBy = 
exchange.getProperty(ExchangePropertyKey.AGGREGATED_COMPLETED_BY, String.class);
+        switch (completedBy) {
+            case COMPLETED_BY_INTERVAL:
+                completedByInterval.incrementAndGet();
+                break;
+            case COMPLETED_BY_TIMEOUT:
+                completedByTimeout.incrementAndGet();
+                break;
+            case COMPLETED_BY_FORCE:
+                completedByForce.incrementAndGet();
+                break;
+            case COMPLETED_BY_CONSUMER:
+                completedByBatchConsumer.incrementAndGet();
+                break;
+            case COMPLETED_BY_PREDICATE:
+                completedByPredicate.incrementAndGet();
+                break;
+            case COMPLETED_BY_SIZE:
+                completedBySize.incrementAndGet();
+                break;
+            case COMPLETED_BY_STRATEGY:
+                completedByStrategy.incrementAndGet();
+                break;
+            default:
+                // unknown trigger: only logged, no counter is incremented
+                // NOTE(review): the second log argument is the whole exchange, not the completedBy value
+                LOG.error("Invalid value of {} property: {}", 
Exchange.AGGREGATED_COMPLETED_BY, exchange);
+                break;
+        }
+    }
+
     /**
      * Restores the timeout map with timeout values from the aggregation 
repository.
      * <p/>

Reply via email to