This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 594290e3924 CAMEL-18844: Fix event console performance degration on 
high concurrent load due to internal locking. It is okay to not use concurrent 
queue and instead just a basic array with moving cursor - its okay if an event 
is overridden.
594290e3924 is described below

commit 594290e392469a10c46054fd3804c7a4a52baabd
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Dec 28 13:57:11 2022 +0100

    CAMEL-18844: Fix event console performance degration on high concurrent 
load due to internal locking. It is okay to not use concurrent queue and 
instead just a basic array with moving cursor - its okay if an event is 
overridden.
---
 .../apache/camel/impl/console/EventConsole.java    | 178 ++++++++++-----------
 1 file changed, 89 insertions(+), 89 deletions(-)

diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
index 2967f6d3ae6..00504fa8c72 100644
--- 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java
@@ -19,8 +19,6 @@ package org.apache.camel.impl.console;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 
 import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.Configurer;
@@ -34,12 +32,16 @@ import org.apache.camel.util.json.JsonObject;
 @Configurer(bootstrap = true)
 public class EventConsole extends AbstractDevConsole {
 
-    @Metadata(defaultValue = "25", description = "Maximum capacity of last 
number of events to capture")
+    @Metadata(defaultValue = "25",
+              description = "Maximum capacity of last number of events to 
capture (capacity must be between 25 and 1000)")
     private int capacity = 25;
 
-    private BlockingQueue<CamelEvent> events;
-    private BlockingQueue<CamelEvent.RouteEvent> routeEvents;
-    private BlockingQueue<CamelEvent.ExchangeEvent> exchangeEvents;
+    private CamelEvent[] events;
+    private volatile int posEvents = -1;
+    private CamelEvent.RouteEvent[] routeEvents;
+    private volatile int posRoutes = -1;
+    private CamelEvent.ExchangeEvent[] exchangeEvents;
+    private volatile int posExchanges = -1;
     private final ConsoleEventNotifier listener = new ConsoleEventNotifier();
 
     public EventConsole() {
@@ -56,10 +58,13 @@ public class EventConsole extends AbstractDevConsole {
 
     @Override
     protected void doInit() throws Exception {
-        // capacity capped queue using fair to make sure events are in correct 
order
-        this.events = new ArrayBlockingQueue<>(capacity, true);
-        this.routeEvents = new ArrayBlockingQueue<>(capacity, true);
-        this.exchangeEvents = new ArrayBlockingQueue<>(capacity, true);
+        if (capacity > 1000 || capacity < 25) {
+            throw new IllegalArgumentException("Capacity must be between 25 
and 1000");
+        }
+        // capacity capped arrays, and we do not care about concurrency to 
avoid locking or slowdowns
+        this.events = new CamelEvent[capacity];
+        this.routeEvents = new CamelEvent.RouteEvent[capacity];
+        this.exchangeEvents = new CamelEvent.ExchangeEvent[capacity];
     }
 
     @Override
@@ -75,39 +80,16 @@ public class EventConsole extends AbstractDevConsole {
     protected String doCallText(Map<String, Object> options) {
         StringBuilder sb = new StringBuilder();
 
-        if (events != null && !events.isEmpty()) {
-            sb.append(String.format("Last %s Camel Events:", events.size()));
-            for (CamelEvent event : events) {
-                if (event.getTimestamp() > 0) {
-                    sb.append(String.format("\n    %s (age: %s)", event, 
TimeUtils.printSince(event.getTimestamp())));
-                } else {
-                    sb.append(String.format("\n    %s", event));
-                }
-            }
+        if (posEvents != -1) {
+            sb.append(appendTextEvents(events, "Camel", posEvents, capacity));
             sb.append("\n");
         }
-        if (routeEvents != null && !routeEvents.isEmpty()) {
-            sb.append("\n");
-            sb.append(String.format("Last %s Route Events:", 
routeEvents.size()));
-            for (CamelEvent.RouteEvent event : routeEvents) {
-                if (event.getTimestamp() > 0) {
-                    sb.append(String.format("\n    %s (age: %s)", event, 
TimeUtils.printSince(event.getTimestamp())));
-                } else {
-                    sb.append(String.format("\n    %s", event));
-                }
-            }
+        if (posRoutes != -1) {
+            sb.append(appendTextEvents(routeEvents, "Route", posRoutes, 
capacity));
             sb.append("\n");
         }
-        if (exchangeEvents != null && !exchangeEvents.isEmpty()) {
-            sb.append("\n");
-            sb.append(String.format("Last %s Exchange Events:", 
exchangeEvents.size()));
-            for (CamelEvent.ExchangeEvent event : exchangeEvents) {
-                if (event.getTimestamp() > 0) {
-                    sb.append(String.format("\n    %s (age: %s)", event, 
TimeUtils.printSince(event.getTimestamp())));
-                } else {
-                    sb.append(String.format("\n    %s", event));
-                }
-            }
+        if (posExchanges != -1) {
+            sb.append(appendTextEvents(exchangeEvents, "Exchange", 
posExchanges, capacity));
             sb.append("\n");
         }
 
@@ -117,82 +99,100 @@ public class EventConsole extends AbstractDevConsole {
     protected JsonObject doCallJson(Map<String, Object> options) {
         JsonObject root = new JsonObject();
 
-        if (events != null && !events.isEmpty()) {
-            List<JsonObject> arr = new ArrayList<>();
-            for (CamelEvent event : events) {
-                JsonObject jo = new JsonObject();
-                jo.put("type", event.getType().toString());
-                if (event.getTimestamp() > 0) {
-                    jo.put("timestamp", event.getTimestamp());
-                }
-                jo.put("message", event.toString());
-                arr.add(jo);
+        if (posEvents != -1) {
+            List<JsonObject> arr = appendJSonEvents(events, posEvents, 
capacity);
+            if (!arr.isEmpty()) {
+                root.put("events", arr);
             }
-            root.put("events", arr);
         }
-        if (routeEvents != null && !routeEvents.isEmpty()) {
-            List<JsonObject> arr = new ArrayList<>();
-            for (CamelEvent event : routeEvents) {
-                JsonObject jo = new JsonObject();
-                jo.put("type", event.getType().toString());
+        if (posRoutes != -1) {
+            List<JsonObject> arr = appendJSonEvents(routeEvents, posRoutes, 
capacity);
+            if (!arr.isEmpty()) {
+                root.put("routeEvents", arr);
+            }
+        }
+        if (posExchanges != -1) {
+            List<JsonObject> arr = appendJSonEvents(exchangeEvents, 
posExchanges, capacity);
+            if (!arr.isEmpty()) {
+                root.put("exchangeEvents", arr);
+            }
+        }
+
+        return root;
+    }
+
+    private static String appendTextEvents(CamelEvent[] events, String kind, 
int cursor, int capacity) {
+        StringBuilder sb = new StringBuilder();
+        int pos = 0;
+        int added = 0;
+        // cursor is at last event, so move to back
+        cursor = ++cursor % capacity;
+        CamelEvent event = events[cursor];
+        while (pos < capacity) {
+            if (event != null) {
+                added++;
                 if (event.getTimestamp() > 0) {
-                    jo.put("timestamp", event.getTimestamp());
+                    sb.append(String.format("    %s (age: %s)\n", event, 
TimeUtils.printSince(event.getTimestamp())));
+                } else {
+                    sb.append(String.format("    %s\n", event));
                 }
-                jo.put("message", event.toString());
-                arr.add(jo);
             }
-            root.put("routeEvents", arr);
+            // move to next
+            pos++;
+            cursor = ++cursor % capacity;
+            event = events[cursor];
         }
-        if (exchangeEvents != null && !exchangeEvents.isEmpty()) {
-            List<JsonObject> arr = new ArrayList<>();
-            for (CamelEvent.ExchangeEvent event : exchangeEvents) {
+        if (added > 0) {
+            sb.insert(0, String.format("Last %s %s Events:\n", added, kind));
+        }
+        return sb.toString();
+    }
+
+    private static List<JsonObject> appendJSonEvents(CamelEvent[] events, int 
cursor, int capacity) {
+        List<JsonObject> arr = new ArrayList<>();
+        int pos = 0;
+        // cursor is at last event, so move to back
+        cursor = ++cursor % capacity;
+        CamelEvent event = events[cursor];
+        while (pos < capacity) {
+            if (event != null) {
                 JsonObject jo = new JsonObject();
                 jo.put("type", event.getType().toString());
                 if (event.getTimestamp() > 0) {
                     jo.put("timestamp", event.getTimestamp());
                 }
-                jo.put("exchangeId", event.getExchange().getExchangeId());
+                if (event instanceof CamelEvent.ExchangeEvent) {
+                    CamelEvent.ExchangeEvent ee = (CamelEvent.ExchangeEvent) 
event;
+                    jo.put("exchangeId", ee.getExchange().getExchangeId());
+                }
                 jo.put("message", event.toString());
                 arr.add(jo);
             }
-            root.put("exchangeEvents", arr);
+            // move to next
+            pos++;
+            cursor = ++cursor % capacity;
+            event = events[cursor];
         }
-
-        return root;
+        return arr;
     }
 
     private class ConsoleEventNotifier extends EventNotifierSupport {
 
         @Override
         public void notify(CamelEvent event) throws Exception {
-            // offer new event and if false, then remove head and try again
-
+            // for high concurrent load then exchange events may override
+            // that is okay as we do not want this console to cause slow-downs
             if (event instanceof CamelEvent.ExchangeEvent) {
                 CamelEvent.ExchangeEvent ce = (CamelEvent.ExchangeEvent) event;
-                boolean added;
-                do {
-                    added = exchangeEvents.offer(ce);
-                    if (!added) {
-                        exchangeEvents.poll();
-                    }
-                } while (!added);
+                posExchanges = ++posExchanges % capacity;
+                exchangeEvents[posExchanges] = ce;
             } else if (event instanceof CamelEvent.RouteEvent) {
                 CamelEvent.RouteEvent re = (CamelEvent.RouteEvent) event;
-                boolean added;
-                do {
-                    added = routeEvents.offer(re);
-                    if (!added) {
-                        routeEvents.poll();
-                    }
-                } while (!added);
+                posRoutes = ++posRoutes % capacity;
+                routeEvents[posRoutes] = re;
             } else {
-                boolean added;
-                do {
-                    added = events.offer(event);
-                    if (!added) {
-                        events.poll();
-                    }
-                } while (!added);
+                posEvents = ++posEvents % capacity;
+                events[posEvents] = event;
             }
         }
 

Reply via email to