Author: davsclaus
Date: Sun Nov 25 18:43:55 2012
New Revision: 1413385

URL: http://svn.apache.org/viewvc?rev=1413385&view=rev
Log:
CAMEL-5822: Added JMX operations to throughput logger.

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java - copied, changed from r1413338, camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java - copied, changed from r1413338, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java (from r1413338, camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java&r1=1413338&r2=1413385&rev=1413385&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThroughputLoggerMBean.java Sun Nov 25 18:43:55 2012 @@ -17,19 +17,23 @@ package org.apache.camel.api.management.mbean; import 
org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; -public interface ManagedThrottlerMBean extends ManagedProcessorMBean { +public interface ManagedThroughputLoggerMBean extends ManagedProcessorMBean { - @ManagedAttribute(description = "Maximum requires per period") - long getMaximumRequestsPerPeriod(); + @ManagedAttribute(description = "The received number of messages") + int getReceivedCounter(); - @ManagedAttribute(description = "Maximum requires per period") - void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod); + @ManagedAttribute(description = "The average throughput") + double getAverage(); - @ManagedAttribute(description = "Time period in millis") - long getTimePeriodMillis(); + @ManagedAttribute(description = "The throughput rate") + double getRate(); - @ManagedAttribute(description = "Time period in millis") - void setTimePeriodMillis(long timePeriodMillis); + @ManagedAttribute(description = "The last log message") + String getLastLogMessage(); + + @ManagedOperation(description = "Resets the throughput logger statistics") + void resetThroughputLogger(); } \ No newline at end of file Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java?rev=1413385&r1=1413384&r2=1413385&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java Sun Nov 25 18:43:55 2012 @@ -57,6 +57,10 @@ public class LogEndpoint extends Process setProcessor(this.logger); } + public Processor getLogger() { + return logger; + } + @Override public Producer createProducer() throws Exception { return new LogProducer(this, this.logger); Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java?rev=1413385&r1=1413384&r2=1413385&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java Sun Nov 25 18:43:55 2012 @@ -44,4 +44,8 @@ public class LogProducer extends Default } return true; } + + public Processor getLogger() { + return logger; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java?rev=1413385&r1=1413384&r2=1413385&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java Sun Nov 25 18:43:55 2012 @@ -29,6 +29,7 @@ import org.apache.camel.Producer; import org.apache.camel.Route; import org.apache.camel.Service; import org.apache.camel.component.bean.BeanProcessor; +import org.apache.camel.component.log.LogEndpoint; import org.apache.camel.impl.ScheduledPollConsumer; import org.apache.camel.management.mbean.ManagedBeanProcessor; import org.apache.camel.management.mbean.ManagedBrowsableEndpoint; @@ -49,12 +50,14 @@ import org.apache.camel.management.mbean import org.apache.camel.management.mbean.ManagedSuspendableRoute; import org.apache.camel.management.mbean.ManagedThreadPool; import org.apache.camel.management.mbean.ManagedThrottler; +import 
org.apache.camel.management.mbean.ManagedThroughputLogger; import org.apache.camel.model.ModelCamelContext; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.Delayer; import org.apache.camel.processor.ErrorHandler; import org.apache.camel.processor.SendProcessor; import org.apache.camel.processor.Throttler; +import org.apache.camel.processor.ThroughputLogger; import org.apache.camel.processor.idempotent.IdempotentConsumer; import org.apache.camel.spi.BrowsableEndpoint; import org.apache.camel.spi.EventNotifier; @@ -177,7 +180,19 @@ public class DefaultManagementObjectStra } else if (target instanceof Throttler) { answer = new ManagedThrottler(context, (Throttler) target, definition); } else if (target instanceof SendProcessor) { - answer = new ManagedSendProcessor(context, (SendProcessor) target, definition); + SendProcessor sp = (SendProcessor) target; + // special for sending to throughput logger + if (sp.getDestination() instanceof LogEndpoint) { + LogEndpoint le = (LogEndpoint) sp.getDestination(); + if (le.getLogger() instanceof ThroughputLogger) { + ThroughputLogger tl = (ThroughputLogger) le.getLogger(); + answer = new ManagedThroughputLogger(context, tl, definition); + } + } + // regular send processor + if (answer == null) { + answer = new ManagedSendProcessor(context, (SendProcessor) target, definition); + } } else if (target instanceof BeanProcessor) { answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition); } else if (target instanceof IdempotentConsumer) { Added: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java?rev=1413385&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java 
(added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java Sun Nov 25 18:43:55 2012 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedThroughputLoggerMBean; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.ThroughputLogger; + +/** + * + */ +@ManagedResource(description = "Managed ThroughputLogger") +public class ManagedThroughputLogger extends ManagedProcessor implements ManagedThroughputLoggerMBean { + + private final ThroughputLogger logger; + + public ManagedThroughputLogger(CamelContext context, ThroughputLogger logger, ProcessorDefinition<?> definition) { + super(context, logger, definition); + this.logger = logger; + } + + public ThroughputLogger getLogger() { + return logger; + } + + @Override + public synchronized void reset() { + super.reset(); + logger.reset(); + } + + public int getReceivedCounter() { + return logger.getReceivedCounter(); + } + + public double getAverage() { + return logger.getAverage(); + } 
+ + public double getRate() { + return logger.getRate(); + } + + public String getLastLogMessage() { + return logger.getLastLogMessage(); + } + + public void resetThroughputLogger() { + logger.reset(); + } +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThroughputLogger.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=1413385&r1=1413384&r2=1413385&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java Sun Nov 25 18:43:55 2012 @@ -51,6 +51,9 @@ public class ThroughputLogger extends Se private CamelContext camelContext; private ScheduledExecutorService logSchedulerService; private CamelLogger log; + private String lastLogMessage; + private double rate; + private double average; public ThroughputLogger(CamelLogger log) { this.log = log; @@ -81,7 +84,8 @@ public class ThroughputLogger extends Se //only process if groupSize is set...otherwise we're in groupInterval mode if (groupSize != null) { if (receivedCount % groupSize == 0) { - log.log(createLogMessage(exchange, receivedCount)); + lastLogMessage = createLogMessage(exchange, receivedCount); + log.log(lastLogMessage); } } } @@ -139,7 +143,33 @@ public class ThroughputLogger extends Se public void setAction(String action) { this.action = action; } - + + public void 
reset() { + startTime = 0; + receivedCounter.set(0); + groupStartTime = 0; + groupReceivedCount = 0; + average = 0.0d; + rate = 0.0d; + lastLogMessage = null; + } + + public double getRate() { + return rate; + } + + public double getAverage() { + return average; + } + + public int getReceivedCounter() { + return receivedCounter.get(); + } + + public String getLastLogMessage() { + return lastLogMessage; + } + @Override public void doStart() throws Exception { // if an interval was specified, create a background thread @@ -168,8 +198,8 @@ public class ThroughputLogger extends Se groupStartTime = startTime; } - double rate = messagesPerSecond(groupSize, groupStartTime, time); - double average = messagesPerSecond(receivedCount, startTime, time); + rate = messagesPerSecond(groupSize, groupStartTime, time); + average = messagesPerSecond(receivedCount, startTime, time); long duration = time - groupStartTime; groupStartTime = time; @@ -216,16 +246,16 @@ public class ThroughputLogger extends Se long duration = time - groupStartTime; long currentCount = receivedCount - groupReceivedCount; - double rate = messagesPerSecond(currentCount, groupStartTime, time); - double average = messagesPerSecond(receivedCount, startTime, time); + rate = messagesPerSecond(currentCount, groupStartTime, time); + average = messagesPerSecond(receivedCount, startTime, time); groupStartTime = time; groupReceivedCount = receivedCount; - String message = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration + lastLogMessage = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration + " millis which is: " + numberFormat.format(rate) + " messages per second. 
average: " + numberFormat.format(average); - log.log(message); + log.log(lastLogMessage); } protected double messagesPerSecond(long messageCount, long startTime, long endTime) { Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java (from r1413338, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java&r1=1413338&r2=1413385&rev=1413385&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLoadBalancerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedLogEndpointTest.java Sun Nov 25 18:43:55 2012 @@ -24,33 +24,60 @@ import org.apache.camel.builder.RouteBui /** * @version */ -public class ManagedLoadBalancerTest extends ManagementTestSupport { +public class ManagedLogEndpointTest extends ManagementTestSupport { - public void testLoadBalancer() throws Exception { - getMockEndpoint("mock:a").expectedBodiesReceived("Hello World", "Hi World"); - getMockEndpoint("mock:b").expectedBodiesReceived("Bye World"); - - template.sendBody("direct:start", "Hello World"); - template.sendBody("direct:start", "Bye World"); - template.sendBody("direct:start", "Hi World"); + public void testLogEndpoint() throws Exception { + getMockEndpoint("mock:a").expectedMessageCount(10); + + for (int i = 0; i < 10; i++) { + template.sendBody("direct:start", "" + i); + Thread.sleep(100); + } assertMockEndpointsSatisfied(); MBeanServer mbeanServer = getMBeanServer(); - ObjectName name = 
ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"mock://a\""); - Long queueSize = (Long) mbeanServer.invoke(name, "queueSize", null, null); - assertEquals(2, queueSize.intValue()); - - name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=endpoints,name=\"mock://b\""); - queueSize = (Long) mbeanServer.invoke(name, "queueSize", null, null); - assertEquals(1, queueSize.intValue()); - - name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"myBalancer\""); + ObjectName name = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"log-foo\""); mbeanServer.isRegistered(name); Long total = (Long) mbeanServer.getAttribute(name, "ExchangesTotal"); - assertEquals(3, total.intValue()); + assertEquals(10, total.intValue()); + + Integer received = (Integer) mbeanServer.getAttribute(name, "ReceivedCounter"); + assertEquals(10, received.intValue()); + + String last = (String) mbeanServer.getAttribute(name, "LastLogMessage"); + assertNotNull(last); + assertTrue(last.startsWith("Received: 10 messages so far.")); + + Double rate = (Double) mbeanServer.getAttribute(name, "Rate"); + assertNotNull(rate); + assertTrue(rate > 0); + + Double average = (Double) mbeanServer.getAttribute(name, "Average"); + assertNotNull(average); + assertTrue(average > 0); + + // reset + mbeanServer.invoke(name, "resetThroughputLogger", null, null); + + // total not reset + total = (Long) mbeanServer.getAttribute(name, "ExchangesTotal"); + assertEquals(10, total.intValue()); + + // but the last log message is + last = (String) mbeanServer.getAttribute(name, "LastLogMessage"); + assertNull(last); + + received = (Integer) mbeanServer.getAttribute(name, "ReceivedCounter"); + assertEquals(0, received.intValue()); + + rate = (Double) mbeanServer.getAttribute(name, "Rate"); + assertEquals(0.0d, rate); + + average = (Double) mbeanServer.getAttribute(name, "Average"); + 
assertEquals(0.0d, average); } @Override @@ -59,8 +86,8 @@ public class ManagedLoadBalancerTest ext @Override public void configure() throws Exception { from("direct:start").routeId("foo") - .loadBalance().id("myBalancer").roundRobin() - .to("mock:a").to("mock:b"); + .to("log:foo?groupSize=10").id("log-foo") + .to("mock:a"); } }; }