This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch uc in repository https://gitbox.apache.org/repos/asf/camel.git
commit 39d1b6a1db09fbfa03758508641d3678f44588a2 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Jun 22 15:05:29 2024 +0200 CAMEL-20879: camel-core: remote vs local endpoints counters --- .../camel/impl/console/ContextDevConsole.java | 9 ++--- .../management/mbean/ManagedCamelContextMBean.java | 3 ++ .../management/mbean/ManagedCamelContext.java | 18 +++++++++ .../core/commands/process/CamelContextStatus.java | 44 ++++++++++++++++++++-- .../core/commands/process/CamelRouteStatus.java | 21 ++++++++--- .../jbang/core/commands/process/CamelRouteTop.java | 5 ++- .../jbang/core/commands/process/ListProcess.java | 44 ++++++++++++++++++++-- 7 files changed, 125 insertions(+), 19 deletions(-) diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java index 2ed7cea893d..5981c5d7a8d 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java @@ -71,11 +71,9 @@ public class ContextDevConsole extends AbstractDevConsole { if (!thp.isEmpty()) { sb.append(String.format("\n Messages/Sec: %s", thp)); } - sb.append(String.format("\n Total: %s", mb.getExchangesTotal())); - sb.append(String.format("\n Total (remote): %s", mb.getRemoteExchangesTotal())); - sb.append(String.format("\n Failed: %s", mb.getExchangesFailed())); - sb.append(String.format("\n Failed (remote): %s", mb.getRemoteExchangesFailed())); - sb.append(String.format("\n Inflight: %s", mb.getExchangesInflight())); + sb.append(String.format("\n Total: %s/%s", mb.getRemoteExchangesTotal(), mb.getExchangesTotal())); + sb.append(String.format("\n Failed: %s/%s", mb.getRemoteExchangesFailed(), mb.getExchangesFailed())); + sb.append(String.format("\n Inflight: %s/%s", mb.getRemoteExchangesInflight(), mb.getExchangesInflight())); long idle = mb.getIdleSince(); if (idle > 0) { sb.append(String.format("\n Idle Since: %s", TimeUtils.printDuration(idle))); @@ -156,6 +154,7 @@ public class ContextDevConsole extends AbstractDevConsole { stats.put("exchangesInflight", mb.getExchangesInflight()); stats.put("remoteExchangesTotal", mb.getRemoteExchangesTotal()); stats.put("remoteExchangesFailed", mb.getRemoteExchangesFailed()); + stats.put("remoteExchangesInflight", mb.getRemoteExchangesInflight()); stats.put("reloaded", reloaded); stats.put("meanProcessingTime", mb.getMeanProcessingTime()); stats.put("maxProcessingTime", mb.getMaxProcessingTime()); diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java index bae8cde3989..a2b8485f749 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java @@ -138,6 +138,9 @@ public interface ManagedCamelContextMBean extends ManagedPerformanceCounterMBean @ManagedAttribute(description = "Failed number of exchanges processed from remote endpoints only") long getRemoteExchangesFailed(); + @ManagedAttribute(description = "Total number of exchanges inflight from remote endpoints only") + long getRemoteExchangesInflight(); + @ManagedAttribute(description = "Whether breadcrumbs is in use") boolean isUseBreadcrumb(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java index b4c14c53346..a7e838b3fe9 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java @@ -67,6 +67,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti private Statistic remoteExchangesTotal; private Statistic remoteExchangesCompleted; private Statistic remoteExchangesFailed; + private Statistic remoteExchangesInflight; public ManagedCamelContext(CamelContext context) { this.context = context; @@ -81,6 +82,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti this.remoteExchangesTotal = new StatisticCounter(); this.remoteExchangesCompleted = new StatisticCounter(); this.remoteExchangesFailed = new StatisticCounter(); + this.remoteExchangesInflight = new StatisticCounter(); boolean enabled = context.getManagementStrategy().getManagementAgent() != null && context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off; setStatisticsEnabled(enabled); @@ -92,6 +94,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti remoteExchangesTotal.reset(); remoteExchangesCompleted.reset(); remoteExchangesFailed.reset(); + remoteExchangesInflight.reset(); } @Override @@ -108,6 +111,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { remoteExchangesTotal.increment(); remoteExchangesCompleted.increment(); + remoteExchangesInflight.decrement(); } } } else { @@ -115,6 +119,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { remoteExchangesTotal.increment(); remoteExchangesCompleted.increment(); + remoteExchangesInflight.decrement(); } } } @@ -133,6 +138,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { remoteExchangesTotal.increment(); remoteExchangesFailed.increment(); + remoteExchangesInflight.decrement(); } } } else { @@ -140,6 +146,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { remoteExchangesTotal.increment(); remoteExchangesFailed.increment(); + remoteExchangesInflight.decrement(); } } } @@ -155,9 +162,15 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti int level = uow.routeStackLevel(includeRouteTemplates, includeKamelets); if (level <= 1) { super.processExchange(exchange, type); + if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { + remoteExchangesInflight.increment(); + } } } else { super.processExchange(exchange, type); + if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { + remoteExchangesInflight.increment(); + } } } @@ -375,6 +388,11 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti return remoteExchangesFailed.getValue(); } + @Override + public long getRemoteExchangesInflight() { + return remoteExchangesInflight.getValue(); + } + @Override public boolean isUseBreadcrumb() { return context.isUseBreadcrumb(); diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java index df7db610e44..c04c6fd1b28 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java @@ -90,8 +90,20 @@ public class CamelContextStatus extends ProcessWatchCommand { row.throughput = thp.toString(); } row.total = stats.get("exchangesTotal").toString(); - row.inflight = stats.get("exchangesInflight").toString(); + Object num = stats.get("remoteExchangesTotal"); + if (num != null) { + row.totalRemote = num.toString(); + } row.failed = stats.get("exchangesFailed").toString(); + num = stats.get("remoteExchangesFailed"); + if (num != null) { + row.failedRemote = num.toString(); + } + row.inflight = stats.get("exchangesInflight").toString(); + num = stats.get("remoteExchangesInflight"); + if (num != null) { + row.inflightRemote = num.toString(); + } row.reloaded = stats.get("reloaded").toString(); Object last = stats.get("lastProcessingTime"); if (last != null) { @@ -157,9 +169,9 @@ public class CamelContextStatus extends ProcessWatchCommand { new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age), new Column().header("ROUTE").with(this::getRoutes), new Column().header("MSG/S").with(this::getThroughput), - new Column().header("TOTAL").with(r -> r.total), - new Column().header("FAIL").with(r -> r.failed), - new Column().header("INFLIGHT").with(r -> r.inflight), + new Column().header("TOTAL").with(this::getTotal), + new Column().header("FAIL").with(this::getFailed), + new Column().header("INFLIGHT").with(this::getInflight), new Column().header("LAST").with(r -> r.last), new Column().header("DELTA").with(this::getDelta), new Column().header("SINCE-LAST").with(this::getSinceLast)))); @@ -208,6 +220,27 @@ public class CamelContextStatus extends ProcessWatchCommand { } } + private String getTotal(Row r) { + if (r.totalRemote != null) { + return r.totalRemote + "/" + r.total; + } + return r.total; + } + + private String getFailed(Row r) { + if (r.failedRemote != null) { + return r.failedRemote + "/" + r.failed; + } + return r.failed; + } + + private String getInflight(Row r) { + if (r.inflightRemote != null) { + return r.inflightRemote + "/" + r.inflight; + } + return r.inflight; + } + private String getPlatform(Row r) { if (r.platformVersion != null) { return r.platform + " v" + r.platformVersion; @@ -271,8 +304,11 @@ public class CamelContextStatus extends ProcessWatchCommand { long uptime; String throughput; String total; + String totalRemote; String failed; + String failedRemote; String inflight; + String inflightRemote; String last; String delta; String sinceLastStarted; diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java index 89939bedc35..186411cce04 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import com.github.freva.asciitable.AsciiTable; import com.github.freva.asciitable.Column; @@ -73,6 +74,7 @@ public class CamelRouteStatus extends ProcessWatchCommand { public Integer doProcessWatchCall() throws Exception { List<Row> rows = new ArrayList<>(); + AtomicBoolean remoteVisible = new AtomicBoolean(); List<Long> pids = findPids(name); ProcessHandle.allProcesses() .filter(ph -> pids.contains(ph.pid())) @@ -94,6 +96,10 @@ public class CamelRouteStatus extends ProcessWatchCommand { row.pid = Long.toString(ph.pid()); row.routeId = o.getString("routeId"); row.from = o.getString("from"); + row.remote = o.getBooleanOrDefault("remote", false); + if (row.remote) { + remoteVisible.set(true); + } row.source = o.getString("source"); row.state = o.getString("state"); row.age = o.getString("uptime"); @@ -172,13 +178,13 @@ public class CamelRouteStatus extends ProcessWatchCommand { rows.sort(this::sortRow); if (!rows.isEmpty()) { - printTable(rows); + printTable(rows, remoteVisible.get()); } return 0; } - protected void printTable(List<Row> rows) { + protected void printTable(List<Row> rows, boolean remoteVisible) { printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList( new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid), new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT) @@ -188,9 +194,9 @@ public class CamelRouteStatus extends ProcessWatchCommand { new Column().header("FROM").visible(!wideUri).dataAlign(HorizontalAlign.LEFT) .maxWidth(45, OverflowBehaviour.ELLIPSIS_RIGHT) .with(this::getFrom), - new Column().header("FROM").visible(wideUri).dataAlign(HorizontalAlign.LEFT) - .maxWidth(45, OverflowBehaviour.NEWLINE) - .with(this::getFrom), + new Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER) + .dataAlign(HorizontalAlign.CENTER) + .with(this::getRemote), new Column().header("STATUS").headerAlign(HorizontalAlign.CENTER) .with(r -> r.state), new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age), @@ -260,6 +266,10 @@ public class CamelRouteStatus extends ProcessWatchCommand { return s; } + protected String getRemote(Row r) { + return r.remote ? "x" : ""; + } + protected String getId(Row r) { if (source && r.source != null) { return sourceLocLine(r.source); @@ -286,6 +296,7 @@ public class CamelRouteStatus extends ProcessWatchCommand { long uptime; String routeId; String from; + boolean remote; String source; String state; String age; diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java index 31cfc6a3a56..75d65f92a70 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java @@ -35,7 +35,7 @@ public class CamelRouteTop extends CamelRouteStatus { } @Override - protected void printTable(List<Row> rows) { + protected void printTable(List<Row> rows, boolean remoteVisible) { printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList( new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid), new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT) @@ -44,6 +44,9 @@ public class CamelRouteTop extends CamelRouteStatus { .with(this::getId), new Column().header("FROM").dataAlign(HorizontalAlign.LEFT).maxWidth(40, OverflowBehaviour.ELLIPSIS_RIGHT) .with(this::getFrom), + new Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER) + .dataAlign(HorizontalAlign.CENTER) + .with(this::getRemote), new Column().header("STATUS").headerAlign(HorizontalAlign.CENTER) .with(r -> r.state), new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age), diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java index dd9c2213b90..a3dec68a651 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java @@ -82,8 +82,20 @@ public class ListProcess extends ProcessWatchCommand { Map<String, ?> stats = context.getMap("statistics"); if (stats != null) { row.total = stats.get("exchangesTotal").toString(); - row.inflight = stats.get("exchangesInflight").toString(); + Object num = stats.get("remoteExchangesTotal"); + if (num != null) { + row.totalRemote = num.toString(); + } row.failed = stats.get("exchangesFailed").toString(); + num = stats.get("remoteExchangesFailed"); + if (num != null) { + row.failedRemote = num.toString(); + } + row.inflight = stats.get("exchangesInflight").toString(); + num = stats.get("remoteExchangesInflight"); + if (num != null) { + row.inflightRemote = num.toString(); + } } rows.add(row); } @@ -105,15 +117,36 @@ public class ListProcess extends ProcessWatchCommand { new Column().header("STATUS").headerAlign(HorizontalAlign.CENTER) .with(r -> extractState(r.state)), new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.ago), - new Column().header("TOTAL").with(r -> r.total), - new Column().header("FAIL").with(r -> r.failed), - new Column().header("INFLIGHT").with(r -> r.inflight)))); + new Column().header("TOTAL").with(this::getTotal), + new Column().header("FAIL").with(this::getFailed), + new Column().header("INFLIGHT").with(this::getInflight)))); } } return 0; } + private String getTotal(Row r) { + if (r.totalRemote != null) { + return r.totalRemote + "/" + r.total; + } + return r.total; + } + + private String getFailed(Row r) { + if (r.failedRemote != null) { + return r.failedRemote + "/" + r.failed; + } + return r.failed; + } + + private String getInflight(Row r) { + if (r.inflightRemote != null) { + return r.inflightRemote + "/" + r.inflight; + } + return r.inflight; + } + protected int sortRow(Row o1, Row o2) { String s = sort; int negate = 1; @@ -141,8 +174,11 @@ public class ListProcess extends ProcessWatchCommand { String ago; long uptime; String total; + String totalRemote; String failed; + String failedRemote; String inflight; + String inflightRemote; } }