This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 87c8e832e9f remote vs local counter to better observe how many internal and external messages is being processed (#14617) 87c8e832e9f is described below commit 87c8e832e9f73ae1f33867085f82b9303bb69939 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Jun 22 15:34:59 2024 +0200 remote vs local counter to better observe how many internal and external messages is being processed (#14617) CAMEL-20879: camel-core: remote vs local endpoints counters --- .../org/apache/camel/spi/InflightRepository.java | 5 ++ .../impl/engine/DefaultInflightRepository.java | 8 +++ .../camel/impl/console/ConsumerDevConsole.java | 10 ++-- .../camel/impl/console/ContextDevConsole.java | 18 ++++--- .../camel/impl/console/EndpointDevConsole.java | 6 ++- .../apache/camel/impl/console/InflightConsole.java | 6 ++- .../apache/camel/impl/console/RouteDevConsole.java | 2 + .../management/mbean/ManagedCamelContextMBean.java | 12 +++++ .../api/management/mbean/ManagedConsumerMBean.java | 3 ++ .../api/management/mbean/ManagedEndpointMBean.java | 2 +- .../api/management/mbean/ManagedProducerMBean.java | 3 ++ .../api/management/mbean/ManagedRouteMBean.java | 3 ++ .../management/mbean/ManagedCamelContext.java | 63 ++++++++++++++++++++++ .../camel/management/mbean/ManagedConsumer.java | 5 ++ .../camel/management/mbean/ManagedProducer.java | 4 ++ .../camel/management/mbean/ManagedRoute.java | 8 +++ .../core/commands/process/CamelContextStatus.java | 44 +++++++++++++-- .../core/commands/process/CamelRouteStatus.java | 23 ++++++-- .../jbang/core/commands/process/CamelRouteTop.java | 5 +- .../jbang/core/commands/process/ListInflight.java | 15 ++++++ .../jbang/core/commands/process/ListProcess.java | 44 +++++++++++++-- 21 files changed, 260 insertions(+), 29 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java index 9c837ad8a92..a4704984055 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InflightRepository.java @@ -58,6 +58,11 @@ public interface InflightRepository extends StaticService { */ String getFromRouteId(); + /** + * Whether the endpoint is remote where the exchange originates (started) + */ + boolean isFromRemoteEndpoint(); + /** * The id of the route where the exchange currently is being processed * <p/> diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java index 53cd68e0187..c606fb47730 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java @@ -261,6 +261,14 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh return exchange.getFromRouteId(); } + @Override + public boolean isFromRemoteEndpoint() { + if (exchange.getFromEndpoint() != null) { + return exchange.getFromEndpoint().isRemote(); + } + return false; + } + @Override public String getAtRouteId() { return ExchangeHelper.getAtRouteId(exchange); diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java index 858edf525a8..8ded7df6a0d 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java @@ -63,6 +63,8 @@ public class ConsumerDevConsole extends AbstractDevConsole { sb.append(String.format("\n Uri: %s", mc.getEndpointUri())); sb.append(String.format("\n State: %s", mc.getState())); sb.append(String.format("\n Class: %s", mc.getServiceType())); + sb.append(String.format("\n Remote: %b", mc.isRemoteEndpoint())); + sb.append(String.format("\n Hosted: %b", mc.isHostedService())); sb.append(String.format("\n Inflight: %d", inflight)); if (mcc instanceof ManagedSchedulePollConsumerMBean mpc) { sb.append(String.format("\n Polling: %s", mpc.isPolling())); @@ -74,9 +76,9 @@ public class ConsumerDevConsole extends AbstractDevConsole { sb.append(String.format("\n Greedy: %s", mpc.isGreedy())); sb.append(String.format("\n Running Logging Level: %s", mpc.getRunningLoggingLevel())); sb.append(String.format("\n Send Empty Message When Idle: %s", mpc.isSendEmptyMessageWhenIdle())); - sb.append(String.format("\n Counter(total: %d success: %d error: %d)", + sb.append(String.format("\n Counter (total: %d success: %d error: %d)", mpc.getCounter(), mpc.getSuccessCounter(), mpc.getErrorCounter())); - sb.append(String.format("\n Delay(initial: %d delay: %d unit: %s)", + sb.append(String.format("\n Delay (initial: %d delay: %d unit: %s)", mpc.getInitialDelay(), mpc.getDelay(), mpc.getTimeUnit())); sb.append(String.format( "\n Backoff(counter: %d multiplier: %d errorThreshold: %d, idleThreshold: %d )", @@ -113,7 +115,7 @@ public class ConsumerDevConsole extends AbstractDevConsole { sb.append(String.format("\n Repeat Count: %s", repeatCount)); } sb.append(String.format("\n Running Logging Level: %s", runLoggingLevel)); - sb.append(String.format("\n Counter(total: %s)", counter)); + sb.append(String.format("\n Counter (total: %s)", counter)); } } catch (Exception e) { @@ -150,6 +152,8 @@ public class ConsumerDevConsole extends AbstractDevConsole { jo.put("uri", mc.getEndpointUri()); jo.put("state", mc.getState()); jo.put("class", mc.getServiceType()); + jo.put("remote", mc.isRemoteEndpoint()); + jo.put("hosted", mc.isHostedService()); jo.put("inflight", inflight); jo.put("scheduled", false); if (mcc instanceof ManagedSchedulePollConsumerMBean mpc) { diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java index a4bbdc3f395..5981c5d7a8d 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/ContextDevConsole.java @@ -40,12 +40,13 @@ public class ContextDevConsole extends AbstractDevConsole { protected String doCallText(Map<String, Object> options) { StringBuilder sb = new StringBuilder(); - sb.append(String.format("Apache Camel %s %s (%s) uptime %s", getCamelContext().getVersion(), - getCamelContext().getStatus().name().toLowerCase(Locale.ROOT), getCamelContext().getName(), - CamelContextHelper.getUptime(getCamelContext()))); + String profile = ""; if (getCamelContext().getCamelContextExtension().getProfile() != null) { - sb.append(String.format("\n Profile: %s", getCamelContext().getCamelContextExtension().getProfile())); + profile = " (profile: " + getCamelContext().getCamelContextExtension().getProfile() + ")"; } + sb.append(String.format("Apache Camel %s %s (%s)%s uptime %s", getCamelContext().getVersion(), + getCamelContext().getStatus().name().toLowerCase(Locale.ROOT), getCamelContext().getName(), + profile, CamelContextHelper.getUptime(getCamelContext()))); if (getCamelContext().getDescription() != null) { sb.append(String.format("\n %s", getCamelContext().getDescription())); } @@ -70,9 +71,9 @@ public class ContextDevConsole extends AbstractDevConsole { if (!thp.isEmpty()) { sb.append(String.format("\n Messages/Sec: %s", thp)); } - sb.append(String.format("\n Total: %s", mb.getExchangesTotal())); - sb.append(String.format("\n Failed: %s", mb.getExchangesFailed())); - sb.append(String.format("\n Inflight: %s", mb.getExchangesInflight())); + sb.append(String.format("\n Total: %s/%s", mb.getRemoteExchangesTotal(), mb.getExchangesTotal())); + sb.append(String.format("\n Failed: %s/%s", mb.getRemoteExchangesFailed(), mb.getExchangesFailed())); + sb.append(String.format("\n Inflight: %s/%s", mb.getRemoteExchangesInflight(), mb.getExchangesInflight())); long idle = mb.getIdleSince(); if (idle > 0) { sb.append(String.format("\n Idle Since: %s", TimeUtils.printDuration(idle))); @@ -151,6 +152,9 @@ public class ContextDevConsole extends AbstractDevConsole { stats.put("exchangesTotal", mb.getExchangesTotal()); stats.put("exchangesFailed", mb.getExchangesFailed()); stats.put("exchangesInflight", mb.getExchangesInflight()); + stats.put("remoteExchangesTotal", mb.getRemoteExchangesTotal()); + stats.put("remoteExchangesFailed", mb.getRemoteExchangesFailed()); + stats.put("remoteExchangesInflight", mb.getRemoteExchangesInflight()); stats.put("reloaded", reloaded); stats.put("meanProcessingTime", mb.getMeanProcessingTime()); stats.put("maxProcessingTime", mb.getMaxProcessingTime()); diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java index 9f5331f4cf6..6475ec32ff0 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/EndpointDevConsole.java @@ -54,6 +54,7 @@ public class EndpointDevConsole extends AbstractDevConsole { if (!col.isEmpty()) { for (Endpoint e : col) { boolean stub = e.getComponent().getClass().getSimpleName().equals("StubComponent"); + boolean remote = e.isRemote(); String uri = e.toString(); if (!uri.startsWith("stub:") && stub) { // shadow-stub @@ -62,9 +63,10 @@ public class EndpointDevConsole extends AbstractDevConsole { var stat = findStats(stats, e.getEndpointUri()); if (stat.isPresent()) { var st = stat.get(); - sb.append(String.format("\n %s (direction: %s, usage: %s)", uri, st.getDirection(), st.getHits())); + sb.append(String.format("\n %s (remote: %s direction: %s, usage: %s)", uri, remote, st.getDirection(), + st.getHits())); } else { - sb.append(String.format("\n %s", uri)); + sb.append(String.format("\n %s (remote: %s)", uri, remote)); } } } diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java index 3ad4d5f2583..ad7f6f31b47 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/InflightConsole.java @@ -57,8 +57,9 @@ public class InflightConsole extends AbstractDevConsole { if (repo.isInflightBrowseEnabled()) { for (InflightRepository.InflightExchange ie : repo.browse(filter, max, false)) { String age = TimeUtils.printDuration(ie.getDuration(), true); - sb.append(String.format("\n %s (from: %s at: %s/%s age: %s)", - ie.getExchange().getExchangeId(), ie.getFromRouteId(), ie.getAtRouteId(), ie.getNodeId(), age)); + sb.append(String.format("\n %s (from: %s at: %s/%s remote: %b age: %s)", + ie.getExchange().getExchangeId(), ie.getFromRouteId(), ie.getAtRouteId(), ie.getNodeId(), + ie.isFromRemoteEndpoint(), age)); } } @@ -82,6 +83,7 @@ public class InflightConsole extends AbstractDevConsole { JsonObject props = new JsonObject(); props.put("exchangeId", ie.getExchange().getExchangeId()); props.put("fromRouteId", ie.getFromRouteId()); + props.put("fromRemoteEndpoint", ie.isFromRemoteEndpoint()); props.put("atRouteId", ie.getAtRouteId()); props.put("nodeId", ie.getNodeId()); props.put("elapsed", ie.getElapsed()); diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java index 27ad2f51fe4..59d022b5440 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteDevConsole.java @@ -87,6 +87,7 @@ public class RouteDevConsole extends AbstractDevConsole { sb.append(String.format(" Node Prefix Id: %s", mrb.getNodePrefixId())); } sb.append(String.format("\n From: %s", mrb.getEndpointUri())); + sb.append(String.format("\n Remote: %s", mrb.isRemoteEndpoint())); if (mrb.getSourceLocation() != null) { sb.append(String.format("\n Source: %s", mrb.getSourceLocation())); } @@ -233,6 +234,7 @@ public class RouteDevConsole extends AbstractDevConsole { jo.put("nodePrefixId", mrb.getNodePrefixId()); } jo.put("from", mrb.getEndpointUri()); + jo.put("remote", mrb.isRemoteEndpoint()); if (mrb.getSourceLocation() != null) { jo.put("source", mrb.getSourceLocation()); } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java index 2e94ff19b99..a2b8485f749 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java @@ -129,6 +129,18 @@ public interface ManagedCamelContextMBean extends ManagedPerformanceCounterMBean @ManagedAttribute(description = "Throughput message/second") String getThroughput(); + @ManagedAttribute(description = "Total number of exchanges processed from remote endpoints only") + long getRemoteExchangesTotal(); + + @ManagedAttribute(description = "Completed (success) number of exchanges processed from remote endpoints only") + long getRemoteExchangesCompleted(); + + @ManagedAttribute(description = "Failed number of exchanges processed from remote endpoints only") + long getRemoteExchangesFailed(); + + @ManagedAttribute(description = "Total number of exchanges inflight from remote endpoints only") + long getRemoteExchangesInflight(); + @ManagedAttribute(description = "Whether breadcrumbs is in use") boolean isUseBreadcrumb(); diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java index 96404ade564..1cbb9fdace7 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedConsumerMBean.java @@ -29,4 +29,7 @@ public interface ManagedConsumerMBean extends ManagedServiceMBean { @ManagedAttribute(description = "Whether this consumer hosts a service such as acting as a HTTP server (only available for some components)") boolean isHostedService(); + @ManagedAttribute(description = "Whether this consumer connects to remote or local systems") + boolean isRemoteEndpoint(); + } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java index dcf0a15337c..a71b4773bde 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedEndpointMBean.java @@ -37,7 +37,7 @@ public interface ManagedEndpointMBean { @ManagedAttribute(description = "Singleton") boolean isSingleton(); - @ManagedAttribute(description = "Remote") + @ManagedAttribute(description = "Whether this endpoint connects to remote or local systems") boolean isRemote(); @ManagedAttribute(description = "Endpoint State") diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java index 4f40b4b00c0..11f34f096e4 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerMBean.java @@ -26,4 +26,7 @@ public interface ManagedProducerMBean extends ManagedServiceMBean { @ManagedAttribute(description = "Singleton") boolean isSingleton(); + @ManagedAttribute(description = "Whether this producer connects to remote or local systems") + boolean isRemoteEndpoint(); + } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java index e22e393b6a5..514230d8e83 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java @@ -178,4 +178,7 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean { @ManagedAttribute(description = "Whether update route from XML is enabled") boolean isUpdateRouteEnabled(); + @ManagedAttribute(description = "Whether the consumer connects to remote or local systems") + boolean isRemoteEndpoint(); + } diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java index 78fb78127b5..a7e838b3fe9 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java @@ -64,6 +64,10 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti private final String jmxDomain; private final boolean includeRouteTemplates; private final boolean includeKamelets; + private Statistic remoteExchangesTotal; + private Statistic remoteExchangesCompleted; + private Statistic remoteExchangesFailed; + private Statistic remoteExchangesInflight; public ManagedCamelContext(CamelContext context) { this.context = context; @@ -75,11 +79,24 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti @Override public void init(ManagementStrategy strategy) { super.init(strategy); + this.remoteExchangesTotal = new StatisticCounter(); + this.remoteExchangesCompleted = new StatisticCounter(); + this.remoteExchangesFailed = new StatisticCounter(); + this.remoteExchangesInflight = new StatisticCounter(); boolean enabled = context.getManagementStrategy().getManagementAgent() != null && context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off; setStatisticsEnabled(enabled); } + @Override + public void reset() { + super.reset(); + remoteExchangesTotal.reset(); + remoteExchangesCompleted.reset(); + remoteExchangesFailed.reset(); + remoteExchangesInflight.reset(); + } + @Override public void completedExchange(Exchange exchange, long time) { // the camel-context mbean is triggered for every route mbean @@ -91,9 +108,19 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti int level = uow.routeStackLevel(includeRouteTemplates, includeKamelets); if (level <= 1) { super.completedExchange(exchange, time); + if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { + remoteExchangesTotal.increment(); + remoteExchangesCompleted.increment(); + remoteExchangesInflight.decrement(); + } } } else { super.completedExchange(exchange, time); + if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { + remoteExchangesTotal.increment(); + remoteExchangesCompleted.increment(); + remoteExchangesInflight.decrement(); + } } } @@ -108,9 +135,19 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti int level = uow.routeStackLevel(includeRouteTemplates, includeKamelets); if (level <= 1) { super.failedExchange(exchange); + if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { + remoteExchangesTotal.increment(); + remoteExchangesFailed.increment(); + remoteExchangesInflight.decrement(); + } } } else { super.failedExchange(exchange); + if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { + remoteExchangesTotal.increment(); + remoteExchangesFailed.increment(); + remoteExchangesInflight.decrement(); + } } } @@ -125,9 +162,15 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti int level = uow.routeStackLevel(includeRouteTemplates, includeKamelets); if (level <= 1) { super.processExchange(exchange, type); + if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { + remoteExchangesInflight.increment(); + } } } else { super.processExchange(exchange, type); + if (exchange.getFromEndpoint() != null && exchange.getFromEndpoint().isRemote()) { + remoteExchangesInflight.increment(); + } } } @@ -330,6 +373,26 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti } } + @Override + public long getRemoteExchangesTotal() { + return remoteExchangesTotal.getValue(); + } + + @Override + public long getRemoteExchangesCompleted() { + return remoteExchangesCompleted.getValue(); + } + + @Override + public long getRemoteExchangesFailed() { + return remoteExchangesFailed.getValue(); + } + + @Override + public long getRemoteExchangesInflight() { + return remoteExchangesInflight.getValue(); + } + @Override public boolean isUseBreadcrumb() { return context.isUseBreadcrumb(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java index 5766b4c4e25..f4ea8da0545 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java @@ -56,4 +56,9 @@ public class ManagedConsumer extends ManagedService implements ManagedConsumerMB } return false; } + + @Override + public boolean isRemoteEndpoint() { + return consumer.getEndpoint().isRemote(); + } } diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java index 1d08d966ff7..94ce8c0d929 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProducer.java @@ -44,4 +44,8 @@ public class ManagedProducer extends ManagedService implements ManagedProducerMB return producer.isSingleton(); } + @Override + public boolean isRemoteEndpoint() { + return producer.getEndpoint().isRemote(); + } } diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index 151ae524b0f..ecbed4766b1 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -733,6 +733,14 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList return enabled != null ? enabled : false; } + @Override + public boolean isRemoteEndpoint() { + if (route.getEndpoint() != null) { + return route.getEndpoint().isRemote(); + } + return false; + } + @Override public boolean equals(Object o) { return this == o || o != null && getClass() == o.getClass() && route.equals(((ManagedRoute) o).route); diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java index df7db610e44..c04c6fd1b28 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelContextStatus.java @@ -90,8 +90,20 @@ public class CamelContextStatus extends ProcessWatchCommand { row.throughput = thp.toString(); } row.total = stats.get("exchangesTotal").toString(); - row.inflight = stats.get("exchangesInflight").toString(); + Object num = stats.get("remoteExchangesTotal"); + if (num != null) { + row.totalRemote = num.toString(); + } row.failed = stats.get("exchangesFailed").toString(); + num = stats.get("remoteExchangesFailed"); + if (num != null) { + row.failedRemote = num.toString(); + } + row.inflight = stats.get("exchangesInflight").toString(); + num = stats.get("remoteExchangesInflight"); + if (num != null) { + row.inflightRemote = num.toString(); + } row.reloaded = stats.get("reloaded").toString(); Object last = stats.get("lastProcessingTime"); if (last != null) { @@ -157,9 +169,9 @@ public class CamelContextStatus extends ProcessWatchCommand { new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age), new Column().header("ROUTE").with(this::getRoutes), new Column().header("MSG/S").with(this::getThroughput), - new Column().header("TOTAL").with(r -> r.total), - new Column().header("FAIL").with(r -> r.failed), - new Column().header("INFLIGHT").with(r -> r.inflight), + new Column().header("TOTAL").with(this::getTotal), + new Column().header("FAIL").with(this::getFailed), + new Column().header("INFLIGHT").with(this::getInflight), new Column().header("LAST").with(r -> r.last), new Column().header("DELTA").with(this::getDelta), new Column().header("SINCE-LAST").with(this::getSinceLast)))); @@ -208,6 +220,27 @@ public class CamelContextStatus extends ProcessWatchCommand { } } + private String getTotal(Row r) { + if (r.totalRemote != null) { + return r.totalRemote + "/" + r.total; + } + return r.total; + } + + private String getFailed(Row r) { + if (r.failedRemote != null) { + return r.failedRemote + "/" + r.failed; + } + return r.failed; + } + + private String getInflight(Row r) { + if (r.inflightRemote != null) { + return r.inflightRemote + "/" + r.inflight; + } + return r.inflight; + } + private String getPlatform(Row r) { if (r.platformVersion != null) { return r.platform + " v" + r.platformVersion; @@ -271,8 +304,11 @@ public class CamelContextStatus extends ProcessWatchCommand { long uptime; String throughput; String total; + String totalRemote; String failed; + String failedRemote; String inflight; + String inflightRemote; String last; String delta; String sinceLastStarted; diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java index 89939bedc35..ac3eb7464f4 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteStatus.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import com.github.freva.asciitable.AsciiTable; import com.github.freva.asciitable.Column; @@ -73,6 +74,7 @@ public class CamelRouteStatus extends ProcessWatchCommand { public Integer doProcessWatchCall() throws Exception { List<Row> rows = new ArrayList<>(); + AtomicBoolean remoteVisible = new AtomicBoolean(); List<Long> pids = findPids(name); ProcessHandle.allProcesses() .filter(ph -> pids.contains(ph.pid())) @@ -94,6 +96,12 @@ public class CamelRouteStatus extends ProcessWatchCommand { row.pid = Long.toString(ph.pid()); row.routeId = o.getString("routeId"); row.from = o.getString("from"); + Boolean bool = o.getBoolean("remote"); + if (bool != null) { + // older camel versions does not include this information + remoteVisible.set(true); + row.remote = bool; + } row.source = o.getString("source"); row.state = o.getString("state"); row.age = o.getString("uptime"); @@ -172,13 +180,13 @@ public class CamelRouteStatus extends ProcessWatchCommand { rows.sort(this::sortRow); if (!rows.isEmpty()) { - printTable(rows); + printTable(rows, remoteVisible.get()); } return 0; } - protected void printTable(List<Row> rows) { + protected void printTable(List<Row> rows, boolean remoteVisible) { printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList( new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid), new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT) @@ -188,9 +196,9 @@ public class CamelRouteStatus extends ProcessWatchCommand { new Column().header("FROM").visible(!wideUri).dataAlign(HorizontalAlign.LEFT) .maxWidth(45, OverflowBehaviour.ELLIPSIS_RIGHT) .with(this::getFrom), - new Column().header("FROM").visible(wideUri).dataAlign(HorizontalAlign.LEFT) - .maxWidth(45, OverflowBehaviour.NEWLINE) - .with(this::getFrom), + new Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER) + .dataAlign(HorizontalAlign.CENTER) + .with(this::getRemote), new Column().header("STATUS").headerAlign(HorizontalAlign.CENTER) .with(r -> r.state), new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age), @@ -260,6 +268,10 @@ public class CamelRouteStatus extends ProcessWatchCommand { return s; } + protected String getRemote(Row r) { + return r.remote ? "x" : ""; + } + protected String getId(Row r) { if (source && r.source != null) { return sourceLocLine(r.source); @@ -286,6 +298,7 @@ public class CamelRouteStatus extends ProcessWatchCommand { long uptime; String routeId; String from; + boolean remote; String source; String state; String age; diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java index 31cfc6a3a56..75d65f92a70 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/CamelRouteTop.java @@ -35,7 +35,7 @@ public class CamelRouteTop extends CamelRouteStatus { } @Override - protected void printTable(List<Row> rows) { + protected void printTable(List<Row> rows, boolean remoteVisible) { printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList( new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid), new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT) @@ -44,6 +44,9 @@ public class CamelRouteTop extends CamelRouteStatus { .with(this::getId), new Column().header("FROM").dataAlign(HorizontalAlign.LEFT).maxWidth(40, OverflowBehaviour.ELLIPSIS_RIGHT) .with(this::getFrom), + new Column().header("REMOTE").visible(remoteVisible).headerAlign(HorizontalAlign.CENTER) + .dataAlign(HorizontalAlign.CENTER) + .with(this::getRemote), new Column().header("STATUS").headerAlign(HorizontalAlign.CENTER) .with(r -> r.state), new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age), diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java index b65dd342f48..e0c39a4ccfb 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInflight.java @@ -19,6 +19,7 @@ package org.apache.camel.dsl.jbang.core.commands.process; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import com.github.freva.asciitable.AsciiTable; import com.github.freva.asciitable.Column; @@ -52,6 +53,7 @@ public class ListInflight extends ProcessWatchCommand { public Integer doProcessWatchCall() throws Exception { List<Row> rows = new ArrayList<>(); + AtomicBoolean remoteVisible = new AtomicBoolean(); List<Long> pids = findPids(name); ProcessHandle.allProcesses() .filter(ph -> pids.contains(ph.pid())) @@ -81,6 +83,12 @@ public class ListInflight extends ProcessWatchCommand { jo = (JsonObject) arr.get(i); row.exchangeId = jo.getString("exchangeId"); row.fromRouteId = jo.getString("fromRouteId"); + Boolean bool = jo.getBoolean("fromRemoteEndpoint"); + if (bool != null) { + // older camel versions does not include this information + remoteVisible.set(true); + row.fromRemoteEndpoint = bool; + } row.atRouteId = jo.getString("atRouteId"); row.nodeId = jo.getString("nodeId"); row.elapsed = jo.getLong("elapsed"); @@ -101,6 +109,8 @@ public class ListInflight extends ProcessWatchCommand { new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT) .with(r -> r.name), new Column().header("EXCHANGE-ID").dataAlign(HorizontalAlign.LEFT).with(r -> r.exchangeId), + new Column().header("REMOTE").visible(remoteVisible.get()).dataAlign(HorizontalAlign.CENTER) + .with(this::getRemote), new Column().header("ROUTE").dataAlign(HorizontalAlign.LEFT).maxWidth(25, OverflowBehaviour.ELLIPSIS_RIGHT) .with(r -> r.atRouteId), new Column().header("ID").dataAlign(HorizontalAlign.LEFT).maxWidth(25, OverflowBehaviour.ELLIPSIS_RIGHT) @@ -139,6 +149,10 @@ public class ListInflight extends ProcessWatchCommand { return TimeUtils.printDuration(r.elapsed); } + private String getRemote(Row r) { + return r.fromRemoteEndpoint ? "x" : ""; + } + private static class Row implements Cloneable { String pid; String name; @@ -146,6 +160,7 @@ public class ListInflight extends ProcessWatchCommand { long uptime; String exchangeId; String fromRouteId; + boolean fromRemoteEndpoint; String atRouteId; String nodeId; long elapsed; diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java index dd9c2213b90..a3dec68a651 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListProcess.java @@ -82,8 +82,20 @@ public class ListProcess extends ProcessWatchCommand { Map<String, ?> stats = context.getMap("statistics"); if (stats != null) { row.total = stats.get("exchangesTotal").toString(); - row.inflight = stats.get("exchangesInflight").toString(); + Object num = stats.get("remoteExchangesTotal"); + if (num != null) { + row.totalRemote = num.toString(); + } row.failed = stats.get("exchangesFailed").toString(); + num = stats.get("remoteExchangesFailed"); + if (num != null) { + row.failedRemote = num.toString(); + } + row.inflight = stats.get("exchangesInflight").toString(); + num = stats.get("remoteExchangesInflight"); + if (num != null) { + row.inflightRemote = num.toString(); + } } rows.add(row); } @@ -105,15 +117,36 @@ public class ListProcess extends ProcessWatchCommand { new Column().header("STATUS").headerAlign(HorizontalAlign.CENTER) .with(r -> extractState(r.state)), new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.ago), - new Column().header("TOTAL").with(r -> r.total), - new Column().header("FAIL").with(r -> r.failed), - new Column().header("INFLIGHT").with(r -> r.inflight)))); + new Column().header("TOTAL").with(this::getTotal), + new Column().header("FAIL").with(this::getFailed), + new Column().header("INFLIGHT").with(this::getInflight)))); } } return 0; } + private String getTotal(Row r) { + if (r.totalRemote != null) { + return r.totalRemote + "/" + r.total; + } + return r.total; + } + + private String getFailed(Row r) { + if (r.failedRemote != null) { + return r.failedRemote + "/" + r.failed; + } + return r.failed; + } + + private String getInflight(Row r) { + if (r.inflightRemote != null) { + return r.inflightRemote + "/" + r.inflight; + } + return r.inflight; + } + protected int sortRow(Row o1, Row o2) { String s = sort; int negate = 1; @@ -141,8 +174,11 @@ public class ListProcess extends ProcessWatchCommand { String ago; long uptime; String total; + String totalRemote; String failed; + String failedRemote; String inflight; + String inflightRemote; } }