Author: davsclaus Date: Wed Jan 5 07:49:55 2011 New Revision: 1055340 URL: http://svn.apache.org/viewvc?rev=1055340&view=rev Log: CAMEL-1902: Throughput logger can now also log using a scheduled interval. Thanks to Ben ODay for patch.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java?rev=1055340&r1=1055339&r2=1055340&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java Wed Jan 5 07:49:55 2011 @@ -36,10 +36,15 @@ public class LogComponent extends Defaul protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { LoggingLevel level = getLoggingLevel(parameters); Integer groupSize = getAndRemoveParameter(parameters, "groupSize", Integer.class); + Long groupInterval = getAndRemoveParameter(parameters, "groupInterval", Long.class); Logger logger; if (groupSize != null) { logger = new ThroughputLogger(remaining, level, groupSize); + } else if (groupInterval != null) { + Boolean groupActiveOnly = getAndRemoveParameter(parameters, "groupActiveOnly", Boolean.class, Boolean.TRUE); + Long groupDelay = getAndRemoveParameter(parameters, "groupDelay", Long.class); + logger = new ThroughputLogger(this.getCamelContext(), remaining, level, groupInterval, groupDelay, groupActiveOnly); } else { LogFormatter formatter = new LogFormatter(); IntrospectionSupport.setProperties(formatter, parameters); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java?rev=1055340&r1=1055339&r2=1055340&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java Wed Jan 5 07:49:55 2011 @@ -19,6 +19,7 @@ package org.apache.camel.component.log; import org.apache.camel.Component; import org.apache.camel.impl.ProcessorEndpoint; import org.apache.camel.processor.Logger; +import org.apache.camel.util.ServiceHelper; /** * Log endpoint. @@ -43,6 +44,16 @@ public class LogEndpoint extends Process return logger; } + @Override + public void start() throws Exception { + ServiceHelper.startService(logger); + } + + @Override + public void stop() throws Exception { + ServiceHelper.stopService(logger); + } + public void setLogger(Logger logger) { this.logger = logger; // the logger is the processor Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java?rev=1055340&r1=1055339&r2=1055340&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java Wed Jan 5 07:49:55 2011 @@ -20,6 +20,7 @@ import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultExchangeFormatter; +import org.apache.camel.impl.ServiceSupport; import org.apache.camel.spi.ExchangeFormatter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,7 +31,7 @@ import org.apache.commons.logging.LogFac * * @version $Revision$ */ -public class Logger implements Processor { +public class Logger extends ServiceSupport implements Processor { private Log log; private LoggingLevel level; private ExchangeFormatter formatter = DefaultExchangeFormatter.getInstance(); @@ -306,4 +307,14 @@ public class Logger implements Processor public void setLogName(String logName) { this.log = LogFactory.getLog(logName); } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + // noop + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=1055340&r1=1055339&r2=1055340&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java Wed Jan 5 07:49:55 2011 @@ -17,25 +17,38 @@ package org.apache.camel.processor; import java.text.NumberFormat; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; +import org.apache.camel.util.ObjectHelper; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * A logger for logging message throughput. - * + * * @version $Revision$ */ public class ThroughputLogger extends Logger { - private int groupSize = 100; - private long startTime; - private long groupStartTime; + private static final Log LOG = LogFactory.getLog(ThroughputLogger.class); + private final AtomicInteger receivedCounter = new AtomicInteger(); private NumberFormat numberFormat = NumberFormat.getNumberInstance(); + private long groupReceivedCount; + private boolean groupActiveOnly; + private Integer groupSize; + private long groupDelay = 1000; + private Long groupInterval; + private long startTime; + private long groupStartTime; private String action = "Received"; private String logMessage; + private CamelContext camelContext; + private ScheduledExecutorService logSchedulerService; public ThroughputLogger() { } @@ -56,11 +69,28 @@ public class ThroughputLogger extends Lo super(logName, level); } - public ThroughputLogger(String logName, LoggingLevel level, int groupSize) { + public ThroughputLogger(String logName, LoggingLevel level, Integer groupSize) { super(logName, level); setGroupSize(groupSize); } + public ThroughputLogger(CamelContext camelContext, String logName, LoggingLevel level, + Long groupInterval, Long groupDelay, Boolean groupActiveOnly) { + super(logName, level); + + //initialize the startTime (since no messages may be received before a timer log event) + if (startTime == 0) { + startTime = System.currentTimeMillis(); + } + + this.camelContext = camelContext; + setGroupInterval(groupInterval); + setGroupActiveOnly(groupActiveOnly); + if (groupDelay != null) { + setGroupDelay(groupDelay); + } + } + public ThroughputLogger(String logName, int groupSize) { super(logName); setGroupSize(groupSize); @@ -76,23 +106,54 @@ public class ThroughputLogger extends Lo startTime = System.currentTimeMillis(); } int receivedCount = receivedCounter.incrementAndGet(); - if (receivedCount % groupSize == 0) { - logMessage = createLogMessage(exchange, receivedCount); - super.process(exchange); + + //only process if groupSize is set...otherwise we're in groupInterval mode + if (groupSize != null) { + if (receivedCount % groupSize == 0) { + logMessage = createLogMessage(exchange, receivedCount); + super.process(exchange); + } } } - public int getGroupSize() { + public Integer getGroupSize() { return groupSize; } - public void setGroupSize(int groupSize) { - if (groupSize == 0) { - throw new IllegalArgumentException("groupSize cannot be zero!"); + public void setGroupSize(Integer groupSize) { + if (groupSize == null || groupSize <= 0) { + throw new IllegalArgumentException("groupSize must be positive, was: " + groupSize); } this.groupSize = groupSize; } + public Long getGroupInterval() { + return groupInterval; + } + + public void setGroupInterval(Long groupInterval) { + if (groupInterval == null || groupInterval <= 0) { + throw new IllegalArgumentException("groupInterval must be positive, was: " + groupInterval); + } + this.groupInterval = groupInterval; + } + + public long getGroupDelay() { + return groupDelay; + } + + public void setGroupDelay(long groupDelay) { + this.groupDelay = groupDelay; + } + + public boolean getGroupActiveOnly() { + return groupActiveOnly; + } + + private void setGroupActiveOnly(boolean groupActiveOnly) { + this.groupActiveOnly = groupActiveOnly; + } + public NumberFormat getNumberFormat() { return numberFormat; } @@ -114,6 +175,27 @@ public class ThroughputLogger extends Lo return logMessage; } + @Override + public void start() throws Exception { + // if an interval was specified, create a background thread + if (groupInterval != null) { + ObjectHelper.notNull(camelContext, "CamelContext", this); + + logSchedulerService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "ThroughputLogger", 1); + Runnable scheduledLogTask = new ScheduledLogTask(); + LOG.info("scheduling throughput log to run every " + groupInterval + " millis."); + // must use fixed rate to have it trigger at every X interval + logSchedulerService.scheduleAtFixedRate(scheduledLogTask, groupDelay, groupInterval, TimeUnit.MILLISECONDS); + } + } + + @Override + public void stop() throws Exception { + if (logSchedulerService != null) { + camelContext.getExecutorServiceStrategy().shutdownNow(logSchedulerService); + } + } + protected String createLogMessage(Exchange exchange, int receivedCount) { long time = System.currentTimeMillis(); if (groupStartTime == 0) { @@ -131,11 +213,59 @@ public class ThroughputLogger extends Lo + " messages per second. average: " + numberFormat.format(average); } - // timeOneMessage = elapsed / messageCount - // messagePerSend = 1000 / timeOneMessage + /** + * Background task that logs throughput stats. + */ + private final class ScheduledLogTask implements Runnable { + + public void run() { + // only run if CamelContext has been fully started + if (!camelContext.getStatus().isStarted()) { + if (LOG.isTraceEnabled()) { + LOG.trace("ThroughputLogger cannot start because CamelContext(" + camelContext.getName() + ") has not been started yet"); + } + return; + } + + LOG.trace("ThroughputLogger started"); + createGroupIntervalLogMessage(); + LOG.trace("ThroughputLogger complete"); + } + } + + protected void createGroupIntervalLogMessage() { + int receivedCount = receivedCounter.get(); + + // if configured, hide log messages when no new messages have been received + if (groupActiveOnly && receivedCount == groupReceivedCount) { + return; + } + + long time = System.currentTimeMillis(); + if (groupStartTime == 0) { + groupStartTime = startTime; + } + + long duration = time - groupStartTime; + long currentCount = receivedCount - groupReceivedCount; + double rate = messagesPerSecond(currentCount, groupStartTime, time); + double average = messagesPerSecond(receivedCount, startTime, time); + + groupStartTime = time; + groupReceivedCount = receivedCount; + + String message = getAction() + ": " + receivedCount + " messages so far. Last group took: " + duration + + " millis which is: " + numberFormat.format(rate) + + " messages per second. average: " + numberFormat.format(average); + log(message); + } + protected double messagesPerSecond(long messageCount, long startTime, long endTime) { + // timeOneMessage = elapsed / messageCount + // messagePerSend = 1000 / timeOneMessage double rate = messageCount * 1000.0; rate /= endTime - startTime; return rate; } + } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java?rev=1055340&r1=1055339&r2=1055340&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java Wed Jan 5 07:49:55 2011 @@ -25,7 +25,19 @@ import org.apache.camel.component.mock.M */ public class ThroughputLoggerTest extends ContextTestSupport { - public void testSendMessageToLog() throws Exception { + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testSendMessageToLogUsingGroupSize() throws Exception { + context.addRoutes(new RouteBuilder() { + public void configure() { + from("seda:in").to("log:hello?groupSize=2").delay(100).to("mock:result"); + } + }); + context.start(); + MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(4); @@ -37,13 +49,21 @@ public class ThroughputLoggerTest extend assertMockEndpointsSatisfied(); } - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("seda:in").to("log:hello?groupSize=2").delay(100).to("mock:result"); + public void testSendMessageToLogUsingGroupInterval() throws Exception { + context.addRoutes(new RouteBuilder() { + public void configure() { + from("seda:in").to("log:hello?groupInterval=200&groupDelay=100&groupActiveOnly=false").delay(50).to("mock:result"); } - }; + }); + context.start(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(20); + + for (int i = 0; i < 20; i++) { + template.sendBody("seda:in", "Hello World"); + } + + assertMockEndpointsSatisfied(); } } \ No newline at end of file