Repository: camel Updated Branches: refs/heads/camel-2.13.x 6e063c2f6 -> 903fa824a refs/heads/master 9bae4b23d -> 5c13e0d10
CAMEL-7333: RuntimeEndpointRegistry to capture runtime usage of endpoints used during routing from dynamic eips. Exposed in JMX as well. Option to turn it easily on|off. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5c13e0d1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5c13e0d1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5c13e0d1 Branch: refs/heads/master Commit: 5c13e0d10a89678f5a69950644f6551af9b9edbb Parents: 9bae4b2 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Mar 28 20:41:29 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Mar 28 20:41:29 2014 +0100 ---------------------------------------------------------------------- .../java/org/apache/camel/CamelContext.java | 22 ++- .../mbean/ManagedCamelContextMBean.java | 5 +- .../api/management/mbean/ManagedRouteMBean.java | 5 +- .../ManagedRuntimeEndpointRegistryMBean.java | 41 +++++ .../apache/camel/impl/DefaultCamelContext.java | 28 ++- .../impl/DefaultRuntimeEndpointRegistry.java | 169 +++++++++++++++++++ .../DefaultManagementLifecycleStrategy.java | 4 + .../management/mbean/ManagedCamelContext.java | 6 +- .../camel/management/mbean/ManagedRoute.java | 5 + .../mbean/ManagedRuntimeEndpointRegistry.java | 63 +++++++ .../camel/model/RouteDefinitionHelper.java | 45 ++++- .../camel/spi/RuntimeEndpointRegistry.java | 73 ++++++++ .../impl/MultipleLifecycleStrategyTest.java | 2 +- .../camel/impl/RuntimeEndpointRegistryTest.java | 76 +++++++++ .../management/ManagedCamelContextTest.java | 4 +- .../ManagedRouteDumpRouteAsXmlTest.java | 4 +- .../model/GatherAllStaticEndpointUrisTest.java | 10 +- .../blueprint/CamelContextFactoryBean.java | 10 ++ .../xml/AbstractCamelContextFactoryBean.java | 11 ++ .../camel/spring/CamelContextFactoryBean.java | 10 ++ 20 files changed, 573 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/CamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java index 7f8e42f..8e32849 100644 --- a/camel-core/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java @@ -51,6 +51,7 @@ import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.Registry; import org.apache.camel.spi.RouteStartupOrder; +import org.apache.camel.spi.RuntimeEndpointRegistry; import org.apache.camel.spi.ServicePool; import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.spi.StreamCachingStrategy; @@ -1259,7 +1260,7 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { String getComponentDocumentation(String componentName) throws IOException; /** - * Creates a JSON representation of all the <b>static</b> configured endpoints defined in the given route(s). + * Creates a JSON representation of all the <b>static</b> and <b>dynamic</b> configured endpoints defined in the given route(s). * * @param routeId for a particular route, or <tt>null</tt> for all routes * @return a JSON string @@ -1267,6 +1268,15 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { String createRouteStaticEndpointJson(String routeId); /** + * Creates a JSON representation of all the <b>static</b> (and possible <b>dynamic</b>) configured endpoints defined in the given route(s). 
+ * + * @param routeId for a particular route, or <tt>null</tt> for all routes + * @param includeDynamic whether to include dynamic endpoints + * @return a JSON string + */ + String createRouteStaticEndpointJson(String routeId, boolean includeDynamic); + + /** * Gets the {@link StreamCachingStrategy} to use. */ StreamCachingStrategy getStreamCachingStrategy(); @@ -1286,4 +1296,14 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { */ void setUnitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory); + /** + * Gets the {@link org.apache.camel.spi.RuntimeEndpointRegistry} to use, or <tt>null</tt> if none is in use. + */ + RuntimeEndpointRegistry getRuntimeEndpointRegistry(); + + /** + * Sets a custom {@link org.apache.camel.spi.RuntimeEndpointRegistry} to use. + */ + void setRuntimeEndpointRegistry(RuntimeEndpointRegistry runtimeEndpointRegistry); + } http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java index 1befa4f..ce08339 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java @@ -258,7 +258,10 @@ public interface ManagedCamelContextMBean extends ManagedPerformanceCounterMBean @ManagedOperation(description = "Returns the HTML documentation for the given camel component") String getComponentDocumentation(String componentName) throws IOException; - @ManagedOperation(description = "Returns the JSON representation of all the static endpoints defined in all the routes") + @ManagedOperation(description = "Returns the JSON 
representation of all the static and dynamic endpoints defined in all the routes") String createRouteStaticEndpointJson(); + @ManagedOperation(description = "Returns the JSON representation of all the static endpoints (and possible dynamic) defined in all the routes") + String createRouteStaticEndpointJson(boolean includeDynamic); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java index 2b45bd3..f577320 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java @@ -104,7 +104,10 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean { @ManagedOperation(description = "Reset counters") void reset(boolean includeProcessors) throws Exception; - @ManagedOperation(description = "Returns the JSON representation of all the static endpoints defined in this route") + @ManagedOperation(description = "Returns the JSON representation of all the static and dynamic endpoints defined in this route") String createRouteStaticEndpointJson(); + @ManagedOperation(description = "Returns the JSON representation of all the static endpoints (and possible dynamic) defined in this route") + String createRouteStaticEndpointJson(boolean includeDynamic); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRuntimeEndpointRegistryMBean.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRuntimeEndpointRegistryMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRuntimeEndpointRegistryMBean.java new file mode 100644 index 0000000..57637f1 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRuntimeEndpointRegistryMBean.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.api.management.mbean; + +import java.util.List; + +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; + +public interface ManagedRuntimeEndpointRegistryMBean extends ManagedServiceMBean { + + @ManagedOperation(description = "Resets the usage gathered") + void reset(); + + @ManagedAttribute(description = "Whether gathering runtime usage is enabled or not.") + boolean isEnabled(); + + @ManagedAttribute(description = "Whether gathering runtime usage is enabled or not.") + void setEnabled(boolean enabled); + + @ManagedOperation(description = " Gets all the endpoint uris captured during runtime that are in-use.") + List<String> getAllEndpoints(boolean includeInputs); + + @ManagedOperation(description = " Gets all the endpoint uris captured during runtime that are in-use for the given route.") + List<String> getEndpointsPerRoute(String routeId, boolean includeInputs); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 0baac81..840e997 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -116,6 +116,7 @@ import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.Registry; import org.apache.camel.spi.RouteContext; import org.apache.camel.spi.RouteStartupOrder; +import org.apache.camel.spi.RuntimeEndpointRegistry; import org.apache.camel.spi.ServicePool; import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.spi.StreamCachingStrategy; @@ -212,6 +213,7 @@ public class DefaultCamelContext extends ServiceSupport implements 
ModelCamelCon private InterceptStrategy defaultBacklogTracer; private InterceptStrategy defaultBacklogDebugger; private InflightRepository inflightRepository = new DefaultInflightRepository(); + private RuntimeEndpointRegistry runtimeEndpointRegistry = new DefaultRuntimeEndpointRegistry(); private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<RouteStartupOrder>(); // start auto assigning route ids using numbering 1000 and upwards private int defaultRouteStartupOrder = 1000; @@ -1058,6 +1060,11 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } public String createRouteStaticEndpointJson(String routeId) { + // lets include dynamic as well as we want as much data as possible + return createRouteStaticEndpointJson(routeId, true); + } + + public String createRouteStaticEndpointJson(String routeId, boolean includeDynamic) { List<RouteDefinition> routes = new ArrayList<RouteDefinition>(); if (routeId != null) { RouteDefinition route = getRouteDefinition(routeId); @@ -1081,7 +1088,8 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon String id = route.getId(); buffer.append("\n \"" + id + "\": {"); buffer.append("\n \"inputs\": ["); - Set<String> inputs = RouteDefinitionHelper.gatherAllStaticEndpointUris(route, true, false); + // for inputs we do not need to check dynamic as we have the data from the route definition + Set<String> inputs = RouteDefinitionHelper.gatherAllStaticEndpointUris(this, route, true, false); boolean first = true; for (String input : inputs) { if (!first) { @@ -1096,7 +1104,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon buffer.append(","); buffer.append("\n \"outputs\": ["); - Set<String> outputs = RouteDefinitionHelper.gatherAllStaticEndpointUris(route, false, true); + Set<String> outputs = RouteDefinitionHelper.gatherAllEndpointUris(this, route, false, true, includeDynamic); first = true; for (String output : outputs) { if 
(!first) { @@ -1522,6 +1530,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon this.unitOfWorkFactory = unitOfWorkFactory; } + public RuntimeEndpointRegistry getRuntimeEndpointRegistry() { + return runtimeEndpointRegistry; + } + + public void setRuntimeEndpointRegistry(RuntimeEndpointRegistry runtimeEndpointRegistry) { + this.runtimeEndpointRegistry = runtimeEndpointRegistry; + } + public String getUptime() { // compute and log uptime if (startDate == null) { @@ -1773,6 +1789,13 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon addService(shutdownStrategy); addService(packageScanClassResolver); + if (runtimeEndpointRegistry != null) { + if (runtimeEndpointRegistry instanceof EventNotifier) { + getManagementStrategy().addEventNotifier((EventNotifier) runtimeEndpointRegistry); + } + addService(runtimeEndpointRegistry); + } + // eager lookup any configured properties component to avoid subsequent lookup attempts which may impact performance // due we use properties component for property placeholder resolution at runtime Component existing = lookupPropertiesComponent(); @@ -2369,6 +2392,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon getLanguageResolver(); getTypeConverterRegistry(); getTypeConverter(); + getRuntimeEndpointRegistry(); if (isTypeConverterStatisticsEnabled() != null) { getTypeConverterRegistry().getStatistics().setStatisticsEnabled(isTypeConverterStatisticsEnabled()); http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java new file mode 100644 index 0000000..e4ca093 --- /dev/null +++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EventObject; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.management.event.ExchangeSendingEvent; +import org.apache.camel.management.event.RouteStartedEvent; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RuntimeEndpointRegistry; +import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.support.EventNotifierSupport; +import org.apache.camel.util.LRUCache; + +public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport implements RuntimeEndpointRegistry { + + // endpoint uri -> route ids + private Map<String, Set<String>> inputs; + private Map<String, Set<String>> outputs; + private int limit = 1000; + private boolean enabled = true; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + 
+ this.enabled = enabled; + } + + @Override + public List<String> getAllEndpoints(boolean includeInputs) { + List<String> answer = new ArrayList<String>(); + if (includeInputs) { + answer.addAll(inputs.keySet()); + } + answer.addAll(outputs.keySet()); + return Collections.unmodifiableList(answer); + } + + @Override + public List<String> getEndpointsPerRoute(String routeId, boolean includeInputs) { + List<String> answer = new ArrayList<String>(); + if (includeInputs) { + for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) { + if (entry.getValue().contains(routeId)) { + answer.add(entry.getKey()); + } + } + } + for (Map.Entry<String, Set<String>> entry : outputs.entrySet()) { + if (entry.getValue().contains(routeId)) { + answer.add(entry.getKey()); + } + } + return Collections.unmodifiableList(answer); + } + + @Override + public int getLimit() { + return limit; + } + + @Override + public void setLimit(int limit) { + this.limit = limit; + } + + @Override + public void reset() { + inputs.clear(); + outputs.clear(); + } + + @Override + public int size() { + int total = inputs.size(); + total += outputs.size(); + return total; + } + + @Override + protected void doStart() throws Exception { + if (inputs == null) { + inputs = new LRUCache<String, Set<String>>(limit); + } + if (outputs == null) { + outputs = new LRUCache<String, Set<String>>(limit); + } + } + + @Override + protected void doStop() throws Exception { + reset(); + } + + @Override + public void notify(EventObject event) throws Exception { + if (event instanceof RouteStartedEvent) { + RouteStartedEvent rse = (RouteStartedEvent) event; + Endpoint endpoint = rse.getRoute().getEndpoint(); + String routeId = rse.getRoute().getId(); + + Set<String> routes = inputs.get(endpoint.getEndpointUri()); // look up by endpoint uri: the map is keyed by uri, so an Endpoint key never matches + if (routeId != null && (routes == null || !routes.contains(routeId))) { + if (routes == null) { + routes = new ConcurrentSkipListSet<String>(); + } + routes.add(routeId); + inputs.put(endpoint.getEndpointUri(), routes); + } + } 
else { + ExchangeSendingEvent ese = (ExchangeSendingEvent) event; + Endpoint endpoint = ese.getEndpoint(); + String routeId = getRouteId(ese.getExchange()); + + Set<String> routes = outputs.get(endpoint.getEndpointUri()); // look up by endpoint uri so route ids accumulate instead of being overwritten + if (routeId != null && (routes == null || !routes.contains(routeId))) { + if (routes == null) { + routes = new ConcurrentSkipListSet<String>(); + } + routes.add(routeId); + outputs.put(endpoint.getEndpointUri(), routes); + } + } + } + + private String getRouteId(Exchange exchange) { + String answer = null; + UnitOfWork uow = exchange.getUnitOfWork(); + RouteContext rc = uow != null ? uow.getRouteContext() : null; + if (rc != null) { + answer = rc.getRoute().getId(); + } + if (answer == null) { + // fallback and get from route id on the exchange + answer = exchange.getFromRouteId(); + } + return answer; + } + + @Override + public boolean isEnabled(EventObject event) { + return (enabled && event instanceof ExchangeSendingEvent) + || event instanceof RouteStartedEvent; // NOTE(review): route-start events pass even when disabled; parentheses only make the existing precedence explicit - confirm intent + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java index 2528715..38d8b65 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java @@ -59,6 +59,7 @@ import org.apache.camel.management.mbean.ManagedEndpoint; import org.apache.camel.management.mbean.ManagedEndpointRegistry; import org.apache.camel.management.mbean.ManagedProducerCache; import org.apache.camel.management.mbean.ManagedRoute; +import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry; import 
org.apache.camel.management.mbean.ManagedService; import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; @@ -84,6 +85,7 @@ import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementObjectStrategy; import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.RuntimeEndpointRegistry; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.TypeConverterRegistry; import org.apache.camel.spi.UnitOfWork; @@ -460,6 +462,8 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service); } else if (service instanceof TypeConverterRegistry) { answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service); + } else if (service instanceof RuntimeEndpointRegistry) { + answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service); } else if (service instanceof StreamCachingStrategy) { answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service); } else if (service != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java index a20617d..e26c8aa 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java @@ -433,7 +433,11 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti } public String 
createRouteStaticEndpointJson() { - return context.createRouteStaticEndpointJson(null); + return createRouteStaticEndpointJson(true); + } + + public String createRouteStaticEndpointJson(boolean includeDynamic) { + return context.createRouteStaticEndpointJson(null, includeDynamic); } public List<String> findComponentNames() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index 0f3fa57..103d975 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -362,6 +362,11 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList } @Override + public String createRouteStaticEndpointJson(boolean includeDynamic) { + return getContext().createRouteStaticEndpointJson(getRouteId(), includeDynamic); + } + + @Override public boolean equals(Object o) { return this == o || (o != null && getClass() == o.getClass() && route.equals(((ManagedRoute)o).route)); } http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java new file mode 100644 index 0000000..8b9a286 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRuntimeEndpointRegistry.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import java.util.List; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedRuntimeEndpointRegistryMBean; +import org.apache.camel.spi.RuntimeEndpointRegistry; + +/** + * + */ +@ManagedResource(description = "Managed RuntimeEndpointRegistry") +public class ManagedRuntimeEndpointRegistry extends ManagedService implements ManagedRuntimeEndpointRegistryMBean { + + private final RuntimeEndpointRegistry registry; + + public ManagedRuntimeEndpointRegistry(CamelContext context, RuntimeEndpointRegistry registry) { + super(context, registry); + this.registry = registry; + } + + @Override + public void reset() { + registry.reset(); + } + + @Override + public boolean isEnabled() { + return registry.isEnabled(); + } + + @Override + public void setEnabled(boolean enabled) { + registry.setEnabled(enabled); + } + + @Override + public List<String> getAllEndpoints(boolean includeInputs) { + return registry.getAllEndpoints(includeInputs); + } + + @Override + public List<String> getEndpointsPerRoute(String routeId, boolean includeInputs) { + return 
registry.getEndpointsPerRoute(routeId, includeInputs); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java b/camel-core/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java index e7f77e8..7ad3ff4 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java +++ b/camel-core/src/main/java/org/apache/camel/model/RouteDefinitionHelper.java @@ -16,6 +16,8 @@ */ package org.apache.camel.model; +import java.io.UnsupportedEncodingException; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashSet; @@ -27,6 +29,7 @@ import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.URISupport; import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs; @@ -50,12 +53,25 @@ public final class RouteDefinitionHelper { * @param includeOutputs whether to include outputs * @return the endpoints uris */ - public static Set<String> gatherAllStaticEndpointUris(RouteDefinition route, boolean includeInputs, boolean includeOutputs) { + public static Set<String> gatherAllStaticEndpointUris(CamelContext camelContext, RouteDefinition route, boolean includeInputs, boolean includeOutputs) { + return gatherAllEndpointUris(camelContext, route, includeInputs, includeOutputs, false); + } + + /** + * Gather all the endpoint uri's the route is using from the EIPs that has a static or dynamic endpoint defined. 
+ * + * @param route the route + * @param includeInputs whether to include inputs + * @param includeOutputs whether to include outputs + * @param includeDynamic whether to include dynamic outputs which has been in use during routing at runtime, gathered from the {@link org.apache.camel.spi.RuntimeEndpointRegistry}. + * @return the endpoints uris + */ + public static Set<String> gatherAllEndpointUris(CamelContext camelContext, RouteDefinition route, boolean includeInputs, boolean includeOutputs, boolean includeDynamic) { Set<String> answer = new LinkedHashSet<String>(); if (includeInputs) { for (FromDefinition from : route.getInputs()) { - String uri = from.getEndpointUri(); + String uri = normalizeUri(from.getEndpointUri()); if (uri != null) { answer.add(uri); } @@ -65,14 +81,35 @@ public final class RouteDefinitionHelper { if (includeOutputs) { Iterator<EndpointRequiredDefinition> it = filterTypeInOutputs(route.getOutputs(), EndpointRequiredDefinition.class); while (it.hasNext()) { - String uri = it.next().getEndpointUri(); - answer.add(uri); + String uri = normalizeUri(it.next().getEndpointUri()); + if (uri != null) { + answer.add(uri); + } + } + if (includeDynamic && camelContext.getRuntimeEndpointRegistry() != null) { + List<String> endpoints = camelContext.getRuntimeEndpointRegistry().getEndpointsPerRoute(route.getId(), false); + for (String uri : endpoints) { + if (uri != null) { + answer.add(uri); + } + } } } return answer; } + private static String normalizeUri(String uri) { + try { + return URISupport.normalizeUri(uri); + } catch (UnsupportedEncodingException e) { + // ignore + } catch (URISyntaxException e) { + // ignore + } + return null; + } + /** * Force assigning ids to the routes * http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java new file mode 100644 index 0000000..55be054 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import java.util.List; + +import org.apache.camel.Service; + +/** + * A registry which listens for runtime usage of {@link org.apache.camel.Endpoint} during routing in Camel. + */ +public interface RuntimeEndpointRegistry extends Service { + + /** + * Whether gathering runtime usage is enabled or not. + */ + boolean isEnabled(); + + /** + * Sets whether gathering runtime usage is enabled or not. + */ + void setEnabled(boolean enabled); + + /** + * Maximum number of endpoints to keep in the cache. + * <p/> + * The default value is <tt>1000</tt> + */ + int getLimit(); + + /** + * Sets the maximum number of endpoints to keep in the cache. + */ + void setLimit(int limit); + + /** + * Clears the runtime usage gathered + */ + void reset(); + + /** + * Number of endpoints in the cache. 
+ */ + int size(); + + /** + * Gets all the endpoint uris captured during runtime that are in-use. + * + * @param includeInputs whether to include route inputs + */ + List<String> getAllEndpoints(boolean includeInputs); + + /** + * Gets all the endpoint uris from the given route captured during runtime that are in-use. + * + * @param includeInputs whether to include route inputs + */ + List<String> getEndpointsPerRoute(String routeId, boolean includeInputs); +} http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java index b863d35..dd417ac 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java @@ -51,7 +51,7 @@ public class MultipleLifecycleStrategyTest extends TestSupport { context.stop(); List<String> expectedEvents = Arrays.asList("onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", - "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop"); + "onServiceAdd", "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop"); assertEquals(expectedEvents, dummy1.getEvents()); assertEquals(expectedEvents, dummy2.getEvents()); http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/test/java/org/apache/camel/impl/RuntimeEndpointRegistryTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/RuntimeEndpointRegistryTest.java 
b/camel-core/src/test/java/org/apache/camel/impl/RuntimeEndpointRegistryTest.java new file mode 100644 index 0000000..abb381f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/RuntimeEndpointRegistryTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spi.RuntimeEndpointRegistry; + +public class RuntimeEndpointRegistryTest extends ContextTestSupport { + + public void testRuntimeEndpointRegistry() throws Exception { + RuntimeEndpointRegistry registry = context.getRuntimeEndpointRegistry(); + assertEquals(0, registry.getAllEndpoints(false).size()); + // we have 2 at the start as we have all endpoints for the route consumers + assertEquals(2, registry.getAllEndpoints(true).size()); + + MockEndpoint mock = getMockEndpoint("mock:foo2"); + mock.expectedMessageCount(1); + template.sendBodyAndHeader("seda:foo", "Hello World", "slip", "mock:foo2"); + mock.assertIsSatisfied(); + + assertEquals(4, registry.getAllEndpoints(true).size()); + assertEquals(3, registry.getEndpointsPerRoute("foo", true).size()); + assertEquals(1, registry.getEndpointsPerRoute("bar", true).size()); + + mock = getMockEndpoint("mock:bar2"); + mock.expectedMessageCount(1); + template.sendBodyAndHeader("seda:bar", "Bye World", "slip", "mock:bar2"); + mock.assertIsSatisfied(); + + assertEquals(6, registry.getAllEndpoints(true).size()); + assertEquals(3, registry.getEndpointsPerRoute("foo", true).size()); + assertEquals(3, registry.getEndpointsPerRoute("bar", true).size()); + + // lets check the json + String json = context.createRouteStaticEndpointJson(null); + assertNotNull(json); + log.info(json); + + assertTrue("Should have outputs", json.contains(" { \"uri\": \"mock://foo\" }")); + assertTrue("Should have outputs", json.contains(" { \"uri\": \"mock://foo2\" }")); + assertTrue("Should have outputs", json.contains(" { \"uri\": \"mock://bar\" }")); + assertTrue("Should have outputs", json.contains(" { \"uri\": \"mock://bar2\" }")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + 
@Override + public void configure() throws Exception { + from("seda:foo").routeId("foo") + .to("mock:foo") + .recipientList(header("slip")); + + from("seda:bar").routeId("bar") + .to("mock:bar") + .recipientList(header("slip")); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java index 9ee8d57..8223e7b 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedCamelContextTest.java @@ -217,8 +217,8 @@ public class ManagedCamelContextTest extends ManagementTestSupport { assertNotNull(json); assertEquals(7, StringHelper.countChar(json, '{')); assertEquals(7, StringHelper.countChar(json, '}')); - assertTrue(json.contains("{ \"uri\": \"direct:start\" }")); - assertTrue(json.contains("{ \"uri\": \"direct:foo\" }")); + assertTrue(json.contains("{ \"uri\": \"direct://start\" }")); + assertTrue(json.contains("{ \"uri\": \"direct://foo\" }")); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpRouteAsXmlTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpRouteAsXmlTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpRouteAsXmlTest.java index 274cab8..d4ae9bc 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpRouteAsXmlTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDumpRouteAsXmlTest.java @@ -70,8 +70,8 @@ public class 
ManagedRouteDumpRouteAsXmlTest extends ManagementTestSupport { String json = (String) mbeanServer.invoke(on, "createRouteStaticEndpointJson", null, null); assertNotNull(json); assertTrue(json.contains("\"myRoute\"")); - assertTrue(json.contains("{ \"uri\": \"direct:start\" }")); - assertTrue(json.contains("{ \"uri\": \"mock:result\" }")); + assertTrue(json.contains("{ \"uri\": \"direct://start\" }")); + assertTrue(json.contains("{ \"uri\": \"mock://result\" }")); } http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java b/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java index a3fbb7e..4713c61 100644 --- a/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java +++ b/camel-core/src/test/java/org/apache/camel/model/GatherAllStaticEndpointUrisTest.java @@ -25,23 +25,23 @@ public class GatherAllStaticEndpointUrisTest extends ContextTestSupport { public void testGatherAllStaticEndpointUris() throws Exception { RouteDefinition route = context.getRouteDefinition("foo"); - Set<String> uris = RouteDefinitionHelper.gatherAllStaticEndpointUris(route, true, true); + Set<String> uris = RouteDefinitionHelper.gatherAllStaticEndpointUris(context, route, true, true); assertNotNull(uris); assertEquals(5, uris.size()); RouteDefinition route2 = context.getRouteDefinition("bar"); - Set<String> uris2 = RouteDefinitionHelper.gatherAllStaticEndpointUris(route2, true, true); + Set<String> uris2 = RouteDefinitionHelper.gatherAllStaticEndpointUris(context, route2, true, true); assertNotNull(uris2); assertEquals(2, uris2.size()); - Set<String> uris2out = RouteDefinitionHelper.gatherAllStaticEndpointUris(route2, false, true); + Set<String> uris2out = 
RouteDefinitionHelper.gatherAllStaticEndpointUris(context, route2, false, true); assertNotNull(uris2out); assertEquals(1, uris2out.size()); String json = context.createRouteStaticEndpointJson(null); assertNotNull(json); - assertTrue(json.contains("{ \"uri\": \"direct:foo\" }")); - assertTrue(json.contains("{ \"uri\": \"seda:bar\" }")); + assertTrue(json.contains("{ \"uri\": \"direct://foo\" }")); + assertTrue(json.contains("{ \"uri\": \"seda://bar\" }")); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java ---------------------------------------------------------------------- diff --git a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java index 72dc978..0ead959 100644 --- a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java +++ b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelContextFactoryBean.java @@ -98,6 +98,8 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Blu @XmlAttribute(required = false) private String allowUseOriginalMessage; @XmlAttribute(required = false) + private String runtimeEndpointRegistryEnabled; + @XmlAttribute(required = false) private String managementNamePattern; @XmlAttribute(required = false) private String threadNamePattern; @@ -342,6 +344,14 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Blu this.allowUseOriginalMessage = allowUseOriginalMessage; } + public String getRuntimeEndpointRegistryEnabled() { + return runtimeEndpointRegistryEnabled; + } + + public void setRuntimeEndpointRegistryEnabled(String runtimeEndpointRegistryEnabled) { + this.runtimeEndpointRegistryEnabled = runtimeEndpointRegistryEnabled; + } + public String 
getManagementNamePattern() { return managementNamePattern; } http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java ---------------------------------------------------------------------- diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index 10e27e3..ead2bf0 100644 --- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -81,6 +81,7 @@ import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.PackageScanFilter; import org.apache.camel.spi.ProcessorFactory; +import org.apache.camel.spi.RuntimeEndpointRegistry; import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.ThreadPoolFactory; @@ -216,6 +217,11 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex LOG.info("Using custom UnitOfWorkFactory: {}", unitOfWorkFactory); getContext().setUnitOfWorkFactory(unitOfWorkFactory); } + RuntimeEndpointRegistry runtimeEndpointRegistry = getBeanForType(RuntimeEndpointRegistry.class); + if (runtimeEndpointRegistry != null) { + LOG.info("Using custom RuntimeEndpointRegistry: {}", runtimeEndpointRegistry); + getContext().setRuntimeEndpointRegistry(runtimeEndpointRegistry); + } // set the event notifier strategies if defined Map<String, EventNotifier> eventNotifiers = getContext().getRegistry().findByTypeWithName(EventNotifier.class); if (eventNotifiers != null && !eventNotifiers.isEmpty()) { @@ -571,6 +577,8 @@ public abstract class AbstractCamelContextFactoryBean<T extends 
ModelCamelContex public abstract String getAllowUseOriginalMessage(); + public abstract String getRuntimeEndpointRegistryEnabled(); + public abstract String getManagementNamePattern(); public abstract String getThreadNamePattern(); @@ -647,6 +655,9 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex if (getAllowUseOriginalMessage() != null) { ctx.setAllowUseOriginalMessage(CamelContextHelper.parseBoolean(getContext(), getAllowUseOriginalMessage())); } + if (getRuntimeEndpointRegistryEnabled() != null) { + ctx.getRuntimeEndpointRegistry().setEnabled(CamelContextHelper.parseBoolean(getContext(), getRuntimeEndpointRegistryEnabled())); + } if (getManagementNamePattern() != null) { ctx.getManagementNameStrategy().setNamePattern(getManagementNamePattern()); } http://git-wip-us.apache.org/repos/asf/camel/blob/5c13e0d1/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java index 881ef75..7f89623 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java @@ -109,6 +109,8 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Spr @XmlAttribute(required = false) private String allowUseOriginalMessage; @XmlAttribute(required = false) + private String runtimeEndpointRegistryEnabled; + @XmlAttribute(required = false) private String managementNamePattern; @XmlAttribute(required = false) private String threadNamePattern; @@ -579,6 +581,14 @@ public class CamelContextFactoryBean extends AbstractCamelContextFactoryBean<Spr this.allowUseOriginalMessage = allowUseOriginalMessage; } + public 
String getRuntimeEndpointRegistryEnabled() { + return runtimeEndpointRegistryEnabled; + } + + public void setRuntimeEndpointRegistryEnabled(String runtimeEndpointRegistryEnabled) { + this.runtimeEndpointRegistryEnabled = runtimeEndpointRegistryEnabled; + } + public String getManagementNamePattern() { return managementNamePattern; }