This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 87c8e832e9f remote vs local counter to better observe how many 
internal and external messages is being processed (#14617)
87c8e832e9f is described below

commit 87c8e832e9f73ae1f33867085f82b9303bb69939
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sat Jun 22 15:34:59 2024 +0200

    remote vs local counter to better observe how many internal and external 
messages is being processed (#14617)
    
    CAMEL-20879: camel-core: remote vs local endpoints counters
---
 .../org/apache/camel/spi/InflightRepository.java   |  5 ++
 .../impl/engine/DefaultInflightRepository.java     |  8 +++
 .../camel/impl/console/ConsumerDevConsole.java     | 10 ++--
 .../camel/impl/console/ContextDevConsole.java      | 18 ++++---
 .../camel/impl/console/EndpointDevConsole.java     |  6 ++-
 .../apache/camel/impl/console/InflightConsole.java |  6 ++-
 .../apache/camel/impl/console/RouteDevConsole.java |  2 +
 .../management/mbean/ManagedCamelContextMBean.java | 12 +++++
 .../api/management/mbean/ManagedConsumerMBean.java |  3 ++
 .../api/management/mbean/ManagedEndpointMBean.java |  2 +-
 .../api/management/mbean/ManagedProducerMBean.java |  3 ++
 .../api/management/mbean/ManagedRouteMBean.java    |  3 ++
 .../management/mbean/ManagedCamelContext.java      | 63 ++++++++++++++++++++++
 .../camel/management/mbean/ManagedConsumer.java    |  5 ++
 .../camel/management/mbean/ManagedProducer.java    |  4 ++
 .../camel/management/mbean/ManagedRoute.java       |  8 +++
 .../core/commands/process/CamelContextStatus.java  | 44 +++++++++++++--
 .../core/commands/process/CamelRouteStatus.java    | 23 ++++++--
 .../jbang/core/commands/process/CamelRouteTop.java |  5 +-
 .../jbang/core/commands/process/ListInflight.java  | 15 ++++++
 .../jbang/core/commands/process/ListProcess.java   | 44 +++++++++++++--
 21 files changed, 260 insertions(+), 29 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
index 9c837ad8a92..a4704984055 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java
@@ -58,6 +58,11 @@ public interface InflightRepository extends StaticService {
          */
         String getFromRouteId();
 
+        /**
+         * Whether the endpoint is remote where the exchange originates 
(started)
+         */
+        boolean isFromRemoteEndpoint();
+
         /**
          * The id of the route where the exchange currently is being processed
          * <p/>
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
index 53cd68e0187..c606fb47730 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
@@ -261,6 +261,14 @@ public class DefaultInflightRepository extends 
ServiceSupport implements Infligh
             return exchange.getFromRouteId();
         }
 
+        @Override
+        public boolean isFromRemoteEndpoint() {
+            if (exchange.getFromEndpoint() != null) {
+                return exchange.getFromEndpoint().isRemote();
+            }
+            return false;
+        }
+
         @Override
         public String getAtRouteId() {
             return ExchangeHelper.getAtRouteId(exchange);
diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java
index 858edf525a8..8ded7df6a0d 100644
--- 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java
@@ -63,6 +63,8 @@ public class ConsumerDevConsole extends AbstractDevConsole {
                     sb.append(String.format("\n    Uri: %s", 
mc.getEndpointUri()));
                     sb.append(String.format("\n    State: %s", mc.getState()));
                     sb.append(String.format("\n    Class: %s", 
mc.getServiceType()));
+                    sb.append(String.format("\n    Remote: %b", 
mc.isRemoteEndpoint()));
+                    sb.append(String.format("\n    Hosted: %b", 
mc.isHostedService()));
                     sb.append(String.format("\n    Inflight: %d", inflight));
                     if (mcc instanceof ManagedSchedulePollConsumerMBean mpc) {
                         sb.append(String.format("\n    Polling: %s", 
mpc.isPolling()));
@@ -74,9 +76,9 @@ public class ConsumerDevConsole extends AbstractDevConsole {
                         sb.append(String.format("\n    Greedy: %s", 
mpc.isGreedy()));
                         sb.append(String.format("\n    Running Logging Level: 
%s", mpc.getRunningLoggingLevel()));
                         sb.append(String.format("\n    Send Empty Message When 
Idle: %s", mpc.isSendEmptyMessageWhenIdle()));
-                        sb.append(String.format("\n    Counter(total: %d 
success: %d error: %d)",
+                        sb.append(String.format("\n    Counter (total: %d 
success: %d error: %d)",
                                 mpc.getCounter(), mpc.getSuccessCounter(), 
mpc.getErrorCounter()));
-                        sb.append(String.format("\n    Delay(initial: %d 
delay: %d unit: %s)",
+                        sb.append(String.format("\n    Delay (initial: %d 
delay: %d unit: %s)",
                                 mpc.getInitialDelay(), mpc.getDelay(), 
mpc.getTimeUnit()));
                         sb.append(String.format(
                                 "\n    Backoff(counter: %d multiplier: %d 
errorThreshold: %d, idleThreshold: %d )",
@@ -113,7 +115,7 @@ public class ConsumerDevConsole extends AbstractDevConsole {
                                     sb.append(String.format("\n    Repeat 
Count: %s", repeatCount));
                                 }
                                 sb.append(String.format("\n    Running Logging 
Level: %s", runLoggingLevel));
-                                sb.append(String.format("\n    Counter(total: 
%s)", counter));
+                                sb.append(String.format("\n    Counter (total: 
%s)", counter));
 
                             }
                         } catch (Exception e) {
@@ -150,6 +152,8 @@ public class ConsumerDevConsole extends AbstractDevConsole {
                     jo.put("uri", mc.getEndpointUri());
                     jo.put("state", mc.getState());
                     jo.put("class", mc.getServiceType());
+                    jo.put("remote", mc.isRemoteEndpoint());
+                    jo.put("hosted", mc.isHostedService());
                     jo.put("inflight", inflight);
                     jo.put("scheduled", false);
                     if (mcc instanceof ManagedSchedulePollConsumerMBean mpc) {
diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
index a4bbdc3f395..5981c5d7a8d 100644
--- 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java
@@ -40,12 +40,13 @@ public class ContextDevConsole extends AbstractDevConsole {
     protected String doCallText(Map<String, Object> options) {
         StringBuilder sb = new StringBuilder();
 
-        sb.append(String.format("Apache Camel %s %s (%s) uptime %s", 
getCamelContext().getVersion(),
-                getCamelContext().getStatus().name().toLowerCase(Locale.ROOT), 
getCamelContext().getName(),
-                CamelContextHelper.getUptime(getCamelContext())));
+        String profile = "";
         if (getCamelContext().getCamelContextExtension().getProfile() != null) 
{
-            sb.append(String.format("\n    Profile: %s", 
getCamelContext().getCamelContextExtension().getProfile()));
+            profile = " (profile: " + 
getCamelContext().getCamelContextExtension().getProfile() + ")";
         }
+        sb.append(String.format("Apache Camel %s %s (%s)%s uptime %s", 
getCamelContext().getVersion(),
+                getCamelContext().getStatus().name().toLowerCase(Locale.ROOT), 
getCamelContext().getName(),
+                profile, CamelContextHelper.getUptime(getCamelContext())));
         if (getCamelContext().getDescription() != null) {
             sb.append(String.format("\n    %s", 
getCamelContext().getDescription()));
         }
@@ -70,9 +71,9 @@ public class ContextDevConsole extends AbstractDevConsole {
                 if (!thp.isEmpty()) {
                     sb.append(String.format("\n    Messages/Sec: %s", thp));
                 }
-                sb.append(String.format("\n    Total: %s", 
mb.getExchangesTotal()));
-                sb.append(String.format("\n    Failed: %s", 
mb.getExchangesFailed()));
-                sb.append(String.format("\n    Inflight: %s", 
mb.getExchangesInflight()));
+                sb.append(String.format("\n    Total: %s/%s", 
mb.getRemoteExchangesTotal(), mb.getExchangesTotal()));
+                sb.append(String.format("\n    Failed: %s/%s", 
mb.getRemoteExchangesFailed(), mb.getExchangesFailed()));
+                sb.append(String.format("\n    Inflight: %s/%s", 
mb.getRemoteExchangesInflight(), mb.getExchangesInflight()));
                 long idle = mb.getIdleSince();
                 if (idle > 0) {
                     sb.append(String.format("\n    Idle Since: %s", 
TimeUtils.printDuration(idle)));
@@ -151,6 +152,9 @@ public class ContextDevConsole extends AbstractDevConsole {
                 stats.put("exchangesTotal", mb.getExchangesTotal());
                 stats.put("exchangesFailed", mb.getExchangesFailed());
                 stats.put("exchangesInflight", mb.getExchangesInflight());
+                stats.put("remoteExchangesTotal", 
mb.getRemoteExchangesTotal());
+                stats.put("remoteExchangesFailed", 
mb.getRemoteExchangesFailed());
+                stats.put("remoteExchangesInflight", 
mb.getRemoteExchangesInflight());
                 stats.put("reloaded", reloaded);
                 stats.put("meanProcessingTime", mb.getMeanProcessingTime());
                 stats.put("maxProcessingTime", mb.getMaxProcessingTime());
diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
index 9f5331f4cf6..6475ec32ff0 100644
--- 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java
@@ -54,6 +54,7 @@ public class EndpointDevConsole extends AbstractDevConsole {
         if (!col.isEmpty()) {
             for (Endpoint e : col) {
                 boolean stub = 
e.getComponent().getClass().getSimpleName().equals("StubComponent");
+                boolean remote = e.isRemote();
                 String uri = e.toString();
                 if (!uri.startsWith("stub:") && stub) {
                     // shadow-stub
@@ -62,9 +63,10 @@ public class EndpointDevConsole extends AbstractDevConsole {
                 var stat = findStats(stats, e.getEndpointUri());
                 if (stat.isPresent()) {
                     var st = stat.get();
-                    sb.append(String.format("\n    %s (direction: %s, usage: 
%s)", uri, st.getDirection(), st.getHits()));
+                    sb.append(String.format("\n    %s (remote: %s direction: 
%s, usage: %s)", uri, remote, st.getDirection(),
+                            st.getHits()));
                 } else {
-                    sb.append(String.format("\n    %s", uri));
+                    sb.append(String.format("\n    %s (remote: %s)", uri, 
remote));
                 }
             }
         }
diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
index 3ad4d5f2583..ad7f6f31b47 100644
--- 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java
@@ -57,8 +57,9 @@ public class InflightConsole extends AbstractDevConsole {
         if (repo.isInflightBrowseEnabled()) {
             for (InflightRepository.InflightExchange ie : repo.browse(filter, 
max, false)) {
                 String age = TimeUtils.printDuration(ie.getDuration(), true);
-                sb.append(String.format("\n    %s (from: %s at: %s/%s age: 
%s)",
-                        ie.getExchange().getExchangeId(), ie.getFromRouteId(), 
ie.getAtRouteId(), ie.getNodeId(), age));
+                sb.append(String.format("\n    %s (from: %s at: %s/%s remote: 
%b age: %s)",
+                        ie.getExchange().getExchangeId(), ie.getFromRouteId(), 
ie.getAtRouteId(), ie.getNodeId(),
+                        ie.isFromRemoteEndpoint(), age));
             }
         }
 
@@ -82,6 +83,7 @@ public class InflightConsole extends AbstractDevConsole {
                 JsonObject props = new JsonObject();
                 props.put("exchangeId", ie.getExchange().getExchangeId());
                 props.put("fromRouteId", ie.getFromRouteId());
+                props.put("fromRemoteEndpoint", ie.isFromRemoteEndpoint());
                 props.put("atRouteId", ie.getAtRouteId());
                 props.put("nodeId", ie.getNodeId());
                 props.put("elapsed", ie.getElapsed());
diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java
index 27ad2f51fe4..59d022b5440 100644
--- 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java
@@ -87,6 +87,7 @@ public class RouteDevConsole extends AbstractDevConsole {
                 sb.append(String.format("    Node Prefix Id: %s", 
mrb.getNodePrefixId()));
             }
             sb.append(String.format("\n    From: %s", mrb.getEndpointUri()));
+            sb.append(String.format("\n    Remote: %s", 
mrb.isRemoteEndpoint()));
             if (mrb.getSourceLocation() != null) {
                 sb.append(String.format("\n    Source: %s", 
mrb.getSourceLocation()));
             }
@@ -233,6 +234,7 @@ public class RouteDevConsole extends AbstractDevConsole {
                 jo.put("nodePrefixId", mrb.getNodePrefixId());
             }
             jo.put("from", mrb.getEndpointUri());
+            jo.put("remote", mrb.isRemoteEndpoint());
             if (mrb.getSourceLocation() != null) {
                 jo.put("source", mrb.getSourceLocation());
             }
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
index 2e94ff19b99..a2b8485f749 100644
--- 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
@@ -129,6 +129,18 @@ public interface ManagedCamelContextMBean extends 
ManagedPerformanceCounterMBean
     @ManagedAttribute(description = "Throughput message/second")
     String getThroughput();
 
+    @ManagedAttribute(description = "Total number of exchanges processed from 
remote endpoints only")
+    long getRemoteExchangesTotal();
+
+    @ManagedAttribute(description = "Completed (success) number of exchanges 
processed from remote endpoints only")
+    long getRemoteExchangesCompleted();
+
+    @ManagedAttribute(description = "Failed number of exchanges processed from 
remote endpoints only")
+    long getRemoteExchangesFailed();
+
+    @ManagedAttribute(description = "Total number of exchanges inflight from 
remote endpoints only")
+    long getRemoteExchangesInflight();
+
     @ManagedAttribute(description = "Whether breadcrumbs is in use")
     boolean isUseBreadcrumb();
 
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java
index 96404ade564..1cbb9fdace7 100644
--- 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java
@@ -29,4 +29,7 @@ public interface ManagedConsumerMBean extends 
ManagedServiceMBean {
     @ManagedAttribute(description = "Whether this consumer hosts a service 
such as acting as a HTTP server (only available for some components)")
     boolean isHostedService();
 
+    @ManagedAttribute(description = "Whether this consumer connects to remote 
or local systems")
+    boolean isRemoteEndpoint();
+
 }
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java
index dcf0a15337c..a71b4773bde 100644
--- 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java
@@ -37,7 +37,7 @@ public interface ManagedEndpointMBean {
     @ManagedAttribute(description = "Singleton")
     boolean isSingleton();
 
-    @ManagedAttribute(description = "Remote")
+    @ManagedAttribute(description = "Whether this endpoint connects to remote 
or local systems")
     boolean isRemote();
 
     @ManagedAttribute(description = "Endpoint State")
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java
index 4f40b4b00c0..11f34f096e4 100644
--- 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java
@@ -26,4 +26,7 @@ public interface ManagedProducerMBean extends 
ManagedServiceMBean {
     @ManagedAttribute(description = "Singleton")
     boolean isSingleton();
 
+    @ManagedAttribute(description = "Whether this producer connects to remote 
or local systems")
+    boolean isRemoteEndpoint();
+
 }
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
index e22e393b6a5..514230d8e83 100644
--- 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
@@ -178,4 +178,7 @@ public interface ManagedRouteMBean extends 
ManagedPerformanceCounterMBean {
     @ManagedAttribute(description = "Whether update route from XML is enabled")
     boolean isUpdateRouteEnabled();
 
+    @ManagedAttribute(description = "Whether the consumer connects to remote 
or local systems")
+    boolean isRemoteEndpoint();
+
 }
diff --git 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
index 78fb78127b5..a7e838b3fe9 100644
--- 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
+++ 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
@@ -64,6 +64,10 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
     private final String jmxDomain;
     private final boolean includeRouteTemplates;
     private final boolean includeKamelets;
+    private Statistic remoteExchangesTotal;
+    private Statistic remoteExchangesCompleted;
+    private Statistic remoteExchangesFailed;
+    private Statistic remoteExchangesInflight;
 
     public ManagedCamelContext(CamelContext context) {
         this.context = context;
@@ -75,11 +79,24 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
     @Override
     public void init(ManagementStrategy strategy) {
         super.init(strategy);
+        this.remoteExchangesTotal = new StatisticCounter();
+        this.remoteExchangesCompleted = new StatisticCounter();
+        this.remoteExchangesFailed = new StatisticCounter();
+        this.remoteExchangesInflight = new StatisticCounter();
         boolean enabled = context.getManagementStrategy().getManagementAgent() 
!= null
                 && 
context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != 
ManagementStatisticsLevel.Off;
         setStatisticsEnabled(enabled);
     }
 
+    @Override
+    public void reset() {
+        super.reset();
+        remoteExchangesTotal.reset();
+        remoteExchangesCompleted.reset();
+        remoteExchangesFailed.reset();
+        remoteExchangesInflight.reset();
+    }
+
     @Override
     public void completedExchange(Exchange exchange, long time) {
         // the camel-context mbean is triggered for every route mbean
@@ -91,9 +108,19 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
             int level = uow.routeStackLevel(includeRouteTemplates, 
includeKamelets);
             if (level <= 1) {
                 super.completedExchange(exchange, time);
+                if (exchange.getFromEndpoint() != null && 
exchange.getFromEndpoint().isRemote()) {
+                    remoteExchangesTotal.increment();
+                    remoteExchangesCompleted.increment();
+                    remoteExchangesInflight.decrement();
+                }
             }
         } else {
             super.completedExchange(exchange, time);
+            if (exchange.getFromEndpoint() != null && 
exchange.getFromEndpoint().isRemote()) {
+                remoteExchangesTotal.increment();
+                remoteExchangesCompleted.increment();
+                remoteExchangesInflight.decrement();
+            }
         }
     }
 
@@ -108,9 +135,19 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
             int level = uow.routeStackLevel(includeRouteTemplates, 
includeKamelets);
             if (level <= 1) {
                 super.failedExchange(exchange);
+                if (exchange.getFromEndpoint() != null && 
exchange.getFromEndpoint().isRemote()) {
+                    remoteExchangesTotal.increment();
+                    remoteExchangesFailed.increment();
+                    remoteExchangesInflight.decrement();
+                }
             }
         } else {
             super.failedExchange(exchange);
+            if (exchange.getFromEndpoint() != null && 
exchange.getFromEndpoint().isRemote()) {
+                remoteExchangesTotal.increment();
+                remoteExchangesFailed.increment();
+                remoteExchangesInflight.decrement();
+            }
         }
     }
 
@@ -125,9 +162,15 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
             int level = uow.routeStackLevel(includeRouteTemplates, 
includeKamelets);
             if (level <= 1) {
                 super.processExchange(exchange, type);
+                if (exchange.getFromEndpoint() != null && 
exchange.getFromEndpoint().isRemote()) {
+                    remoteExchangesInflight.increment();
+                }
             }
         } else {
             super.processExchange(exchange, type);
+            if (exchange.getFromEndpoint() != null && 
exchange.getFromEndpoint().isRemote()) {
+                remoteExchangesInflight.increment();
+            }
         }
     }
 
@@ -330,6 +373,26 @@ public class ManagedCamelContext extends 
ManagedPerformanceCounter implements Ti
         }
     }
 
+    @Override
+    public long getRemoteExchangesTotal() {
+        return remoteExchangesTotal.getValue();
+    }
+
+    @Override
+    public long getRemoteExchangesCompleted() {
+        return remoteExchangesCompleted.getValue();
+    }
+
+    @Override
+    public long getRemoteExchangesFailed() {
+        return remoteExchangesFailed.getValue();
+    }
+
+    @Override
+    public long getRemoteExchangesInflight() {
+        return remoteExchangesInflight.getValue();
+    }
+
     @Override
     public boolean isUseBreadcrumb() {
         return context.isUseBreadcrumb();
diff --git 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
index 5766b4c4e25..f4ea8da0545 100644
--- 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
+++ 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
@@ -56,4 +56,9 @@ public class ManagedConsumer extends ManagedService 
implements ManagedConsumerMB
         }
         return false;
     }
+
+    @Override
+    public boolean isRemoteEndpoint() {
+        return consumer.getEndpoint().isRemote();
+    }
 }
diff --git 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java
 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java
index 1d08d966ff7..94ce8c0d929 100644
--- 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java
+++ 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java
@@ -44,4 +44,8 @@ public class ManagedProducer extends ManagedService 
implements ManagedProducerMB
         return producer.isSingleton();
     }
 
+    @Override
+    public boolean isRemoteEndpoint() {
+        return producer.getEndpoint().isRemote();
+    }
 }
diff --git 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index 151ae524b0f..ecbed4766b1 100644
--- 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -733,6 +733,14 @@ public class ManagedRoute extends 
ManagedPerformanceCounter implements TimerList
         return enabled != null ? enabled : false;
     }
 
+    @Override
+    public boolean isRemoteEndpoint() {
+        if (route.getEndpoint() != null) {
+            return route.getEndpoint().isRemote();
+        }
+        return false;
+    }
+
     @Override
     public boolean equals(Object o) {
         return this == o || o != null && getClass() == o.getClass() && 
route.equals(((ManagedRoute) o).route);
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
index df7db610e44..c04c6fd1b28 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java
@@ -90,8 +90,20 @@ public class CamelContextStatus extends ProcessWatchCommand {
                                 row.throughput = thp.toString();
                             }
                             row.total = stats.get("exchangesTotal").toString();
-                            row.inflight = 
stats.get("exchangesInflight").toString();
+                            Object num = stats.get("remoteExchangesTotal");
+                            if (num != null) {
+                                row.totalRemote = num.toString();
+                            }
                             row.failed = 
stats.get("exchangesFailed").toString();
+                            num = stats.get("remoteExchangesFailed");
+                            if (num != null) {
+                                row.failedRemote = num.toString();
+                            }
+                            row.inflight = 
stats.get("exchangesInflight").toString();
+                            num = stats.get("remoteExchangesInflight");
+                            if (num != null) {
+                                row.inflightRemote = num.toString();
+                            }
                             row.reloaded = stats.get("reloaded").toString();
                             Object last = stats.get("lastProcessingTime");
                             if (last != null) {
@@ -157,9 +169,9 @@ public class CamelContextStatus extends ProcessWatchCommand 
{
                     new 
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
                     new Column().header("ROUTE").with(this::getRoutes),
                     new Column().header("MSG/S").with(this::getThroughput),
-                    new Column().header("TOTAL").with(r -> r.total),
-                    new Column().header("FAIL").with(r -> r.failed),
-                    new Column().header("INFLIGHT").with(r -> r.inflight),
+                    new Column().header("TOTAL").with(this::getTotal),
+                    new Column().header("FAIL").with(this::getFailed),
+                    new Column().header("INFLIGHT").with(this::getInflight),
                     new Column().header("LAST").with(r -> r.last),
                     new Column().header("DELTA").with(this::getDelta),
                     new 
Column().header("SINCE-LAST").with(this::getSinceLast))));
@@ -208,6 +220,27 @@ public class CamelContextStatus extends 
ProcessWatchCommand {
         }
     }
 
+    private String getTotal(Row r) {
+        if (r.totalRemote != null) {
+            return r.totalRemote + "/" + r.total;
+        }
+        return r.total;
+    }
+
+    private String getFailed(Row r) {
+        if (r.failedRemote != null) {
+            return r.failedRemote + "/" + r.failed;
+        }
+        return r.failed;
+    }
+
+    private String getInflight(Row r) {
+        if (r.inflightRemote != null) {
+            return r.inflightRemote + "/" + r.inflight;
+        }
+        return r.inflight;
+    }
+
     private String getPlatform(Row r) {
         if (r.platformVersion != null) {
             return r.platform + " v" + r.platformVersion;
@@ -271,8 +304,11 @@ public class CamelContextStatus extends 
ProcessWatchCommand {
         long uptime;
         String throughput;
         String total;
+        String totalRemote;
         String failed;
+        String failedRemote;
         String inflight;
+        String inflightRemote;
         String last;
         String delta;
         String sinceLastStarted;
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
index 89939bedc35..ac3eb7464f4 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.github.freva.asciitable.AsciiTable;
 import com.github.freva.asciitable.Column;
@@ -73,6 +74,7 @@ public class CamelRouteStatus extends ProcessWatchCommand {
     public Integer doProcessWatchCall() throws Exception {
         List<Row> rows = new ArrayList<>();
 
+        AtomicBoolean remoteVisible = new AtomicBoolean();
         List<Long> pids = findPids(name);
         ProcessHandle.allProcesses()
                 .filter(ph -> pids.contains(ph.pid()))
@@ -94,6 +96,12 @@ public class CamelRouteStatus extends ProcessWatchCommand {
                             row.pid = Long.toString(ph.pid());
                             row.routeId = o.getString("routeId");
                             row.from = o.getString("from");
+                            Boolean bool = o.getBoolean("remote");
+                            if (bool != null) {
+                                // older camel versions does not include this 
information
+                                remoteVisible.set(true);
+                                row.remote = bool;
+                            }
                             row.source = o.getString("source");
                             row.state = o.getString("state");
                             row.age = o.getString("uptime");
@@ -172,13 +180,13 @@ public class CamelRouteStatus extends ProcessWatchCommand 
{
         rows.sort(this::sortRow);
 
         if (!rows.isEmpty()) {
-            printTable(rows);
+            printTable(rows, remoteVisible.get());
         }
 
         return 0;
     }
 
-    protected void printTable(List<Row> rows) {
+    protected void printTable(List<Row> rows, boolean remoteVisible) {
         printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, 
Arrays.asList(
                 new 
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
                 new 
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, 
OverflowBehaviour.ELLIPSIS_RIGHT)
@@ -188,9 +196,9 @@ public class CamelRouteStatus extends ProcessWatchCommand {
                 new 
Column().header("FROM").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
                         .maxWidth(45, OverflowBehaviour.ELLIPSIS_RIGHT)
                         .with(this::getFrom),
-                new 
Column().header("FROM").visible(wideUri).dataAlign(HorizontalAlign.LEFT)
-                        .maxWidth(45, OverflowBehaviour.NEWLINE)
-                        .with(this::getFrom),
+                new 
Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER)
+                        .dataAlign(HorizontalAlign.CENTER)
+                        .with(this::getRemote),
                 new 
Column().header("STATUS").headerAlign(HorizontalAlign.CENTER)
                         .with(r -> r.state),
                 new 
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
@@ -260,6 +268,10 @@ public class CamelRouteStatus extends ProcessWatchCommand {
         return s;
     }
 
+    protected String getRemote(Row r) {
+        return r.remote ? "x" : "";
+    }
+
     protected String getId(Row r) {
         if (source && r.source != null) {
             return sourceLocLine(r.source);
@@ -286,6 +298,7 @@ public class CamelRouteStatus extends ProcessWatchCommand {
         long uptime;
         String routeId;
         String from;
+        boolean remote;
         String source;
         String state;
         String age;
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
index 31cfc6a3a56..75d65f92a70 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java
@@ -35,7 +35,7 @@ public class CamelRouteTop extends CamelRouteStatus {
     }
 
     @Override
-    protected void printTable(List<Row> rows) {
+    protected void printTable(List<Row> rows, boolean remoteVisible) {
         printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, 
Arrays.asList(
                 new 
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
                 new 
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, 
OverflowBehaviour.ELLIPSIS_RIGHT)
@@ -44,6 +44,9 @@ public class CamelRouteTop extends CamelRouteStatus {
                         .with(this::getId),
                 new 
Column().header("FROM").dataAlign(HorizontalAlign.LEFT).maxWidth(40, 
OverflowBehaviour.ELLIPSIS_RIGHT)
                         .with(this::getFrom),
+                new 
Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER)
+                        .dataAlign(HorizontalAlign.CENTER)
+                        .with(this::getRemote),
                 new 
Column().header("STATUS").headerAlign(HorizontalAlign.CENTER)
                         .with(r -> r.state),
                 new 
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
index b65dd342f48..e0c39a4ccfb 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java
@@ -19,6 +19,7 @@ package org.apache.camel.dsl.jbang.core.commands.process;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.github.freva.asciitable.AsciiTable;
 import com.github.freva.asciitable.Column;
@@ -52,6 +53,7 @@ public class ListInflight extends ProcessWatchCommand {
     public Integer doProcessWatchCall() throws Exception {
         List<Row> rows = new ArrayList<>();
 
+        AtomicBoolean remoteVisible = new AtomicBoolean();
         List<Long> pids = findPids(name);
         ProcessHandle.allProcesses()
                 .filter(ph -> pids.contains(ph.pid()))
@@ -81,6 +83,12 @@ public class ListInflight extends ProcessWatchCommand {
                                     jo = (JsonObject) arr.get(i);
                                     row.exchangeId = 
jo.getString("exchangeId");
                                     row.fromRouteId = 
jo.getString("fromRouteId");
+                                    Boolean bool = 
jo.getBoolean("fromRemoteEndpoint");
+                                    if (bool != null) {
+                                        // older camel versions does not 
include this information
+                                        remoteVisible.set(true);
+                                        row.fromRemoteEndpoint = bool;
+                                    }
                                     row.atRouteId = jo.getString("atRouteId");
                                     row.nodeId = jo.getString("nodeId");
                                     row.elapsed = jo.getLong("elapsed");
@@ -101,6 +109,8 @@ public class ListInflight extends ProcessWatchCommand {
                     new 
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, 
OverflowBehaviour.ELLIPSIS_RIGHT)
                             .with(r -> r.name),
                     new 
Column().header("EXCHANGE-ID").dataAlign(HorizontalAlign.LEFT).with(r -> 
r.exchangeId),
+                    new 
Column().header("REMOTE").visible(remoteVisible.get()).dataAlign(HorizontalAlign.CENTER)
+                            .with(this::getRemote),
                     new 
Column().header("ROUTE").dataAlign(HorizontalAlign.LEFT).maxWidth(25, 
OverflowBehaviour.ELLIPSIS_RIGHT)
                             .with(r -> r.atRouteId),
                     new 
Column().header("ID").dataAlign(HorizontalAlign.LEFT).maxWidth(25, 
OverflowBehaviour.ELLIPSIS_RIGHT)
@@ -139,6 +149,10 @@ public class ListInflight extends ProcessWatchCommand {
         return TimeUtils.printDuration(r.elapsed);
     }
 
+    private String getRemote(Row r) {
+        return r.fromRemoteEndpoint ? "x" : "";
+    }
+
     private static class Row implements Cloneable {
         String pid;
         String name;
@@ -146,6 +160,7 @@ public class ListInflight extends ProcessWatchCommand {
         long uptime;
         String exchangeId;
         String fromRouteId;
+        boolean fromRemoteEndpoint;
         String atRouteId;
         String nodeId;
         long elapsed;
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
index dd9c2213b90..a3dec68a651 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java
@@ -82,8 +82,20 @@ public class ListProcess extends ProcessWatchCommand {
                         Map<String, ?> stats = context.getMap("statistics");
                         if (stats != null) {
                             row.total = stats.get("exchangesTotal").toString();
-                            row.inflight = 
stats.get("exchangesInflight").toString();
+                            Object num = stats.get("remoteExchangesTotal");
+                            if (num != null) {
+                                row.totalRemote = num.toString();
+                            }
                             row.failed = 
stats.get("exchangesFailed").toString();
+                            num = stats.get("remoteExchangesFailed");
+                            if (num != null) {
+                                row.failedRemote = num.toString();
+                            }
+                            row.inflight = 
stats.get("exchangesInflight").toString();
+                            num = stats.get("remoteExchangesInflight");
+                            if (num != null) {
+                                row.inflightRemote = num.toString();
+                            }
                         }
                         rows.add(row);
                     }
@@ -105,15 +117,36 @@ public class ListProcess extends ProcessWatchCommand {
                         new 
Column().header("STATUS").headerAlign(HorizontalAlign.CENTER)
                                 .with(r -> extractState(r.state)),
                         new 
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.ago),
-                        new Column().header("TOTAL").with(r -> r.total),
-                        new Column().header("FAIL").with(r -> r.failed),
-                        new Column().header("INFLIGHT").with(r -> 
r.inflight))));
+                        new Column().header("TOTAL").with(this::getTotal),
+                        new Column().header("FAIL").with(this::getFailed),
+                        new 
Column().header("INFLIGHT").with(this::getInflight))));
             }
         }
 
         return 0;
     }
 
+    private String getTotal(Row r) {
+        if (r.totalRemote != null) {
+            return r.totalRemote + "/" + r.total;
+        }
+        return r.total;
+    }
+
+    private String getFailed(Row r) {
+        if (r.failedRemote != null) {
+            return r.failedRemote + "/" + r.failed;
+        }
+        return r.failed;
+    }
+
+    private String getInflight(Row r) {
+        if (r.inflightRemote != null) {
+            return r.inflightRemote + "/" + r.inflight;
+        }
+        return r.inflight;
+    }
+
     protected int sortRow(Row o1, Row o2) {
         String s = sort;
         int negate = 1;
@@ -141,8 +174,11 @@ public class ListProcess extends ProcessWatchCommand {
         String ago;
         long uptime;
         String total;
+        String totalRemote;
         String failed;
+        String failedRemote;
         String inflight;
+        String inflightRemote;
     }
 
 }


Reply via email to