This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4fdd73f592d496789afb95b767703dca741ef4dd
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Aug 6 11:21:16 2019 +0200

    CAMEL-10533: AggregateController - Add forceDiscardOfGroup method
---
 .../camel/processor/aggregate/AggregateProcessor.java | 13 ++++++++++++-
 .../processor/aggregate/AggregateProcessorStatistics.java | 5 +++++
 .../management/mbean/ManagedAggregateProcessorMBean.java | 3 +++
 .../camel/management/mbean/ManagedAggregateProcessor.java | 5 +++++
 4 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index edcd71c..ad0f96d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -133,6 +133,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
     private final AtomicLong completedByPredicate = new AtomicLong();
     private final AtomicLong completedByBatchConsumer = new AtomicLong();
     private final AtomicLong completedByForce = new AtomicLong();
+    private final AtomicLong discarded = new AtomicLong();

     // keep booking about redelivery
     private class RedeliveryData {
@@ -189,6 +190,11 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         }

         @Override
+        public long getDiscarded() {
+            return discarded.get();
+        }
+
+        @Override
         public void reset() {
             totalIn.set(0);
             totalCompleted.set(0);
@@ -198,6 +204,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             completedByPredicate.set(0);
             completedByBatchConsumer.set(0);
             completedByForce.set(0);
+            discarded.set(0);
         }

         @Override
@@ -733,6 +740,8 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat

         Exchange answer;
         if (fromTimeout && isDiscardOnCompletionTimeout()) {
+            // this exchange is discarded
+            discarded.incrementAndGet();
             // discard due timeout
             log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
             // must confirm the discarded exchange
@@ -742,7 +751,9 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
             // the completion was from timeout and we should just discard it
             answer = null;
         } else if (aggregateFailed && isDiscardOnAggregationFailure()) {
-            // discard due aggregation failed
+            // this exchange is discarded
+            discarded.incrementAndGet();
+            // discard due aggregation failed (or by force)
             log.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
             // must confirm the discarded exchange
             aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
index 9cb3d50..c082613 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessorStatistics.java
@@ -67,6 +67,11 @@ public interface AggregateProcessorStatistics {
     long getCompletedByForce();

     /**
+     * Total number of exchanged discarded
+     */
+    long getDiscarded();
+
+    /**
      * Reset the counters
      */
     void reset();
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index fabf1d3..1db37b2 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -138,6 +138,9 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "Total number of exchanged completed by completion force trigger")
     long getCompletedByForce();

+    @ManagedAttribute(description = "Total number of exchanged discarded")
+    long getDiscarded();
+
     @ManagedOperation(description = " Reset the statistics counters")
     void resetStatistics();

diff --git a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index e509319..468c00e 100644
--- a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -298,6 +298,11 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag
     }

     @Override
+    public long getDiscarded() {
+        return processor.getStatistics().getDiscarded();
+    }
+
+    @Override
     public void resetStatistics() {
         processor.getStatistics().reset();
     }