Issue CAMEL-8640. Encapsulated BacklogTracer queue. Implementation of queue 
changed to list based instead of array based. Ensuring free space in queue 
responsibility moved into BacklogTracer instead from BacklogTracer's user.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2c19b34f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2c19b34f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2c19b34f

Branch: refs/heads/camel-2.15.x
Commit: 2c19b34f7a439d8f6119f3c500a784d63307d3c0
Parents: 2f2746e
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun May 3 11:32:49 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun May 3 11:41:48 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/impl/DefaultCamelContext.java  |  2 +-
 .../camel/processor/CamelInternalProcessor.java | 21 +++----------
 .../processor/interceptor/BacklogTracer.java    | 32 ++++++++++++--------
 .../processor/interceptor/DefaultChannel.java   |  2 +-
 4 files changed, 26 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2c19b34f/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 99127f2..0b60f6a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -3461,7 +3461,7 @@ public class DefaultCamelContext extends ServiceSupport 
implements ModelCamelCon
 
     public InterceptStrategy getDefaultBacklogTracer() {
         if (defaultBacklogTracer == null) {
-            defaultBacklogTracer = new BacklogTracer(this);
+            defaultBacklogTracer = BacklogTracer.createTracer(this);
         }
         return defaultBacklogTracer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/2c19b34f/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 97f557b..f8c71f4 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -19,7 +19,6 @@ package org.apache.camel.processor;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
@@ -521,15 +520,13 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
      */
     public static final class BacklogTracerAdvice implements 
CamelInternalProcessorAdvice {
 
-        private final Queue<DefaultBacklogTracerEventMessage> queue;
         private final BacklogTracer backlogTracer;
         private final ProcessorDefinition<?> processorDefinition;
         private final ProcessorDefinition<?> routeDefinition;
         private final boolean first;
 
-        public BacklogTracerAdvice(Queue<DefaultBacklogTracerEventMessage> 
queue, BacklogTracer backlogTracer,
-                                   ProcessorDefinition<?> processorDefinition, 
ProcessorDefinition<?> routeDefinition, boolean first) {
-            this.queue = queue;
+        public BacklogTracerAdvice(BacklogTracer backlogTracer, 
ProcessorDefinition<?> processorDefinition,
+                                   ProcessorDefinition<?> routeDefinition, 
boolean first) {
             this.backlogTracer = backlogTracer;
             this.processorDefinition = processorDefinition;
             this.routeDefinition = routeDefinition;
@@ -539,16 +536,6 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
         @Override
         public Object before(Exchange exchange) throws Exception {
             if (backlogTracer.shouldTrace(processorDefinition, exchange)) {
-                // ensure there is space on the queue
-                int drain = queue.size() - backlogTracer.getBacklogSize();
-                // and we need room for ourselves and possible also a first 
pseudo message as well
-                drain += first ? 2 : 1;
-                if (drain > 0) {
-                    for (int i = 0; i < drain; i++) {
-                        queue.poll();
-                    }
-                }
-
                 Date timestamp = new Date();
                 String toNode = processorDefinition.getId();
                 String exchangeId = exchange.getExchangeId();
@@ -560,10 +547,10 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
                 if (first) {
                     Date created = 
exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class);
                     DefaultBacklogTracerEventMessage pseudo = new 
DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), 
created, routeId, null, exchangeId, messageAsXml);
-                    queue.add(pseudo);
+                    backlogTracer.traceEvent(pseudo);
                 }
                 DefaultBacklogTracerEventMessage event = new 
DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), 
timestamp, routeId, toNode, exchangeId, messageAsXml);
-                queue.add(event);
+                backlogTracer.traceEvent(event);
             }
 
             return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/2c19b34f/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
index 6dde932..e2c51bd 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
@@ -19,7 +19,7 @@ package org.apache.camel.processor.interceptor;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
@@ -52,7 +52,7 @@ public class BacklogTracer extends ServiceSupport implements 
InterceptStrategy {
     private boolean enabled;
     private final AtomicLong traceCounter = new AtomicLong(0);
     // use a queue with a upper limit to avoid storing too many messages
-    private final Queue<DefaultBacklogTracerEventMessage> queue = new 
ArrayBlockingQueue<DefaultBacklogTracerEventMessage>(MAX_BACKLOG_SIZE);
+    private final Queue<BacklogTracerEventMessage> queue = new 
LinkedBlockingQueue<>(MAX_BACKLOG_SIZE);
     // how many of the last messages to keep in the backlog at total
     private int backlogSize = 1000;
     private boolean removeOnDump = true;
@@ -65,14 +65,10 @@ public class BacklogTracer extends ServiceSupport 
implements InterceptStrategy {
     private String traceFilter;
     private Predicate predicate;
 
-    public BacklogTracer(CamelContext camelContext) {
+    private BacklogTracer(CamelContext camelContext) {
         this.camelContext = camelContext;
     }
 
-    public Queue<DefaultBacklogTracerEventMessage> getQueue() {
-        return queue;
-    }
-
     @Override
     @Deprecated
     public Processor wrapProcessorInInterceptors(CamelContext context, 
ProcessorDefinition<?> definition, Processor target, Processor nextTarget) 
throws Exception {
@@ -86,8 +82,7 @@ public class BacklogTracer extends ServiceSupport implements 
InterceptStrategy {
      * @return a new backlog tracer
      */
     public static BacklogTracer createTracer(CamelContext context) {
-        BacklogTracer tracer = new BacklogTracer(context);
-        return tracer;
+        return new BacklogTracer(context);
     }
 
     /**
@@ -153,6 +148,19 @@ public class BacklogTracer extends ServiceSupport 
implements InterceptStrategy {
         return false;
     }
 
+    public void traceEvent(DefaultBacklogTracerEventMessage event) {
+        if (!enabled) {
+            return;
+        }
+
+        // ensure there is space on the queue and we need room for ourselves 
and possible also a first pseudo message as well
+        if (queue.size() >= backlogSize) {
+            queue.poll();
+        }
+
+        queue.add(event);
+    }
+
     private boolean shouldTraceFilter(Exchange exchange) {
         return predicate.matches(exchange);
     }
@@ -251,9 +259,9 @@ public class BacklogTracer extends ServiceSupport 
implements InterceptStrategy {
     }
 
     public List<BacklogTracerEventMessage> dumpTracedMessages(String nodeId) {
-        List<BacklogTracerEventMessage> answer = new 
ArrayList<BacklogTracerEventMessage>();
+        List<BacklogTracerEventMessage> answer = new ArrayList<>();
         if (nodeId != null) {
-            for (DefaultBacklogTracerEventMessage message : queue) {
+            for (BacklogTracerEventMessage message : queue) {
                 if (nodeId.equals(message.getToNode()) || 
nodeId.equals(message.getRouteId())) {
                     answer.add(message);
                 }
@@ -280,7 +288,7 @@ public class BacklogTracer extends ServiceSupport 
implements InterceptStrategy {
     }
 
     public List<BacklogTracerEventMessage> dumpAllTracedMessages() {
-        List<BacklogTracerEventMessage> answer = new 
ArrayList<BacklogTracerEventMessage>();
+        List<BacklogTracerEventMessage> answer = new ArrayList<>();
         answer.addAll(queue);
         if (isRemoveOnDump()) {
             queue.clear();

http://git-wip-us.apache.org/repos/asf/camel/blob/2c19b34f/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index c9ae2f3..dc719e3 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -229,7 +229,7 @@ public class DefaultChannel extends CamelInternalProcessor 
implements ModelChann
                 first = route.getOutputs().get(0) == definition;
             }
 
-            addAdvice(new BacklogTracerAdvice(backlogTracer.getQueue(), 
backlogTracer, targetOutputDef, route, first));
+            addAdvice(new BacklogTracerAdvice(backlogTracer, targetOutputDef, 
route, first));
 
             // add debugger as well so we have both tracing and debugging out 
of the box
             InterceptStrategy debugger = getOrCreateBacklogDebugger();

Reply via email to