This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 758cda80af3cd1ccf719bb03540c669ee88b0090 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Nov 12 19:06:41 2020 +0100 Fixes #544: Kamelet component - optimize as we did for direct component --- .../kamelet/KameletEndpointConfigurer.java | 5 ++ .../apache/camel/component/kamelet/kamelet.json | 3 +- .../camel/component/kamelet/KameletComponent.java | 73 +++++++++++++++--- .../camel/component/kamelet/KameletConsumer.java | 19 +++-- .../KameletConsumerNotAvailableException.java | 27 +++++++ .../camel/component/kamelet/KameletEndpoint.java | 89 +++++++--------------- .../camel/component/kamelet/KameletProducer.java | 70 +++++++++++------ 7 files changed, 184 insertions(+), 102 deletions(-) diff --git a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java index dc0964d..cdbc56e 100644 --- a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java +++ b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java @@ -24,6 +24,7 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme map.put("exceptionHandler", org.apache.camel.spi.ExceptionHandler.class); map.put("exchangePattern", org.apache.camel.ExchangePattern.class); map.put("block", boolean.class); + map.put("failIfNoConsumers", boolean.class); map.put("kameletProperties", java.util.Map.class); map.put("lazyStartProducer", boolean.class); map.put("timeout", long.class); @@ -45,6 +46,8 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true; case "exchangepattern": case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true; + case "failifnoconsumers": + case "failIfNoConsumers": target.setFailIfNoConsumers(property(camelContext, boolean.class, value)); return true; case "kameletproperties": case "kameletProperties": target.setKameletProperties(property(camelContext, java.util.Map.class, value)); return true; case "lazystartproducer": @@ -73,6 +76,8 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme case "exceptionHandler": return target.getExceptionHandler(); case "exchangepattern": case "exchangePattern": return target.getExchangePattern(); + case "failifnoconsumers": + case "failIfNoConsumers": return target.isFailIfNoConsumers(); case "kameletproperties": case "kameletProperties": return target.getKameletProperties(); case "lazystartproducer": diff --git a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json index eff6911..91854be 100644 --- a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json +++ b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json @@ -35,7 +35,8 @@ "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...] "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, "block": { "kind": "parameter", "displayName": "Block", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": true, "description": "If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block and wait for the consumer to become active." }, - "kameletProperties": { "kind": "parameter", "displayName": "Kamelet Properties", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "secret": false, "defaultValue": "true", "description": "Custom properties for kamelet" }, + "failIfNoConsumers": { "kind": "parameter", "displayName": "Fail If No Consumers", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": true, "description": "Whether the producer should fail by throwing an exception, when sending to a kamelet endpoint with no active consumers." }, + "kameletProperties": { "kind": "parameter", "displayName": "Kamelet Properties", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "secret": false, "description": "Custom properties for kamelet" }, "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the [...] "timeout": { "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": 30000, "description": "The timeout value to use if block is enabled." }, "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" }, diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java index 7f243af..271b261 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java @@ -18,6 +18,7 @@ package org.apache.camel.component.kamelet; import java.net.URI; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -38,6 +39,7 @@ import org.apache.camel.support.DefaultComponent; import org.apache.camel.support.DefaultEndpoint; import org.apache.camel.support.LifecycleStrategySupport; import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.util.StopWatch; import org.apache.camel.util.URISupport; import org.apache.camel.util.UnsafeUriCharactersEncoder; import org.slf4j.Logger; @@ -54,8 +56,15 @@ import static org.apache.camel.component.kamelet.Kamelet.addRouteFromTemplate; public class KameletComponent extends DefaultComponent { private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class); - private final Map<String, KameletConsumer> consumers; - private final LifecycleHandler lifecycleHandler; + // active consumers + private final Map<String, KameletConsumer> consumers = new HashMap<>(); + // counter that is used for producers to keep track if any consumer was added/removed since they last checked + // this is used for optimization to avoid each producer to get consumer for each message processed + // (locking via synchronized, and then lookup in the map as the cost) + // consumers and producers are only added/removed during startup/shutdown or if routes is manually controlled + private volatile int stateCounter; + + private final LifecycleHandler lifecycleHandler = new LifecycleHandler(); @Metadata(label = "producer", defaultValue = "true") private boolean block = true; @@ -63,8 +72,6 @@ public class KameletComponent extends DefaultComponent { private long timeout = 30000L; public KameletComponent() { - this.lifecycleHandler = new LifecycleHandler(); - this.consumers = new ConcurrentHashMap<>(); } @Override @@ -194,7 +201,7 @@ public class KameletComponent extends DefaultComponent { // Note that at the moment, there's no enforcement around `source` // and `sink' to be defined on the right side (producer or consumer) // - endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers); + endpoint = new KameletEndpoint(uri, this, templateId, routeId); // forward component properties endpoint.setBlock(block); @@ -203,7 +210,7 @@ public class KameletComponent extends DefaultComponent { // set endpoint specific properties setProperties(endpoint, parameters); } else { - endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers) { + endpoint = new KameletEndpoint(uri, this, templateId, routeId) { @Override protected void doInit() throws Exception { super.doInit(); @@ -266,6 +273,53 @@ public class KameletComponent extends DefaultComponent { this.timeout = timeout; } + int getStateCounter() { + return stateCounter; + } + + public void addConsumer(String key, KameletConsumer consumer) { + synchronized (consumers) { + if (consumers.putIfAbsent(key, consumer) != null) { + throw new IllegalArgumentException( + "Cannot add a 2nd consumer to the same endpoint: " + key + + ". KameletEndpoint only allows one consumer."); + } + // state changed so inc counter + stateCounter++; + consumers.notifyAll(); + } + } + + public void removeConsumer(String key, KameletConsumer consumer) { + synchronized (consumers) { + consumers.remove(key, consumer); + // state changed so inc counter + stateCounter++; + consumers.notifyAll(); + } + } + + protected KameletConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException { + synchronized (consumers) { + KameletConsumer answer = consumers.get(key); + if (answer == null && block) { + StopWatch watch = new StopWatch(); + for (;;) { + answer = consumers.get(key); + if (answer != null) { + break; + } + long rem = timeout - watch.taken(); + if (rem <= 0) { + break; + } + consumers.wait(rem); + } + } + return answer; + } + } + @Override protected void doInit() throws Exception { getCamelContext().addLifecycleStrategy(lifecycleHandler); @@ -278,13 +332,12 @@ public class KameletComponent extends DefaultComponent { } @Override - protected void doStop() throws Exception { + protected void doShutdown() throws Exception { getCamelContext().getLifecycleStrategies().remove(lifecycleHandler); - ServiceHelper.stopService(consumers.values()); + ServiceHelper.stopAndShutdownService(consumers); consumers.clear(); - - super.doStop(); + super.doShutdown(); } /* diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java index c99d56c..36123de 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java @@ -23,8 +23,14 @@ import org.apache.camel.spi.ShutdownAware; import org.apache.camel.support.DefaultConsumer; final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Suspendable { - public KameletConsumer(KameletEndpoint endpoint, Processor processor) { + + private final KameletComponent component; + private final String key; + + public KameletConsumer(KameletEndpoint endpoint, Processor processor, String key) { super(endpoint, processor); + this.component = endpoint.getComponent(); + this.key = key; } @Override @@ -34,22 +40,25 @@ final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Su @Override protected void doStart() throws Exception { - getEndpoint().addConsumer(this); + super.doStart(); + component.addConsumer(key, this); } @Override protected void doStop() throws Exception { - getEndpoint().removeConsumer(this); + component.removeConsumer(key, this); + super.doStop(); } @Override protected void doSuspend() throws Exception { - getEndpoint().removeConsumer(this); + component.removeConsumer(key, this); } @Override protected void doResume() throws Exception { - getEndpoint().addConsumer(this); + // resume by using the start logic + component.addConsumer(key, this); } @Override diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumerNotAvailableException.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumerNotAvailableException.java new file mode 100644 index 0000000..44f1f4e --- /dev/null +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumerNotAvailableException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; + +public class KameletConsumerNotAvailableException extends CamelExchangeException { + + public KameletConsumerNotAvailableException(String message, Exchange exchange) { + super(message, exchange); + } +} diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java index c3760f3..415fba7 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java @@ -30,20 +30,22 @@ import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.support.DefaultEndpoint; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.StopWatch; @UriEndpoint( - firstVersion = "3.5.0", - scheme = "kamelet", - syntax = "kamelet:templateId/routeId", - title = "Kamelet", - lenientProperties = true, - category = Category.CORE) + firstVersion = "3.5.0", + scheme = "kamelet", + syntax = "kamelet:templateId/routeId", + title = "Kamelet", + lenientProperties = true, + category = Category.CORE) public class KameletEndpoint extends DefaultEndpoint { + + private final String key; + @Metadata(required = true) @UriPath(description = "The Route Template ID") private final String templateId; - @Metadata(required = false) + @Metadata @UriPath(description = "The Route ID", defaultValueNote = "The ID will be auto-generated if not provided") private final String routeId; @@ -51,18 +53,16 @@ public class KameletEndpoint extends DefaultEndpoint { private boolean block = true; @UriParam(label = "producer", defaultValue = "30000") private long timeout = 30000L; - @UriParam(label = "producer", defaultValue = "true") - + @UriParam(label = "producer") private final Map<String, Object> kameletProperties; - private final Map<String, KameletConsumer> consumers; - private final String key; + @UriParam(label = "producer", defaultValue = "true") + private boolean failIfNoConsumers = true; public KameletEndpoint( String uri, KameletComponent component, String templateId, - String routeId, - Map<String, KameletConsumer> consumers) { + String routeId) { super(uri, component); @@ -73,7 +73,6 @@ public class KameletEndpoint extends DefaultEndpoint { this.routeId = routeId; this.key = templateId + "/" + routeId; this.kameletProperties = new HashMap<>(); - this.consumers = consumers; } public boolean isBlock() { @@ -101,6 +100,18 @@ public class KameletEndpoint extends DefaultEndpoint { this.timeout = timeout; } + public boolean isFailIfNoConsumers() { + return failIfNoConsumers; + } + + /** + * Whether the producer should fail by throwing an exception, when sending to a kamelet endpoint with no active + * consumers. + */ + public void setFailIfNoConsumers(boolean failIfNoConsumers) { + this.failIfNoConsumers = failIfNoConsumers; + } + @Override public KameletComponent getComponent() { return (KameletComponent) super.getComponent(); @@ -140,58 +151,14 @@ public class KameletEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { - return new KameletProducer(this); + return new KameletProducer(this, key); } @Override public Consumer createConsumer(Processor processor) throws Exception { - Consumer answer = new KameletConsumer(this, processor); + Consumer answer = new KameletConsumer(this, processor, key); configureConsumer(answer); return answer; } - // ********************************* - // - // Helpers - // - // ********************************* - - void addConsumer(KameletConsumer consumer) { - synchronized (consumers) { - if (consumers.putIfAbsent(key, consumer) != null) { - throw new IllegalArgumentException( - "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer."); - } - consumers.notifyAll(); - } - } - - void removeConsumer(KameletConsumer consumer) { - synchronized (consumers) { - consumers.remove(key, consumer); - consumers.notifyAll(); - } - } - - KameletConsumer getConsumer() throws InterruptedException { - synchronized (consumers) { - KameletConsumer answer = consumers.get(key); - if (answer == null && block) { - StopWatch watch = new StopWatch(); - for (; ; ) { - answer =consumers.get(key); - if (answer != null) { - break; - } - long rem = timeout - watch.taken(); - if (rem <= 0) { - break; - } - consumers.wait(rem); - } - } - - return answer; - } - } } \ No newline at end of file diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java index 10bd42c..726c22d 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java @@ -17,49 +17,68 @@ package org.apache.camel.component.kamelet; import org.apache.camel.AsyncCallback; -import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.support.DefaultAsyncProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class KameletProducer extends DefaultAsyncProducer { - public KameletProducer(KameletEndpoint endpoint) { - super(endpoint); - } - @Override - public KameletEndpoint getEndpoint() { - return (KameletEndpoint)super.getEndpoint(); + private static final Logger LOG = LoggerFactory.getLogger(KameletProducer.class); + + private volatile KameletConsumer consumer; + private int stateCounter; + + private final KameletEndpoint endpoint; + private final KameletComponent component; + private final String key; + private final boolean block; + private final long timeout; + + public KameletProducer(KameletEndpoint endpoint, String key) { + super(endpoint); + this.endpoint = endpoint; + this.component = endpoint.getComponent(); + this.key = key; + this.block = endpoint.isBlock(); + this.timeout = endpoint.getTimeout(); } @Override public void process(Exchange exchange) throws Exception { - final KameletConsumer consumer = getEndpoint().getConsumer(); - - if (consumer != null) { - consumer.getProcessor().process(exchange); + if (consumer == null || stateCounter != component.getStateCounter()) { + stateCounter = component.getStateCounter(); + consumer = component.getConsumer(key, block, timeout); + } + if (consumer == null) { + if (endpoint.isFailIfNoConsumers()) { + throw new KameletConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); + } else { + LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint); + } } else { - exchange.setException( - new CamelExchangeException( - "No consumers available on endpoint: " + getEndpoint(), exchange) - ); + consumer.getProcessor().process(exchange); } } @Override public boolean process(Exchange exchange, AsyncCallback callback) { try { - final KameletConsumer consumer = getEndpoint().getConsumer(); - - if (consumer != null) { - return consumer.getAsyncProcessor().process(exchange, callback); - } else { - exchange.setException( - new CamelExchangeException( - "No consumers available on endpoint: " + getEndpoint(), exchange) - ); - + if (consumer == null || stateCounter != component.getStateCounter()) { + stateCounter = component.getStateCounter(); + consumer = component.getConsumer(key, block, timeout); + } + if (consumer == null) { + if (endpoint.isFailIfNoConsumers()) { + exchange.setException(new KameletConsumerNotAvailableException( + "No consumers available on endpoint: " + endpoint, exchange)); + } else { + LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint); + } callback.done(true); return true; + } else { + return consumer.getAsyncProcessor().process(exchange, callback); } } catch (Exception e) { exchange.setException(e); @@ -67,4 +86,5 @@ final class KameletProducer extends DefaultAsyncProducer { return true; } } + }