This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.20.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.20.x by this push: new 299b5afe009 CAMEL-18844: Fix event console performance degration on high concurrent load due to internal locking. It is okay to not use concurrent queue and instead just a basic array with moving cursor - its okay if an event is overridden. 299b5afe009 is described below commit 299b5afe00925c3bc872acf52cc0023fbe826539 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Dec 28 13:57:11 2022 +0100 CAMEL-18844: Fix event console performance degration on high concurrent load due to internal locking. It is okay to not use concurrent queue and instead just a basic array with moving cursor - its okay if an event is overridden. --- .../apache/camel/impl/console/EventConsole.java | 178 ++++++++++----------- 1 file changed, 89 insertions(+), 89 deletions(-) diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java index 2967f6d3ae6..00504fa8c72 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/EventConsole.java @@ -19,8 +19,6 @@ package org.apache.camel.impl.console; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import org.apache.camel.spi.CamelEvent; import org.apache.camel.spi.Configurer; @@ -34,12 +32,16 @@ import org.apache.camel.util.json.JsonObject; @Configurer(bootstrap = true) public class EventConsole extends AbstractDevConsole { - @Metadata(defaultValue = "25", description = "Maximum capacity of last number of events to capture") + @Metadata(defaultValue = "25", + description = "Maximum capacity of last number of events to capture (capacity must be between 25 and 1000)") private int capacity = 25; - private BlockingQueue<CamelEvent> events; - private BlockingQueue<CamelEvent.RouteEvent> routeEvents; - private BlockingQueue<CamelEvent.ExchangeEvent> exchangeEvents; + private CamelEvent[] events; + private volatile int posEvents = -1; + private CamelEvent.RouteEvent[] routeEvents; + private volatile int posRoutes = -1; + private CamelEvent.ExchangeEvent[] exchangeEvents; + private volatile int posExchanges = -1; private final ConsoleEventNotifier listener = new ConsoleEventNotifier(); public EventConsole() { @@ -56,10 +58,13 @@ public class EventConsole extends AbstractDevConsole { @Override protected void doInit() throws Exception { - // capacity capped queue using fair to make sure events are in correct order - this.events = new ArrayBlockingQueue<>(capacity, true); - this.routeEvents = new ArrayBlockingQueue<>(capacity, true); - this.exchangeEvents = new ArrayBlockingQueue<>(capacity, true); + if (capacity > 1000 || capacity < 25) { + throw new IllegalArgumentException("Capacity must be between 25 and 1000"); + } + // capacity capped arrays, and we do not care about concurrency to avoid locking or slowdowns + this.events = new CamelEvent[capacity]; + this.routeEvents = new CamelEvent.RouteEvent[capacity]; + this.exchangeEvents = new CamelEvent.ExchangeEvent[capacity]; } @Override @@ -75,39 +80,16 @@ public class EventConsole extends AbstractDevConsole { protected String doCallText(Map<String, Object> options) { StringBuilder sb = new StringBuilder(); - if (events != null && !events.isEmpty()) { - sb.append(String.format("Last %s Camel Events:", events.size())); - for (CamelEvent event : events) { - if (event.getTimestamp() > 0) { - sb.append(String.format("\n %s (age: %s)", event, TimeUtils.printSince(event.getTimestamp()))); - } else { - sb.append(String.format("\n %s", event)); - } - } + if (posEvents != -1) { + sb.append(appendTextEvents(events, "Camel", posEvents, capacity)); sb.append("\n"); } - if (routeEvents != null && !routeEvents.isEmpty()) { - sb.append("\n"); - sb.append(String.format("Last %s Route Events:", routeEvents.size())); - for (CamelEvent.RouteEvent event : routeEvents) { - if (event.getTimestamp() > 0) { - sb.append(String.format("\n %s (age: %s)", event, TimeUtils.printSince(event.getTimestamp()))); - } else { - sb.append(String.format("\n %s", event)); - } - } + if (posRoutes != -1) { + sb.append(appendTextEvents(routeEvents, "Route", posRoutes, capacity)); sb.append("\n"); } - if (exchangeEvents != null && !exchangeEvents.isEmpty()) { - sb.append("\n"); - sb.append(String.format("Last %s Exchange Events:", exchangeEvents.size())); - for (CamelEvent.ExchangeEvent event : exchangeEvents) { - if (event.getTimestamp() > 0) { - sb.append(String.format("\n %s (age: %s)", event, TimeUtils.printSince(event.getTimestamp()))); - } else { - sb.append(String.format("\n %s", event)); - } - } + if (posExchanges != -1) { + sb.append(appendTextEvents(exchangeEvents, "Exchange", posExchanges, capacity)); sb.append("\n"); } @@ -117,82 +99,100 @@ public class EventConsole extends AbstractDevConsole { protected JsonObject doCallJson(Map<String, Object> options) { JsonObject root = new JsonObject(); - if (events != null && !events.isEmpty()) { - List<JsonObject> arr = new ArrayList<>(); - for (CamelEvent event : events) { - JsonObject jo = new JsonObject(); - jo.put("type", event.getType().toString()); - if (event.getTimestamp() > 0) { - jo.put("timestamp", event.getTimestamp()); - } - jo.put("message", event.toString()); - arr.add(jo); + if (posEvents != -1) { + List<JsonObject> arr = appendJSonEvents(events, posEvents, capacity); + if (!arr.isEmpty()) { + root.put("events", arr); } - root.put("events", arr); } - if (routeEvents != null && !routeEvents.isEmpty()) { - List<JsonObject> arr = new ArrayList<>(); - for (CamelEvent event : routeEvents) { - JsonObject jo = new JsonObject(); - jo.put("type", event.getType().toString()); + if (posRoutes != -1) { + List<JsonObject> arr = appendJSonEvents(routeEvents, posRoutes, capacity); + if (!arr.isEmpty()) { + root.put("routeEvents", arr); + } + } + if (posExchanges != -1) { + List<JsonObject> arr = appendJSonEvents(exchangeEvents, posExchanges, capacity); + if (!arr.isEmpty()) { + root.put("exchangeEvents", arr); + } + } + + return root; + } + + private static String appendTextEvents(CamelEvent[] events, String kind, int cursor, int capacity) { + StringBuilder sb = new StringBuilder(); + int pos = 0; + int added = 0; + // cursor is at last event, so move to back + cursor = ++cursor % capacity; + CamelEvent event = events[cursor]; + while (pos < capacity) { + if (event != null) { + added++; if (event.getTimestamp() > 0) { - jo.put("timestamp", event.getTimestamp()); + sb.append(String.format(" %s (age: %s)\n", event, TimeUtils.printSince(event.getTimestamp()))); + } else { + sb.append(String.format(" %s\n", event)); } - jo.put("message", event.toString()); - arr.add(jo); } - root.put("routeEvents", arr); + // move to next + pos++; + cursor = ++cursor % capacity; + event = events[cursor]; } - if (exchangeEvents != null && !exchangeEvents.isEmpty()) { - List<JsonObject> arr = new ArrayList<>(); - for (CamelEvent.ExchangeEvent event : exchangeEvents) { + if (added > 0) { + sb.insert(0, String.format("Last %s %s Events:\n", added, kind)); + } + return sb.toString(); + } + + private static List<JsonObject> appendJSonEvents(CamelEvent[] events, int cursor, int capacity) { + List<JsonObject> arr = new ArrayList<>(); + int pos = 0; + // cursor is at last event, so move to back + cursor = ++cursor % capacity; + CamelEvent event = events[cursor]; + while (pos < capacity) { + if (event != null) { JsonObject jo = new JsonObject(); jo.put("type", event.getType().toString()); if (event.getTimestamp() > 0) { jo.put("timestamp", event.getTimestamp()); } - jo.put("exchangeId", event.getExchange().getExchangeId()); + if (event instanceof CamelEvent.ExchangeEvent) { + CamelEvent.ExchangeEvent ee = (CamelEvent.ExchangeEvent) event; + jo.put("exchangeId", ee.getExchange().getExchangeId()); + } jo.put("message", event.toString()); arr.add(jo); } - root.put("exchangeEvents", arr); + // move to next + pos++; + cursor = ++cursor % capacity; + event = events[cursor]; } - - return root; + return arr; } private class ConsoleEventNotifier extends EventNotifierSupport { @Override public void notify(CamelEvent event) throws Exception { - // offer new event and if false, then remove head and try again - + // for high concurrent load then exchange events may override + // that is okay as we do not want this console to cause slow-downs if (event instanceof CamelEvent.ExchangeEvent) { CamelEvent.ExchangeEvent ce = (CamelEvent.ExchangeEvent) event; - boolean added; - do { - added = exchangeEvents.offer(ce); - if (!added) { - exchangeEvents.poll(); - } - } while (!added); + posExchanges = ++posExchanges % capacity; + exchangeEvents[posExchanges] = ce; } else if (event instanceof CamelEvent.RouteEvent) { CamelEvent.RouteEvent re = (CamelEvent.RouteEvent) event; - boolean added; - do { - added = routeEvents.offer(re); - if (!added) { - routeEvents.poll(); - } - } while (!added); + posRoutes = ++posRoutes % capacity; + routeEvents[posRoutes] = re; } else { - boolean added; - do { - added = events.offer(event); - if (!added) { - events.poll(); - } - } while (!added); + posEvents = ++posEvents % capacity; + events[posEvents] = event; } }