This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch cc in repository https://gitbox.apache.org/repos/asf/camel.git
commit 36e9d93099a2fc7706a3ba2e6956189f13f365a3 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Nov 29 08:39:32 2023 +0100 CAMEL-20164: camel-core - Dev console for consumer --- .../services/org/apache/camel/dev-console/consumer | 2 + .../camel/impl/console/ConsumerDevConsole.java | 89 ++++++++++++++++++++++ .../camel/api/management/ManagedCamelContext.java | 22 ++++++ .../mbean/ManagedSchedulePollConsumerMBean.java | 18 +++++ .../camel/management/ManagedCamelContextImpl.java | 24 ++++++ .../mbean/ManagedScheduledPollConsumer.java | 31 ++++++++ 6 files changed, 186 insertions(+) diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/consumer b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/consumer new file mode 100644 index 00000000000..aeb2b3da695 --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/consumer @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.impl.console.ConsumerDevConsole diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java new file mode 100644 index 00000000000..5c7159e7468 --- /dev/null +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/ConsumerDevConsole.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.console; + +import java.util.Map; + +import org.apache.camel.Route; +import org.apache.camel.api.management.ManagedCamelContext; +import org.apache.camel.api.management.mbean.ManagedConsumerMBean; +import org.apache.camel.api.management.mbean.ManagedSchedulePollConsumerMBean; +import org.apache.camel.spi.annotations.DevConsole; +import org.apache.camel.support.console.AbstractDevConsole; +import org.apache.camel.util.json.JsonObject; + +@DevConsole("consumer") +public class ConsumerDevConsole extends AbstractDevConsole { + + public ConsumerDevConsole() { + super("camel", "consumer", "Consumers", "Display information about Camel consumers"); + } + + @Override + protected String doCallText(Map<String, Object> options) { + StringBuilder sb = new StringBuilder(); + + ManagedCamelContext mcc = getCamelContext().getCamelContextExtension().getContextPlugin(ManagedCamelContext.class); + if (mcc != null) { + for (Route route : getCamelContext().getRoutes()) { + String id = route.getId(); + ManagedConsumerMBean mc = mcc.getManagedConsumer(id); + if (mc != null) { + Integer inflight = mc.getInflightExchanges(); + if (inflight == null) { + inflight = 0; + } + + if (!sb.isEmpty()) { + sb.append("\n"); + } + sb.append(String.format("\n Id: %s", id)); + sb.append(String.format("\n From: %s", mc.getEndpointUri())); + sb.append(String.format("\n State: %s", mc.getState())); + sb.append(String.format("\n Class: %s", mc.getServiceType())); + sb.append(String.format("\n Inflight: %d", inflight)); + if (mcc instanceof ManagedSchedulePollConsumerMBean mpc) { + sb.append(String.format("\n Polling: %s", mpc.isPolling())); + sb.append(String.format("\n Scheduler Started: %s", mpc.isSchedulerStarted())); + sb.append(String.format("\n Scheduler Class: %s", mpc.getSchedulerClassName())); + sb.append(String.format("\n Running Logging Level: %s", mpc.getRunningLoggingLevel())); + sb.append(String.format("\n Fixed Delay: %s", mpc.isUseFixedDelay())); + sb.append(String.format("\n Greedy: %s", mpc.isGreedy())); + sb.append(String.format("\n Send Empty Message When Idle: %s", mpc.isSendEmptyMessageWhenIdle())); + sb.append(String.format("\n Repeat Count: %s", mpc.getRepeatCount())); + sb.append(String.format("\n Delay(initial: %d delay: %d unit: %s)", + mpc.getInitialDelay(), mpc.getDelay(), mpc.getTimeUnit())); + sb.append(String.format( + "\n Backoff(counter: %d multiplier: %d errorThreshold: %d, idleThreshold: %d )", + mpc.getBackoffCounter(), mpc.getBackoffMultiplier(), mpc.getBackoffErrorThreshold(), + mpc.getBackoffIdleThreshold())); + } + } + } + } + + return sb.toString(); + } + + @Override + protected JsonObject doCallJson(Map<String, Object> options) { + JsonObject root = new JsonObject(); + + return root; + } + +} diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/ManagedCamelContext.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/ManagedCamelContext.java index ebb1d4592be..03316912eba 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/ManagedCamelContext.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/ManagedCamelContext.java @@ -17,6 +17,7 @@ package org.apache.camel.api.management; import org.apache.camel.api.management.mbean.ManagedCamelContextMBean; +import org.apache.camel.api.management.mbean.ManagedConsumerMBean; import org.apache.camel.api.management.mbean.ManagedProcessorMBean; import org.apache.camel.api.management.mbean.ManagedRouteMBean; import org.apache.camel.api.management.mbean.ManagedStepMBean; @@ -78,4 +79,25 @@ public interface ManagedCamelContext { */ <T extends ManagedRouteMBean> T getManagedRoute(String routeId, Class<T> type); + /** + * Gets the managed consumer client api from any of the routes which with the given route id + * + * @param id route id having the consumer + * @return the consumer or <tt>null</tt> if not found + */ + default ManagedConsumerMBean getManagedConsumer(String id) { + return getManagedConsumer(id, ManagedConsumerMBean.class); + } + + /** + * Gets the managed consumer client api from any of the routes which with the given route id + * + * @param id route id having the consumer + * @param type the managed consumer type from the + * {@link org.apache.camel.api.management.mbean} package. + * @return the consumer or <tt>null</tt> if not found + * @throws IllegalArgumentException if the type is not compliant + */ + <T extends ManagedConsumerMBean> T getManagedConsumer(String id, Class<T> type); + } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java index 908a5a13037..5cee5385216 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java @@ -39,6 +39,24 @@ public interface ManagedSchedulePollConsumerMBean extends ManagedConsumerMBean { @ManagedAttribute(description = "Scheduled Fixed Delay") void setUseFixedDelay(boolean useFixedDelay); + @ManagedAttribute(description = "Scheduled Greedy") + boolean isGreedy(); + + @ManagedAttribute(description = "Scheduled Greedy") + void setGreedy(boolean greedy); + + @ManagedAttribute(description = "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead") + boolean isSendEmptyMessageWhenIdle(); + + @ManagedAttribute(description = "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead") + void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle); + + @ManagedAttribute(description = "The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.") + String getRunningLoggingLevel(); + + @ManagedAttribute(description = "The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.") + void setRunningLoggingLevel(String runningLoggingLevel); + @ManagedAttribute(description = "Scheduled TimeUnit") String getTimeUnit(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/ManagedCamelContextImpl.java b/core/camel-management/src/main/java/org/apache/camel/management/ManagedCamelContextImpl.java index 99df07c2198..cbc2af9287e 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/ManagedCamelContextImpl.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/ManagedCamelContextImpl.java @@ -20,11 +20,13 @@ import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; import org.apache.camel.api.management.ManagedCamelContext; import org.apache.camel.api.management.mbean.ManagedCamelContextMBean; +import org.apache.camel.api.management.mbean.ManagedConsumerMBean; import org.apache.camel.api.management.mbean.ManagedProcessorMBean; import org.apache.camel.api.management.mbean.ManagedRouteMBean; import org.apache.camel.api.management.mbean.ManagedStepMBean; @@ -118,6 +120,28 @@ public class ManagedCamelContextImpl implements ManagedCamelContext { return null; } + @Override + public <T extends ManagedConsumerMBean> T getManagedConsumer(String id, Class<T> type) { + // jmx must be enabled + if (getManagementStrategy().getManagementAgent() == null) { + return null; + } + + Route route = camelContext.getRoute(id); + if (route != null) { + try { + Consumer consumer = route.getConsumer(); + ObjectName on = getManagementStrategy().getManagementObjectNameStrategy().getObjectNameForConsumer(camelContext, consumer); + return getManagementStrategy().getManagementAgent().newProxyClient(on, type); + } catch (MalformedObjectNameException e) { + throw RuntimeCamelException.wrapRuntimeCamelException(e); + } + } + + + return null; + } + @Override public ManagedCamelContextMBean getManagedCamelContext() { // jmx must be enabled diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java index 5d69ec0be75..1865ebac112 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java @@ -19,6 +19,7 @@ package org.apache.camel.management.mbean; import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; +import org.apache.camel.LoggingLevel; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedSchedulePollConsumerMBean; import org.apache.camel.support.ScheduledPollConsumer; @@ -67,6 +68,36 @@ public class ManagedScheduledPollConsumer extends ManagedConsumer implements Man getConsumer().setUseFixedDelay(useFixedDelay); } + @Override + public boolean isGreedy() { + return getConsumer().isGreedy(); + } + + @Override + public void setGreedy(boolean greedy) { + getConsumer().setGreedy(greedy); + } + + @Override + public boolean isSendEmptyMessageWhenIdle() { + return getConsumer().isSendEmptyMessageWhenIdle(); + } + + @Override + public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) { + getConsumer().setSendEmptyMessageWhenIdle(sendEmptyMessageWhenIdle); + } + + @Override + public String getRunningLoggingLevel() { + return getConsumer().getRunLoggingLevel().name(); + } + + @Override + public void setRunningLoggingLevel(String runningLoggingLevel) { + getConsumer().setRunLoggingLevel(LoggingLevel.valueOf(runningLoggingLevel)); + } + @Override public String getTimeUnit() { return getConsumer().getTimeUnit().name();