This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 93af946 CAMEL-17228: Route reload issue when using Kamelet EIP. 93af946 is described below commit 93af946d6927e3f11b6493b5c8fee95c79ff3b9b Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Nov 25 10:45:02 2021 +0100 CAMEL-17228: Route reload issue when using Kamelet EIP. --- .../apache/camel/component/kamelet/KameletConsumer.java | 8 +++++--- .../src/main/java/org/apache/camel/CamelContext.java | 8 ++++++++ .../org/apache/camel/impl/engine/SimpleCamelContext.java | 5 +++++ .../java/org/apache/camel/impl/DefaultCamelContext.java | 16 ++++++++++++++++ .../main/java/org/apache/camel/impl/DefaultModel.java | 10 ++++++++++ .../apache/camel/impl/lw/LightweightCamelContext.java | 10 ++++++++++ .../camel/impl/lw/LightweightRuntimeCamelContext.java | 5 +++++ .../src/main/java/org/apache/camel/model/Model.java | 8 ++++++++ .../apache/camel/support/RouteWatcherReloadStrategy.java | 3 +++ .../src/main/java/org/apache/camel/main/KameletMain.java | 2 +- .../src/test/resources/my-camel-k.yaml | 2 -- 11 files changed, 71 insertions(+), 6 deletions(-) diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java index 15a02cd..d875701 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java @@ -19,11 +19,13 @@ package org.apache.camel.component.kamelet; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.Suspendable; +import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.support.DefaultConsumer; final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Suspendable { + private final InflightRepository inflight; private final KameletComponent component; private final String key; @@ -31,6 +33,7 @@ final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Su super(endpoint, processor); this.component = endpoint.getComponent(); this.key = key; + this.inflight = endpoint.getCamelContext().getInflightRepository(); } @Override @@ -71,9 +74,8 @@ final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Su @Override public int getPendingExchangesSize() { - // return 0 as we do not have an internal memory queue with a variable - // size of inflight messages. - return 0; + // capture the inflight counter from the route + return inflight.size(getRouteId()); } @Override diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java index 49c5bee..436c270 100644 --- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java @@ -635,6 +635,14 @@ public interface CamelContext extends CamelContextLifecycle, RuntimeConfiguratio throws Exception; /** + * Removes the route templates matching the pattern + * + * @param pattern pattern, such as * for all, or foo* to remove all foo templates + * @throws Exception is thrown if error during removing route templates + */ + void removeRouteTemplates(String pattern) throws Exception; + + /** * Adds the given route policy factory * * @param routePolicyFactory the factory diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java index 11e0f67..9dee233 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java @@ -631,6 +631,11 @@ public class SimpleCamelContext extends AbstractCamelContext { } @Override + public void removeRouteTemplates(String pattern) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override public String getTestExcludeRoutes() { throw new UnsupportedOperationException(); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 5b88933..d66704f 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -408,6 +408,14 @@ public class DefaultCamelContext extends SimpleCamelContext implements ModelCame } @Override + public void removeRouteTemplateDefinitions(String pattern) throws Exception { + if (model == null && isLightweight()) { + throw new IllegalStateException("Access to model not supported in lightweight mode"); + } + model.removeRouteTemplateDefinitions(pattern); + } + + @Override public void addRouteTemplateDefinitionConverter(String templateIdPattern, RouteTemplateDefinition.Converter converter) { if (model == null && isLightweight()) { throw new IllegalStateException("Access to model not supported in lightweight mode"); @@ -434,6 +442,14 @@ public class DefaultCamelContext extends SimpleCamelContext implements ModelCame } @Override + public void removeRouteTemplates(String pattern) throws Exception { + if (model == null && isLightweight()) { + throw new IllegalStateException("Access to model not supported in lightweight mode"); + } + model.removeRouteTemplateDefinitions(pattern); + } + + @Override public List<RestDefinition> getRestDefinitions() { if (model == null && isLightweight()) { throw new IllegalStateException("Access to model not supported in lightweight mode"); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java index b658b6b..2ff0362 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java @@ -65,6 +65,7 @@ import org.apache.camel.spi.RouteTemplateLoaderListener; import org.apache.camel.spi.RouteTemplateParameterSource; import org.apache.camel.spi.ScriptingLanguage; import org.apache.camel.support.CamelContextHelper; +import org.apache.camel.support.PatternHelper; import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.support.RouteTemplateHelper; import org.apache.camel.support.ScriptHelper; @@ -214,6 +215,15 @@ public class DefaultModel implements Model { } @Override + public synchronized void removeRouteTemplateDefinitions(String pattern) throws Exception { + for (RouteTemplateDefinition def : new ArrayList<>(routeTemplateDefinitions)) { + if (PatternHelper.matchPattern(def.getId(), pattern)) { + removeRouteTemplateDefinition(def); + } + } + } + + @Override public synchronized List<RouteDefinition> getRouteDefinitions() { return routeDefinitions; } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java index d73e066..1ed731c 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java @@ -1124,6 +1124,11 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam delegate.setAutowiredEnabled(autowiredEnabled); } + @Override + public void removeRouteTemplates(String pattern) throws Exception { + delegate.removeRouteTemplates(pattern); + } + // // ExtendedCamelContext // @@ -1796,6 +1801,11 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam } @Override + public void removeRouteTemplateDefinitions(String pattern) throws Exception { + getModelCamelContext().removeRouteTemplateDefinitions(pattern); + } + + @Override public List<RestDefinition> getRestDefinitions() { return getModelCamelContext().getRestDefinitions(); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java index d9f7f3b..910c19d 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java @@ -1941,6 +1941,11 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat } @Override + public void removeRouteTemplates(String pattern) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override public void setLightweight(boolean lightweight) { throw new UnsupportedOperationException(); } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/Model.java b/core/camel-core-model/src/main/java/org/apache/camel/model/Model.java index 225965d..6d3ca19 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/Model.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/Model.java @@ -179,6 +179,14 @@ public interface Model { void removeRouteTemplateDefinition(RouteTemplateDefinition routeTemplateDefinition) throws Exception; /** + * Removes the route templates matching the pattern - stopping any previously running routes if any of them are + * actively running + * + * @param pattern pattern, such as * for all, or foo* to remove all foo templates + */ + void removeRouteTemplateDefinitions(String pattern) throws Exception; + + /** * Add a converter to translate a {@link RouteTemplateDefinition} to a {@link RouteDefinition}. * * @param templateIdPattern the route template ut to whom a pattern should eb applied diff --git a/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java b/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java index 89e56b6..25443ef 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java @@ -123,6 +123,9 @@ public class RouteWatcherReloadStrategy extends FileWatcherResourceReloadStrateg for (Route route : getCamelContext().getRoutes()) { getCamelContext().removeRoute(route.getRouteId()); } + // remove left-over route templates and endpoints, so we can start on a fresh + getCamelContext().removeRouteTemplates("*"); + getCamelContext().getEndpointRegistry().clear(); } Set<String> ids = getCamelContext().adapt(ExtendedCamelContext.class).getRoutesLoader().updateRoutes(resource); diff --git a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java index 93dcc81..bb655ff 100644 --- a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java +++ b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java @@ -171,7 +171,7 @@ public class KameletMain extends MainCommandLineSupport { * Sets initial properties that are specific to camel-kamelet-main */ protected void configureInitialProperties() { - addInitialProperty("camel.component.kamelet.location", "classpath:/kamelets,github:apache:camel-kamelets"); + addInitialProperty("camel.component.kamelet.location", "classpath:/kamelets,github:apache:camel-kamelets/kamelets"); } } diff --git a/dsl/camel-kamelet-main/src/test/resources/my-camel-k.yaml b/dsl/camel-kamelet-main/src/test/resources/my-camel-k.yaml index 60b805e..ade286b 100644 --- a/dsl/camel-kamelet-main/src/test/resources/my-camel-k.yaml +++ b/dsl/camel-kamelet-main/src/test/resources/my-camel-k.yaml @@ -27,8 +27,6 @@ spec: message: Hello Camel K period: 1000 steps: -# this works -# - to: "kamelet:log-sink?showHeaders=false" - kamelet: name: log-sink parameters: