Author: davsclaus
Date: Wed Jan  5 07:49:55 2011
New Revision: 1055340

URL: http://svn.apache.org/viewvc?rev=1055340&view=rev
Log:
CAMEL-1902: Throughput logger can now also log using a scheduled interval.
Thanks to Ben ODay for the patch.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java?rev=1055340&r1=1055339&r2=1055340&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogComponent.java
 Wed Jan  5 07:49:55 2011
@@ -36,10 +36,15 @@ public class LogComponent extends Defaul
     protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
         LoggingLevel level = getLoggingLevel(parameters);
         Integer groupSize = getAndRemoveParameter(parameters, "groupSize", 
Integer.class);
+        Long groupInterval = getAndRemoveParameter(parameters, 
"groupInterval", Long.class);
 
         Logger logger;
         if (groupSize != null) {
             logger = new ThroughputLogger(remaining, level, groupSize);
+        } else if (groupInterval != null) {
+            Boolean groupActiveOnly = getAndRemoveParameter(parameters, 
"groupActiveOnly", Boolean.class, Boolean.TRUE);
+            Long groupDelay = getAndRemoveParameter(parameters, "groupDelay", 
Long.class);
+            logger = new ThroughputLogger(this.getCamelContext(), remaining, 
level, groupInterval, groupDelay, groupActiveOnly);
         } else {
             LogFormatter formatter = new LogFormatter();
             IntrospectionSupport.setProperties(formatter, parameters);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java?rev=1055340&r1=1055339&r2=1055340&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
 Wed Jan  5 07:49:55 2011
@@ -19,6 +19,7 @@ package org.apache.camel.component.log;
 import org.apache.camel.Component;
 import org.apache.camel.impl.ProcessorEndpoint;
 import org.apache.camel.processor.Logger;
+import org.apache.camel.util.ServiceHelper;
 
 /**
  * Log endpoint.
@@ -43,6 +44,16 @@ public class LogEndpoint extends Process
         return logger;
     }
 
+    @Override
+    public void start() throws Exception {
+        ServiceHelper.startService(logger);
+    }
+
+    @Override
+    public void stop() throws Exception {
+        ServiceHelper.stopService(logger);
+    }
+    
     public void setLogger(Logger logger) {
         this.logger = logger;
         // the logger is the processor

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java?rev=1055340&r1=1055339&r2=1055340&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Logger.java 
Wed Jan  5 07:49:55 2011
@@ -20,6 +20,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultExchangeFormatter;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,7 +31,7 @@ import org.apache.commons.logging.LogFac
  *
  * @version $Revision$
  */
-public class Logger implements Processor {
+public class Logger extends ServiceSupport implements Processor {
     private Log log;
     private LoggingLevel level;
     private ExchangeFormatter formatter = 
DefaultExchangeFormatter.getInstance();
@@ -306,4 +307,14 @@ public class Logger implements Processor
     public void setLogName(String logName) {
         this.log = LogFactory.getLog(logName);
     }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java?rev=1055340&r1=1055339&r2=1055340&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThroughputLogger.java
 Wed Jan  5 07:49:55 2011
@@ -17,25 +17,38 @@
 package org.apache.camel.processor;
 
 import java.text.NumberFormat;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A logger for logging message throughput.
- *  
+ *
  * @version $Revision$
  */
 public class ThroughputLogger extends Logger {
-    private int groupSize = 100;
-    private long startTime;
-    private long groupStartTime;
+    private static final Log LOG = LogFactory.getLog(ThroughputLogger.class);
+
     private final AtomicInteger receivedCounter = new AtomicInteger();
     private NumberFormat numberFormat = NumberFormat.getNumberInstance();
+    private long groupReceivedCount;
+    private boolean groupActiveOnly;
+    private Integer groupSize;
+    private long groupDelay = 1000;
+    private Long groupInterval;
+    private long startTime;
+    private long groupStartTime;
     private String action = "Received";
     private String logMessage;
+    private CamelContext camelContext;
+    private ScheduledExecutorService logSchedulerService;
 
     public ThroughputLogger() {
     }
@@ -56,11 +69,28 @@ public class ThroughputLogger extends Lo
         super(logName, level);
     }
 
-    public ThroughputLogger(String logName, LoggingLevel level, int groupSize) 
{
+    public ThroughputLogger(String logName, LoggingLevel level, Integer 
groupSize) {
         super(logName, level);
         setGroupSize(groupSize);
     }
 
+    public ThroughputLogger(CamelContext camelContext, String logName, 
LoggingLevel level,
+                            Long groupInterval, Long groupDelay, Boolean 
groupActiveOnly) {
+        super(logName, level);
+
+        //initialize the startTime (since no messages may be received before a 
timer log event)
+        if (startTime == 0) {
+            startTime = System.currentTimeMillis();
+        }
+
+        this.camelContext = camelContext;
+        setGroupInterval(groupInterval);
+        setGroupActiveOnly(groupActiveOnly);
+        if (groupDelay != null) {
+            setGroupDelay(groupDelay);
+        }
+    }
+
     public ThroughputLogger(String logName, int groupSize) {
         super(logName);
         setGroupSize(groupSize);
@@ -76,23 +106,54 @@ public class ThroughputLogger extends Lo
             startTime = System.currentTimeMillis();
         }
         int receivedCount = receivedCounter.incrementAndGet();
-        if (receivedCount % groupSize == 0) {
-            logMessage = createLogMessage(exchange, receivedCount);
-            super.process(exchange);
+
+        //only process if groupSize is set...otherwise we're in groupInterval 
mode
+        if (groupSize != null) {
+            if (receivedCount % groupSize == 0) {
+                logMessage = createLogMessage(exchange, receivedCount);
+                super.process(exchange);
+            }
         }
     }
 
-    public int getGroupSize() {
+    public Integer getGroupSize() {
         return groupSize;
     }
 
-    public void setGroupSize(int groupSize) {
-        if (groupSize == 0) {
-            throw new IllegalArgumentException("groupSize cannot be zero!");
+    public void setGroupSize(Integer groupSize) {
+        if (groupSize == null || groupSize <= 0) {
+            throw new IllegalArgumentException("groupSize must be positive, 
was: " + groupSize);
         }
         this.groupSize = groupSize;
     }
 
+    public Long getGroupInterval() {
+        return groupInterval;
+    }
+
+    public void setGroupInterval(Long groupInterval) {
+        if (groupInterval == null || groupInterval <= 0) {
+            throw new IllegalArgumentException("groupInterval must be 
positive, was: " + groupInterval);
+        }
+        this.groupInterval = groupInterval;
+    }
+
+    public long getGroupDelay() {
+        return groupDelay;
+    }
+
+    public void setGroupDelay(long groupDelay) {
+        this.groupDelay = groupDelay;
+    }
+
+    public boolean getGroupActiveOnly() {
+        return groupActiveOnly;
+    }
+
+    private void setGroupActiveOnly(boolean groupActiveOnly) {
+        this.groupActiveOnly = groupActiveOnly;
+    }
+
     public NumberFormat getNumberFormat() {
         return numberFormat;
     }
@@ -114,6 +175,27 @@ public class ThroughputLogger extends Lo
         return logMessage;
     }
 
+    @Override
+    public void start() throws Exception {
+        // if an interval was specified, create a background thread
+        if (groupInterval != null) {
+            ObjectHelper.notNull(camelContext, "CamelContext", this);
+
+            logSchedulerService = 
camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, 
"ThroughputLogger", 1);
+            Runnable scheduledLogTask = new ScheduledLogTask();
+            LOG.info("scheduling throughput log to run every " + groupInterval 
+ " millis.");
+            // must use fixed rate to have it trigger at every X interval
+            logSchedulerService.scheduleAtFixedRate(scheduledLogTask, 
groupDelay, groupInterval, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (logSchedulerService != null) {
+            
camelContext.getExecutorServiceStrategy().shutdownNow(logSchedulerService);
+        }
+    }
+
     protected String createLogMessage(Exchange exchange, int receivedCount) {
         long time = System.currentTimeMillis();
         if (groupStartTime == 0) {
@@ -131,11 +213,59 @@ public class ThroughputLogger extends Lo
                 + " messages per second. average: " + 
numberFormat.format(average);
     }
 
-    // timeOneMessage = elapsed / messageCount
-    // messagePerSend = 1000 / timeOneMessage
+    /**
+     * Background task that logs throughput stats.
+     */
+    private final class ScheduledLogTask implements Runnable {
+
+        public void run() {
+            // only run if CamelContext has been fully started
+            if (!camelContext.getStatus().isStarted()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("ThroughputLogger cannot start because 
CamelContext(" + camelContext.getName() + ") has not been started yet");
+                }
+                return;
+            }
+
+            LOG.trace("ThroughputLogger started");
+            createGroupIntervalLogMessage();
+            LOG.trace("ThroughputLogger complete");
+        }
+    }
+
+    protected void createGroupIntervalLogMessage() {
+        int receivedCount = receivedCounter.get();
+
+        // if configured, hide log messages when no new messages have been 
received
+        if (groupActiveOnly && receivedCount == groupReceivedCount) {
+            return;
+        }
+
+        long time = System.currentTimeMillis();
+        if (groupStartTime == 0) {
+            groupStartTime = startTime;
+        }
+
+        long duration = time - groupStartTime;
+        long currentCount = receivedCount - groupReceivedCount;
+        double rate = messagesPerSecond(currentCount, groupStartTime, time);
+        double average = messagesPerSecond(receivedCount, startTime, time);
+
+        groupStartTime = time;
+        groupReceivedCount = receivedCount;
+
+        String message = getAction() + ": " + receivedCount + " messages so 
far. Last group took: " + duration
+                + " millis which is: " + numberFormat.format(rate)
+                + " messages per second. average: " + 
numberFormat.format(average);
+        log(message);
+    }
+
     protected double messagesPerSecond(long messageCount, long startTime, long 
endTime) {
+        // timeOneMessage = elapsed / messageCount
+        // messagePerSend = 1000 / timeOneMessage
         double rate = messageCount * 1000.0;
         rate /= endTime - startTime;
         return rate;
     }
+
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java?rev=1055340&r1=1055339&r2=1055340&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/log/ThroughputLoggerTest.java
 Wed Jan  5 07:49:55 2011
@@ -25,7 +25,19 @@ import org.apache.camel.component.mock.M
  */
 public class ThroughputLoggerTest extends ContextTestSupport {
 
-    public void testSendMessageToLog() throws Exception {
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testSendMessageToLogUsingGroupSize() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                
from("seda:in").to("log:hello?groupSize=2").delay(100).to("mock:result");
+            }
+        });
+        context.start();
+        
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(4);
 
@@ -37,13 +49,21 @@ public class ThroughputLoggerTest extend
         assertMockEndpointsSatisfied();
     }
 
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                
from("seda:in").to("log:hello?groupSize=2").delay(100).to("mock:result");
+    public void testSendMessageToLogUsingGroupInterval() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                
from("seda:in").to("log:hello?groupInterval=200&groupDelay=100&groupActiveOnly=false").delay(50).to("mock:result");
             }
-        };
+        });
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(20);
+
+        for (int i = 0; i < 20; i++) {
+            template.sendBody("seda:in", "Hello World");
+        }
+        
+        assertMockEndpointsSatisfied();
     }
 }
\ No newline at end of file


Reply via email to