This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch uc
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 39d1b6a1db09fbfa03758508641d3678f44588a2
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sat Jun 22 15:05:29 2024 +0200

    CAMEL-20879: camel-core: remote vs local endpoints counters
---
 .../camel/impl/console/ContextDevConsole.java      |  9 ++---
 .../management/mbean/ManagedCamelContextMBean.java |  3 ++
 .../management/mbean/ManagedCamelContext.java      | 18 +++++++++
 .../core/commands/process/CamelContextStatus.java  | 44 ++++++++++++++++++++--
 .../core/commands/process/CamelRouteStatus.java    | 21 ++++++++---
 .../jbang/core/commands/process/CamelRouteTop.java |  5 ++-
 .../jbang/core/commands/process/ListProcess.java   | 44 ++++++++++++++++++++--
 7 files changed, 125 insertions(+), 19 deletions(-)

diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
index 2ed7cea893d..5981c5d7a8d 100644
--- a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
+++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
@@ -71,11 +71,9 @@ public class ContextDevConsole extends AbstractDevConsole {
                 if (!thp.isEmpty()) {
                     sb.append(String.format("\n    Messages/Sec: %s", thp));
                 }
-                sb.append(String.format("\n    Total: %s", mb.getExchangesTotal()));
-                sb.append(String.format("\n    Total (remote): %s", mb.getRemoteExchangesTotal()));
-                sb.append(String.format("\n    Failed: %s", mb.getExchangesFailed()));
-                sb.append(String.format("\n    Failed (remote): %s", mb.getRemoteExchangesFailed()));
-                sb.append(String.format("\n    Inflight: %s", mb.getExchangesInflight()));
+                sb.append(String.format("\n    Total: %s/%s", mb.getRemoteExchangesTotal(), mb.getExchangesTotal()));
+                sb.append(String.format("\n    Failed: %s/%s", mb.getRemoteExchangesFailed(), mb.getExchangesFailed()));
+                sb.append(String.format("\n    Inflight: %s/%s", mb.getRemoteExchangesInflight(), mb.getExchangesInflight()));
                 long idle = mb.getIdleSince();
                 if (idle > 0) {
                     sb.append(String.format("\n    Idle Since: %s", TimeUtils.printDuration(idle)));
@@ -156,6 +154,7 @@ public class ContextDevConsole extends AbstractDevConsole {
                 stats.put("exchangesInflight", mb.getExchangesInflight());
                 stats.put("remoteExchangesTotal", mb.getRemoteExchangesTotal());
                 stats.put("remoteExchangesFailed", mb.getRemoteExchangesFailed());
+                stats.put("remoteExchangesInflight", mb.getRemoteExchangesInflight());
                 stats.put("reloaded", reloaded);
                 stats.put("meanProcessingTime", mb.getMeanProcessingTime());
                 stats.put("maxProcessingTime", mb.getMaxProcessingTime());
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
index bae8cde3989..a2b8485f749 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
@@ -138,6 +138,9 @@ public interface ManagedCamelContextMBean extends ManagedPerformanceCounterMBean
     @ManagedAttribute(description = "Failed number of exchanges processed from remote endpoints only")
     long getRemoteExchangesFailed();
 
+    @ManagedAttribute(description = "Total number of exchanges inflight from remote endpoints only")
+    long getRemoteExchangesInflight();
+
     @ManagedAttribute(description = "Whether breadcrumbs is in use")
     boolean isUseBreadcrumb();
 
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
index b4c14c53346..a7e838b3fe9 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
@@ -67,6 +67,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
     private Statistic remoteExchangesTotal;
     private Statistic remoteExchangesCompleted;
     private Statistic remoteExchangesFailed;
+    private Statistic remoteExchangesInflight;
 
     public ManagedCamelContext(CamelContext context) {
         this.context = context;
@@ -81,6 +82,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
         this.remoteExchangesTotal = new StatisticCounter();
         this.remoteExchangesCompleted = new StatisticCounter();
         this.remoteExchangesFailed = new StatisticCounter();
+        this.remoteExchangesInflight = new StatisticCounter();
         boolean enabled = context.getManagementStrategy().getManagementAgent() != null
                 && context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off;
         setStatisticsEnabled(enabled);
@@ -92,6 +94,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
         remoteExchangesTotal.reset();
         remoteExchangesCompleted.reset();
         remoteExchangesFailed.reset();
+        remoteExchangesInflight.reset();
     }
 
     @Override
@@ -108,6 +111,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
                 if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
                     remoteExchangesTotal.increment();
                     remoteExchangesCompleted.increment();
+                    remoteExchangesInflight.decrement();
                 }
             }
         } else {
@@ -115,6 +119,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
             if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
                 remoteExchangesTotal.increment();
                 remoteExchangesCompleted.increment();
+                remoteExchangesInflight.decrement();
             }
         }
     }
@@ -133,6 +138,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
                 if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
                     remoteExchangesTotal.increment();
                     remoteExchangesFailed.increment();
+                    remoteExchangesInflight.decrement();
                 }
             }
         } else {
@@ -140,6 +146,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
             if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
                 remoteExchangesTotal.increment();
                 remoteExchangesFailed.increment();
+                remoteExchangesInflight.decrement();
             }
         }
     }
@@ -155,9 +162,15 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
             int level = uow.routeStackLevel(includeRouteTemplates, includeKamelets);
             if (level <= 1) {
                 super.processExchange(exchange, type);
+                if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
+                    remoteExchangesInflight.increment();
+                }
             }
         } else {
             super.processExchange(exchange, type);
+            if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) {
+                remoteExchangesInflight.increment();
+            }
         }
     }
 
@@ -375,6 +388,11 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
         return remoteExchangesFailed.getValue();
     }
 
+    @Override
+    public long getRemoteExchangesInflight() {
+        return remoteExchangesInflight.getValue();
+    }
+
     @Override
     public boolean isUseBreadcrumb() {
         return context.isUseBreadcrumb();
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
index df7db610e44..c04c6fd1b28 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
@@ -90,8 +90,20 @@ public class CamelContextStatus extends ProcessWatchCommand {
                                 row.throughput = thp.toString();
                             }
                             row.total = stats.get("exchangesTotal").toString();
-                            row.inflight = stats.get("exchangesInflight").toString();
+                            Object num = stats.get("remoteExchangesTotal");
+                            if (num != null) {
+                                row.totalRemote = num.toString();
+                            }
                             row.failed = stats.get("exchangesFailed").toString();
+                            num = stats.get("remoteExchangesFailed");
+                            if (num != null) {
+                                row.failedRemote = num.toString();
+                            }
+                            row.inflight = stats.get("exchangesInflight").toString();
+                            num = stats.get("remoteExchangesInflight");
+                            if (num != null) {
+                                row.inflightRemote = num.toString();
+                            }
                             row.reloaded = stats.get("reloaded").toString();
                             Object last = stats.get("lastProcessingTime");
                             if (last != null) {
@@ -157,9 +169,9 @@ public class CamelContextStatus extends ProcessWatchCommand {
                     new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
                     new Column().header("ROUTE").with(this::getRoutes),
                     new Column().header("MSG/S").with(this::getThroughput),
-                    new Column().header("TOTAL").with(r -> r.total),
-                    new Column().header("FAIL").with(r -> r.failed),
-                    new Column().header("INFLIGHT").with(r -> r.inflight),
+                    new Column().header("TOTAL").with(this::getTotal),
+                    new Column().header("FAIL").with(this::getFailed),
+                    new Column().header("INFLIGHT").with(this::getInflight),
                     new Column().header("LAST").with(r -> r.last),
                     new Column().header("DELTA").with(this::getDelta),
                     new Column().header("SINCE-LAST").with(this::getSinceLast))));
@@ -208,6 +220,27 @@ public class CamelContextStatus extends ProcessWatchCommand {
         }
     }
 
+    private String getTotal(Row r) {
+        if (r.totalRemote != null) {
+            return r.totalRemote + "/" + r.total;
+        }
+        return r.total;
+    }
+
+    private String getFailed(Row r) {
+        if (r.failedRemote != null) {
+            return r.failedRemote + "/" + r.failed;
+        }
+        return r.failed;
+    }
+
+    private String getInflight(Row r) {
+        if (r.inflightRemote != null) {
+            return r.inflightRemote + "/" + r.inflight;
+        }
+        return r.inflight;
+    }
+
     private String getPlatform(Row r) {
         if (r.platformVersion != null) {
             return r.platform + " v" + r.platformVersion;
@@ -271,8 +304,11 @@ public class CamelContextStatus extends ProcessWatchCommand {
         long uptime;
         String throughput;
         String total;
+        String totalRemote;
         String failed;
+        String failedRemote;
         String inflight;
+        String inflightRemote;
         String last;
         String delta;
         String sinceLastStarted;
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
index 89939bedc35..186411cce04 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.github.freva.asciitable.AsciiTable;
 import com.github.freva.asciitable.Column;
@@ -73,6 +74,7 @@ public class CamelRouteStatus extends ProcessWatchCommand {
     public Integer doProcessWatchCall() throws Exception {
         List<Row> rows = new ArrayList<>();
 
+        AtomicBoolean remoteVisible = new AtomicBoolean();
         List<Long> pids = findPids(name);
         ProcessHandle.allProcesses()
                 .filter(ph -> pids.contains(ph.pid()))
@@ -94,6 +96,10 @@ public class CamelRouteStatus extends ProcessWatchCommand {
                             row.pid = Long.toString(ph.pid());
                             row.routeId = o.getString("routeId");
                             row.from = o.getString("from");
+                            row.remote = o.getBooleanOrDefault("remote", false);
+                            if (row.remote) {
+                                remoteVisible.set(true);
+                            }
                             row.source = o.getString("source");
                             row.state = o.getString("state");
                             row.age = o.getString("uptime");
@@ -172,13 +178,13 @@ public class CamelRouteStatus extends ProcessWatchCommand {
         rows.sort(this::sortRow);
 
         if (!rows.isEmpty()) {
-            printTable(rows);
+            printTable(rows, remoteVisible.get());
         }
 
         return 0;
     }
 
-    protected void printTable(List<Row> rows) {
+    protected void printTable(List<Row> rows, boolean remoteVisible) {
         printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList(
                 new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
                 new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT)
@@ -188,9 +194,9 @@ public class CamelRouteStatus extends ProcessWatchCommand {
                 new Column().header("FROM").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
                         .maxWidth(45, OverflowBehaviour.ELLIPSIS_RIGHT)
                         .with(this::getFrom),
-                new Column().header("FROM").visible(wideUri).dataAlign(HorizontalAlign.LEFT)
-                        .maxWidth(45, OverflowBehaviour.NEWLINE)
-                        .with(this::getFrom),
+                new Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER)
+                        .dataAlign(HorizontalAlign.CENTER)
+                        .with(this::getRemote),
                 new Column().header("STATUS").headerAlign(HorizontalAlign.CENTER)
                         .with(r -> r.state),
                 new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
@@ -260,6 +266,10 @@ public class CamelRouteStatus extends ProcessWatchCommand {
         return s;
     }
 
+    protected String getRemote(Row r) {
+        return r.remote ? "x" : "";
+    }
+
     protected String getId(Row r) {
         if (source && r.source != null) {
             return sourceLocLine(r.source);
@@ -286,6 +296,7 @@ public class CamelRouteStatus extends ProcessWatchCommand {
         long uptime;
         String routeId;
         String from;
+        boolean remote;
         String source;
         String state;
         String age;
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
index 31cfc6a3a56..75d65f92a70 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
@@ -35,7 +35,7 @@ public class CamelRouteTop extends CamelRouteStatus {
     }
 
     @Override
-    protected void printTable(List<Row> rows) {
+    protected void printTable(List<Row> rows, boolean remoteVisible) {
         printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList(
                 new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
                 new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT)
@@ -44,6 +44,9 @@ public class CamelRouteTop extends CamelRouteStatus {
                         .with(this::getId),
                 new Column().header("FROM").dataAlign(HorizontalAlign.LEFT).maxWidth(40, OverflowBehaviour.ELLIPSIS_RIGHT)
                         .with(this::getFrom),
+                new Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER)
+                        .dataAlign(HorizontalAlign.CENTER)
+                        .with(this::getRemote),
                 new Column().header("STATUS").headerAlign(HorizontalAlign.CENTER)
                         .with(r -> r.state),
                 new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
index dd9c2213b90..a3dec68a651 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
@@ -82,8 +82,20 @@ public class ListProcess extends ProcessWatchCommand {
                         Map<String, ?> stats = context.getMap("statistics");
                         if (stats != null) {
                             row.total = stats.get("exchangesTotal").toString();
-                            row.inflight = stats.get("exchangesInflight").toString();
+                            Object num = stats.get("remoteExchangesTotal");
+                            if (num != null) {
+                                row.totalRemote = num.toString();
+                            }
                             row.failed = stats.get("exchangesFailed").toString();
+                            num = stats.get("remoteExchangesFailed");
+                            if (num != null) {
+                                row.failedRemote = num.toString();
+                            }
+                            row.inflight = stats.get("exchangesInflight").toString();
+                            num = stats.get("remoteExchangesInflight");
+                            if (num != null) {
+                                row.inflightRemote = num.toString();
+                            }
                         }
                         rows.add(row);
                     }
@@ -105,15 +117,36 @@ public class ListProcess extends ProcessWatchCommand {
                         new Column().header("STATUS").headerAlign(HorizontalAlign.CENTER)
                                 .with(r -> extractState(r.state)),
                         new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.ago),
-                        new Column().header("TOTAL").with(r -> r.total),
-                        new Column().header("FAIL").with(r -> r.failed),
-                        new Column().header("INFLIGHT").with(r -> r.inflight))));
+                        new Column().header("TOTAL").with(this::getTotal),
+                        new Column().header("FAIL").with(this::getFailed),
+                        new Column().header("INFLIGHT").with(this::getInflight))));
             }
         }
 
         return 0;
     }
 
+    private String getTotal(Row r) {
+        if (r.totalRemote != null) {
+            return r.totalRemote + "/" + r.total;
+        }
+        return r.total;
+    }
+
+    private String getFailed(Row r) {
+        if (r.failedRemote != null) {
+            return r.failedRemote + "/" + r.failed;
+        }
+        return r.failed;
+    }
+
+    private String getInflight(Row r) {
+        if (r.inflightRemote != null) {
+            return r.inflightRemote + "/" + r.inflight;
+        }
+        return r.inflight;
+    }
+
     protected int sortRow(Row o1, Row o2) {
         String s = sort;
         int negate = 1;
@@ -141,8 +174,11 @@ public class ListProcess extends ProcessWatchCommand {
         String ago;
         long uptime;
         String total;
+        String totalRemote;
         String failed;
+        String failedRemote;
         String inflight;
+        String inflightRemote;
     }
 
 }

Reply via email to