Repository: camel Updated Branches: refs/heads/CAMEL-11342 b28ea66e6 -> 386d57ebc
CAMEL-11340: Optimize JMX performance statistics to not use synchronized blocks but use atomic counters/values from the JDK. This avoids thread contention when having many concurrent threads on the same routes competing to update the JMX counters. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/386d57eb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/386d57eb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/386d57eb Branch: refs/heads/CAMEL-11342 Commit: 386d57ebcbe020a4cb7220a705554e62f96323d7 Parents: b28ea66 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri May 26 17:48:51 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri May 26 17:48:51 2017 +0200 ---------------------------------------------------------------------- .../camel/management/mbean/ManagedChoice.java | 2 +- .../camel/management/mbean/ManagedCounter.java | 4 +- .../management/mbean/ManagedDynamicRouter.java | 2 +- .../camel/management/mbean/ManagedEnricher.java | 2 +- .../mbean/ManagedFailoverLoadBalancer.java | 2 +- .../camel/management/mbean/ManagedFilter.java | 2 +- .../mbean/ManagedPerformanceCounter.java | 8 +- .../management/mbean/ManagedPollEnricher.java | 2 +- .../management/mbean/ManagedRecipientList.java | 2 +- .../management/mbean/ManagedRoutingSlip.java | 2 +- .../mbean/ManagedSendDynamicProcessor.java | 2 +- .../management/mbean/ManagedSendProcessor.java | 2 +- .../mbean/ManagedThroughputLogger.java | 2 +- .../mbean/ManagedWireTapProcessor.java | 2 +- .../camel/management/mbean/Statistic.java | 101 ++++++++++--------- 15 files changed, 72 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java index e3acb65..fda397c 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedChoice.java @@ -52,7 +52,7 @@ public class ManagedChoice extends ManagedProcessor implements ManagedChoiceMBea } @Override - public synchronized void reset() { + public void reset() { processor.reset(); super.reset(); } http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCounter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCounter.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCounter.java index 84907b9..eeb3591 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCounter.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCounter.java @@ -36,7 +36,7 @@ public abstract class ManagedCounter implements ManagedCounterMBean { resetTimestamp.updateValue(new Date().getTime()); } - public synchronized void reset() { + public void reset() { exchangesTotal.reset(); resetTimestamp.updateValue(new Date().getTime()); } @@ -55,7 +55,7 @@ public abstract class ManagedCounter implements ManagedCounterMBean { return exchangesTotal.getValue(); } - public synchronized void increment() { + public void increment() { exchangesTotal.increment(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java index 21bfdc1..05cb90e 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedDynamicRouter.java @@ -64,7 +64,7 @@ public class ManagedDynamicRouter extends ManagedProcessor implements ManagedDyn } @Override - public synchronized void reset() { + public void reset() { super.reset(); if (processor.getEndpointUtilizationStatistics() != null) { processor.getEndpointUtilizationStatistics().clear(); http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEnricher.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEnricher.java index 657b4f9..b231622 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedEnricher.java @@ -58,7 +58,7 @@ public class ManagedEnricher extends ManagedProcessor implements ManagedEnricher } @Override - public synchronized void reset() { + public void reset() { super.reset(); if (processor.getEndpointUtilizationStatistics() != null) { processor.getEndpointUtilizationStatistics().clear(); http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java index 51e305d..a47240b 100644 --- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFailoverLoadBalancer.java @@ -54,7 +54,7 @@ public class ManagedFailoverLoadBalancer extends ManagedProcessor implements Man } @Override - public synchronized void reset() { + public void reset() { super.reset(); processor.reset(); } http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java index 2d253b6..4b01d79 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedFilter.java @@ -40,7 +40,7 @@ public class ManagedFilter extends ManagedProcessor implements ManagedFilterMBea } @Override - public synchronized void reset() { + public void reset() { processor.reset(); super.reset(); } http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java index f5124a8..6e020c1 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java @@ -77,7 +77,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement } @Override - public synchronized void reset() { + public void reset() { 
super.reset(); exchangesCompleted.reset(); exchangesFailed.reset(); @@ -193,11 +193,11 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement this.statisticsEnabled = statisticsEnabled; } - public synchronized void processExchange(Exchange exchange) { + public void processExchange(Exchange exchange) { exchangesInflight.increment(); } - public synchronized void completedExchange(Exchange exchange, long time) { + public void completedExchange(Exchange exchange, long time) { increment(); exchangesCompleted.increment(); exchangesInflight.decrement(); @@ -233,7 +233,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement meanProcessingTime.updateValue(mean); } - public synchronized void failedExchange(Exchange exchange) { + public void failedExchange(Exchange exchange) { increment(); exchangesFailed.increment(); exchangesInflight.decrement(); http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java index ae0b7f1..d884d4a 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPollEnricher.java @@ -58,7 +58,7 @@ public class ManagedPollEnricher extends ManagedProcessor implements ManagedPoll } @Override - public synchronized void reset() { + public void reset() { super.reset(); if (processor.getEndpointUtilizationStatistics() != null) { processor.getEndpointUtilizationStatistics().clear(); http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java index 775e318..6ecab67 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRecipientList.java @@ -59,7 +59,7 @@ public class ManagedRecipientList extends ManagedProcessor implements ManagedRec } @Override - public synchronized void reset() { + public void reset() { super.reset(); if (processor.getEndpointUtilizationStatistics() != null) { processor.getEndpointUtilizationStatistics().clear(); http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java index bbc7a13..51a2c68 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoutingSlip.java @@ -59,7 +59,7 @@ public class ManagedRoutingSlip extends ManagedProcessor implements ManagedRouti } @Override - public synchronized void reset() { + public void reset() { super.reset(); if (processor.getEndpointUtilizationStatistics() != null) { processor.getEndpointUtilizationStatistics().clear(); http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java index deecd72..73f4cf1 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendDynamicProcessor.java @@ -59,7 +59,7 @@ public class ManagedSendDynamicProcessor extends ManagedProcessor implements Man } @Override - public synchronized void reset() { + public void reset() { super.reset(); if (processor.getEndpointUtilizationStatistics() != null) { processor.getEndpointUtilizationStatistics().clear(); http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendProcessor.java index b762cea..612abfa 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedSendProcessor.java @@ -61,7 +61,7 @@ public class ManagedSendProcessor extends ManagedProcessor implements ManagedSen } @Override - public synchronized void reset() { + public void reset() { super.reset(); processor.reset(); } http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java index 5f82d0b..b9d4996 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java +++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java @@ -40,7 +40,7 @@ public class ManagedThroughputLogger extends ManagedProcessor implements Managed } @Override - public synchronized void reset() { + public void reset() { super.reset(); logger.reset(); } http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java index 55639ed..ceaa0e2 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java @@ -59,7 +59,7 @@ public class ManagedWireTapProcessor extends ManagedProcessor implements Managed } @Override - public synchronized void reset() { + public void reset() { super.reset(); if (processor.getEndpointUtilizationStatistics() != null) { processor.getEndpointUtilizationStatistics().clear(); http://git-wip-us.apache.org/repos/asf/camel/blob/386d57eb/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java b/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java index 2f00bf3..92760e9 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,6 +16,9 @@ */ package org.apache.camel.management.mbean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + /** * Default implementation of {@link Statistic} */ @@ -26,9 +29,6 @@ public class Statistic { * <ul> * <li>VALUE - A statistic with this update mode is a simple value that is a straight forward * representation of the updated value.</li> - * <li>DIFFERENCE - A statistic with this update mode is a value that represents the difference - * between the last two recorded values (or the initial value if two updates have - * not been recorded).</li> * <li>DELTA - A statistic with this update mode is a value that represents the delta * between the last two recorded values (or the initial value if two updates have * not been recorded). This value can be negative if the delta goes up or down.</li> @@ -41,13 +41,13 @@ public class Statistic { * <ul> */ public enum UpdateMode { - VALUE, DIFFERENCE, DELTA, COUNTER, MAXIMUM, MINIMUM + VALUE, DELTA, COUNTER, MAXIMUM, MINIMUM } private final UpdateMode updateMode; - private long lastValue; - private long value; - private long updateCount; + private final AtomicLong value = new AtomicLong(); + private final AtomicLong lastValue; + private final LongAdder updateCount = new LongAdder(); /** * Instantiates a new statistic. 
@@ -58,77 +58,84 @@ public class Statistic { */ public Statistic(String name, Object owner, UpdateMode updateMode) { this.updateMode = updateMode; + if (UpdateMode.DELTA == updateMode) { + this.lastValue = new AtomicLong(); + } else { + this.lastValue = null; + } } - public synchronized void updateValue(long newValue) { - switch (this.updateMode) { + public void updateValue(long newValue) { + switch (updateMode) { case COUNTER: - this.value += newValue; + value.addAndGet(newValue); break; case VALUE: - this.value = newValue; - break; - case DIFFERENCE: - this.value -= newValue; - if (this.value < 0) { - this.value = -this.value; - } + value.set(newValue); break; case DELTA: - if (updateCount > 0) { - this.lastValue = this.value; + if (updateCount.longValue() > 0) { + // remember previous value before updating it + lastValue.set(value.longValue()); } - this.value = newValue; + value.set(newValue); break; case MAXIMUM: - // initialize value at first time - if (this.updateCount == 0 || this.value < newValue) { - this.value = newValue; - } + value.updateAndGet(value -> { + if (updateCount.longValue() == 0 || value < newValue) { + return newValue; + } else { + return value; + } + }); break; case MINIMUM: - // initialize value at first time - if (this.updateCount == 0 || this.value > newValue) { - this.value = newValue; - } + value.updateAndGet(value -> { + if (updateCount.longValue() == 0 || value > newValue) { + return newValue; + } else { + return value; + } + }); break; default: } - this.updateCount++; + updateCount.add(1); } - public synchronized void increment() { + public void increment() { updateValue(1); } - public synchronized void decrement() { + public void decrement() { updateValue(-1); } - public synchronized long getValue() { + public long getValue() { if (updateMode == UpdateMode.DELTA) { - if (updateCount == 0) { - return this.value; + if (updateCount.longValue() == 0) { + return value.get(); } else { - return this.value - this.lastValue; + return 
value.get() - lastValue.get(); } - } else { - return this.value; } + return value.get(); } - public synchronized long getUpdateCount() { - return this.updateCount; + public long getUpdateCount() { + return updateCount.longValue(); } - public synchronized void reset() { - this.value = 0; - this.lastValue = 0; - this.updateCount = 0; + public void reset() { + value.set(0); + if (lastValue != null) { + lastValue.set(0); + } + updateCount.reset(); } public String toString() { - return "" + value; + return "" + value.get(); } }