Repository: camel Updated Branches: refs/heads/master 05b97d4bd -> fec035a51
CAMEL-9873: Component should provide detail if a consumer/producer is native async supported Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fec035a5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fec035a5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fec035a5 Branch: refs/heads/master Commit: fec035a515b6e71d1d059bf319ce6f9b181c6a86 Parents: e3f391b Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Apr 18 09:59:30 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Apr 18 10:02:26 2016 +0200 ---------------------------------------------------------------------- .../component/scheduler/SchedulerEndpoint.java | 3 +- .../camel/component/timer/TimerEndpoint.java | 3 +- .../apache/camel/component/ahc/AhcEndpoint.java | 3 +- .../camel/component/amqp/AMQPEndpoint.java | 3 +- .../camel/component/avro/AvroEndpoint.java | 3 +- .../component/beanstalk/BeanstalkEndpoint.java | 3 +- .../crypto/DigitalSignatureProducer.java | 6 +-- .../apache/camel/component/cxf/CxfEndpoint.java | 3 +- .../component/disruptor/DisruptorEndpoint.java | 3 +- .../component/jetty8/JettyHttpEndpoint8.java | 3 +- .../component/jetty9/JettyHttpEndpoint9.java | 3 +- .../apache/camel/component/jms/JmsEndpoint.java | 3 +- .../camel/component/mqtt/MQTTEndpoint.java | 3 +- .../component/netty/http/NettyHttpEndpoint.java | 3 +- .../camel/component/netty/NettyEndpoint.java | 3 +- .../netty4/http/NettyHttpEndpoint.java | 3 +- .../camel/component/netty4/NettyEndpoint.java | 3 +- .../camel/component/paho/PahoProducer.java | 2 - .../component/pgevent/PgEventProducer.java | 40 ++++++++------------ .../component/rabbitmq/RabbitMQEndpoint.java | 3 +- .../component/restlet/RestletEndpoint.java | 3 +- .../routebox/seda/RouteboxSedaEndpoint.java | 3 +- .../camel/component/sjms/SjmsEndpoint.java | 3 +- .../camel/component/stomp/StompEndpoint.java | 3 +- .../component/undertow/UndertowEndpoint.java | 3 +- .../camel/component/vertx/VertxEndpoint.java | 3 +- 26 files changed, 60 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java index 1430030..8f03af8 100644 --- a/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java @@ -18,7 +18,6 @@ package org.apache.camel.component.scheduler; import java.util.concurrent.ScheduledExecutorService; -import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -35,7 +34,7 @@ import org.apache.camel.spi.UriPath; * Also this component uses JDK ScheduledExecutorService. Where as the timer uses a JDK Timer. */ @UriEndpoint(scheme = "scheduler", title = "Scheduler", syntax = "scheduler:name", consumerOnly = true, consumerClass = SchedulerConsumer.class, label = "core,scheduling") -public class SchedulerEndpoint extends ScheduledPollEndpoint implements AsyncEndpoint { +public class SchedulerEndpoint extends ScheduledPollEndpoint { @UriPath @Metadata(required = "true") private String name; http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java index e3c705d..9473bc7 100644 --- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java @@ -19,7 +19,6 @@ package org.apache.camel.component.timer; import java.util.Date; import java.util.Timer; -import org.apache.camel.AsyncEndpoint; import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.MultipleConsumersSupport; @@ -41,7 +40,7 @@ import org.apache.camel.spi.UriPath; */ @ManagedResource(description = "Managed TimerEndpoint") @UriEndpoint(scheme = "timer", title = "Timer", syntax = "timer:timerName", consumerOnly = true, consumerClass = TimerConsumer.class, label = "core,scheduling") -public class TimerEndpoint extends DefaultEndpoint implements AsyncEndpoint, MultipleConsumersSupport { +public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { @UriPath @Metadata(required = "true") private String timerName; @UriParam(defaultValue = "1000") http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java index 9a2c098..89b2c8e 100644 --- a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java +++ b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java @@ -22,6 +22,7 @@ import javax.net.ssl.SSLContext; import com.ning.http.client.AsyncHttpClient; import com.ning.http.client.AsyncHttpClientConfig; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -39,7 +40,7 @@ import org.apache.camel.util.jsse.SSLContextParameters; * To call external HTTP services using <a href="http://github.com/sonatype/async-http-client">Async Http Client</a>. */ @UriEndpoint(scheme = "ahc", title = "AHC", syntax = "ahc:httpUri", producerOnly = true, label = "http", lenientProperties = true) -public class AhcEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { +public class AhcEndpoint extends DefaultEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware { private AsyncHttpClient client; @UriPath @Metadata(required = "true") http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java index ec6e95a..70d7a8a 100644 --- a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java +++ b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.amqp; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.component.jms.JmsConsumer; import org.apache.camel.component.jms.JmsEndpoint; import org.apache.camel.spi.UriEndpoint; @@ -28,6 +29,6 @@ import org.apache.camel.spi.UriEndpoint; */ @UriEndpoint(scheme = "amqp", extendsScheme = "jms", title = "AMQP", syntax = "amqp:destinationType:destinationName", consumerClass = JmsConsumer.class, label = "messaging") -public class AMQPEndpoint extends JmsEndpoint { +public class AMQPEndpoint extends JmsEndpoint implements AsyncEndpoint { } http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java index 8ea5720..4117f8a 100644 --- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java +++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java @@ -19,6 +19,7 @@ package org.apache.camel.component.avro; import org.apache.avro.Protocol; import org.apache.avro.Schema; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -32,7 +33,7 @@ import org.apache.camel.spi.UriParam; * Working with Apache Avro for data serialization. */ @UriEndpoint(scheme = "avro", title = "Avro", syntax = "avro:transport:host:port/messageName", consumerClass = AvroConsumer.class, label = "messaging,transformation") -public abstract class AvroEndpoint extends DefaultEndpoint { +public abstract class AvroEndpoint extends DefaultEndpoint implements AsyncEndpoint { @UriParam private AvroConfiguration configuration; http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java index 3fe4f77..4864157 100644 --- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java @@ -17,6 +17,7 @@ package org.apache.camel.component.beanstalk; import com.surftools.BeanstalkClient.Client; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -37,7 +38,7 @@ import org.apache.camel.spi.UriPath; * The beanstalk component is used for job retrieval and post-processing of Beanstalk jobs. */ @UriEndpoint(scheme = "beanstalk", title = "Beanstalk", syntax = "beanstalk:connectionSettings", consumerClass = BeanstalkConsumer.class, label = "messaging") -public class BeanstalkEndpoint extends ScheduledPollEndpoint { +public class BeanstalkEndpoint extends ScheduledPollEndpoint implements AsyncEndpoint { final ConnectionSettings conn; @UriPath(description = "Connection settings host:port/tube") http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java b/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java index b39cf13..342fa35 100644 --- a/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java +++ b/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java @@ -32,10 +32,6 @@ public class DigitalSignatureProducer extends DefaultProducer { } public void process(Exchange exchange) throws Exception { - try { - processor.process(exchange); - } catch (Exception e) { - exchange.setException(e); - } + processor.process(exchange); } } http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java index 2794100..b9d6749 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java @@ -40,6 +40,7 @@ import javax.xml.ws.Provider; import javax.xml.ws.WebServiceProvider; import javax.xml.ws.handler.Handler; +import org.apache.camel.AsyncEndpoint; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; @@ -113,7 +114,7 @@ import org.slf4j.LoggerFactory; * The cxf component is used for SOAP WebServices using Apache CXF. */ @UriEndpoint(scheme = "cxf", title = "CXF", syntax = "cxf:beanId:address", consumerClass = CxfConsumer.class, label = "soap,webservice") -public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, Service, Cloneable { +public class CxfEndpoint extends DefaultEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware, Service, Cloneable { private static final Logger LOG = LoggerFactory.getLogger(CxfEndpoint.class); http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java index ec758a8..ce23073 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import com.lmax.disruptor.InsufficientCapacityException; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory; */ @ManagedResource(description = "Managed Disruptor Endpoint") @UriEndpoint(scheme = "disruptor,disruptor-vm", title = "Disruptor,Disruptor VM", syntax = "disruptor:name", consumerClass = DisruptorConsumer.class, label = "endpoint") -public class DisruptorEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { +public class DisruptorEndpoint extends DefaultEndpoint implements AsyncEndpoint, MultipleConsumersSupport { public static final String DISRUPTOR_IGNORE_EXCHANGE = "disruptor.ignoreExchange"; private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorEndpoint.class); http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java ---------------------------------------------------------------------- diff --git a/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java b/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java index c6d766f..215e499 100644 --- a/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java +++ b/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java @@ -19,6 +19,7 @@ package org.apache.camel.component.jetty8; import java.net.URI; import java.net.URISyntaxException; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.component.jetty.JettyContentExchange; import org.apache.camel.component.jetty.JettyHttpComponent; import org.apache.camel.component.jetty.JettyHttpEndpoint; @@ -30,7 +31,7 @@ import org.apache.camel.spi.UriEndpoint; */ @UriEndpoint(scheme = "jetty", extendsScheme = "http", title = "Jetty", syntax = "jetty:httpUri", consumerClass = HttpConsumer.class, label = "http", lenientProperties = true) -public class JettyHttpEndpoint8 extends JettyHttpEndpoint { +public class JettyHttpEndpoint8 extends JettyHttpEndpoint implements AsyncEndpoint { public JettyHttpEndpoint8(JettyHttpComponent component, String uri, URI httpURL) throws URISyntaxException { super(component, uri, httpURL); http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java ---------------------------------------------------------------------- diff --git a/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java b/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java index add21e4..7f00a58 100644 --- a/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java +++ b/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java @@ -19,6 +19,7 @@ package org.apache.camel.component.jetty9; import java.net.URI; import java.net.URISyntaxException; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.component.jetty.JettyContentExchange; import org.apache.camel.component.jetty.JettyHttpComponent; import org.apache.camel.component.jetty.JettyHttpEndpoint; @@ -31,7 +32,7 @@ import org.apache.camel.spi.UriEndpoint; */ @UriEndpoint(scheme = "jetty", extendsScheme = "http", title = "Jetty 9", syntax = "jetty:httpUri", consumerClass = HttpConsumer.class, label = "http", lenientProperties = true) -public class JettyHttpEndpoint9 extends JettyHttpEndpoint { +public class JettyHttpEndpoint9 extends JettyHttpEndpoint implements AsyncEndpoint { private HttpBinding binding; public JettyHttpEndpoint9(JettyHttpComponent component, String uri, URI httpURL) throws URISyntaxException { http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index b1c0038..c671e75 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -30,6 +30,7 @@ import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.Topic; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.LoggingLevel; @@ -69,7 +70,7 @@ import org.springframework.util.ErrorHandler; */ @ManagedResource(description = "Managed JMS Endpoint") @UriEndpoint(scheme = "jms", title = "JMS", syntax = "jms:destinationType:destinationName", consumerClass = JmsConsumer.class, label = "messaging") -public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service { +public class JmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware, MultipleConsumersSupport, Service { protected final Logger log = LoggerFactory.getLogger(getClass()); private final AtomicInteger runningMessageListeners = new AtomicInteger(); private boolean pubSubDomain; http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java index d079778..71e333e 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java @@ -21,6 +21,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -61,7 +62,7 @@ import org.slf4j.LoggerFactory; * Component for communicating with MQTT M2M message brokers using FuseSource MQTT Client. */ @UriEndpoint(scheme = "mqtt", title = "MQTT", syntax = "mqtt:name", consumerClass = MQTTConsumer.class, label = "messaging,iot") -public class MQTTEndpoint extends DefaultEndpoint { +public class MQTTEndpoint extends DefaultEndpoint implements AsyncEndpoint { private static final Logger LOG = LoggerFactory.getLogger(MQTTEndpoint.class); private static final int PUBLISH_MAX_RECONNECT_ATTEMPTS = 3; http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java index 93fba20..545591b 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java @@ -18,6 +18,7 @@ package org.apache.camel.component.netty.http; import java.util.Map; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory; syntax = "netty-http:protocol:host:port/path", consumerClass = NettyHttpConsumer.class, label = "http", lenientProperties = true, excludeProperties = "textline,delimiter,autoAppendDelimiter,decoderMaxLineLength,encoding,allowDefaultCodec,udpConnectionlessSending,networkInterface" + ",clientMode,reconnect,reconnectInterval,broadcast") -public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStrategyAware { +public class NettyHttpEndpoint extends NettyEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware { private static final Logger LOG = LoggerFactory.getLogger(NettyHttpEndpoint.class); @UriParam http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java index b86bfbc..6d5fa6a 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java @@ -22,6 +22,7 @@ import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; import javax.security.cert.X509Certificate; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -41,7 +42,7 @@ import org.jboss.netty.util.Timer; * Socket level networking using TCP or UDP with the Netty 3.x library. */ @UriEndpoint(scheme = "netty", title = "Netty", syntax = "netty:protocol:host:port", consumerClass = NettyConsumer.class, label = "networking,tcp,udp") -public class NettyEndpoint extends DefaultEndpoint { +public class NettyEndpoint extends DefaultEndpoint implements AsyncEndpoint { @UriParam private NettyConfiguration configuration; @UriParam(label = "advanced", javaType = "org.apache.camel.component.netty.NettyServerBootstrapConfiguration", http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java index 894a606..43adfcf 100644 --- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java +++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java @@ -20,6 +20,7 @@ import java.util.Map; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -44,7 +45,7 @@ import org.slf4j.LoggerFactory; syntax = "netty4-http:protocol:host:port/path", consumerClass = NettyHttpConsumer.class, label = "http", lenientProperties = true, excludeProperties = "textline,delimiter,autoAppendDelimiter,decoderMaxLineLength,encoding,allowDefaultCodec,udpConnectionlessSending,networkInterface" + ",clientMode,reconnect,reconnectInterval,useByteBuf,udpByteArrayCodec,broadcast") -public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStrategyAware { +public class NettyHttpEndpoint extends NettyEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware { private static final Logger LOG = LoggerFactory.getLogger(NettyHttpEndpoint.class); @UriParam http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java index 04dbc11..b16e45c 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java @@ -26,6 +26,7 @@ import javax.security.cert.X509Certificate; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.ssl.SslHandler; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -41,7 +42,7 @@ import org.apache.camel.util.ObjectHelper; * Socket level networking using TCP or UDP with the Netty 4.x library. */ @UriEndpoint(scheme = "netty4", title = "Netty4", syntax = "netty4:protocol:host:port", consumerClass = NettyConsumer.class, label = "networking,tcp,udp") -public class NettyEndpoint extends DefaultEndpoint { +public class NettyEndpoint extends DefaultEndpoint implements AsyncEndpoint { @UriParam private NettyConfiguration configuration; @UriParam(label = "advanced", javaType = "org.apache.camel.component.netty4.NettyServerBootstrapConfiguration", http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java index cdd360a..777f52d 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java @@ -43,8 +43,6 @@ public class PahoProducer extends DefaultProducer { client.publish(topic, message); } - - @Override public PahoEndpoint getEndpoint() { return (PahoEndpoint)super.getEndpoint(); http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java index a586a22..6b21304 100644 --- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java +++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java @@ -18,18 +18,16 @@ package org.apache.camel.component.pgevent; import java.sql.CallableStatement; import java.sql.PreparedStatement; -import java.sql.SQLException; import com.impossibl.postgres.api.jdbc.PGConnection; -import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.impl.DefaultProducer; /** * The PgEvent producer. */ -public class PgEventProducer extends DefaultAsyncProducer { +public class PgEventProducer extends DefaultProducer { private final PgEventEndpoint endpoint; private PGConnection dbConnection; @@ -39,36 +37,28 @@ public class PgEventProducer extends DefaultAsyncProducer { } @Override - public boolean process(final Exchange exchange, final AsyncCallback callback) { + public void process(Exchange exchange) throws Exception { try { if (dbConnection.isClosed()) { dbConnection = endpoint.initJdbc(); } } catch (Exception e) { - exchange.setException(new InvalidStateException("Database connection closed and could not be re-opened.", e)); - callback.done(true); - return true; + throw new InvalidStateException("Database connection closed and could not be re-opened.", e); } - try { - String payload = exchange.getIn().getBody(String.class); - if (dbConnection.isServerMinimumVersion(9, 0)) { - try (CallableStatement statement = dbConnection.prepareCall("{call pg_notify(?, ?)}")) { - statement.setString(1, endpoint.getChannel()); - statement.setString(2, payload); - statement.execute(); - } - } else { - String sql = String.format("NOTIFY %s, '%s'", endpoint.getChannel(), payload); - try (PreparedStatement statement = dbConnection.prepareStatement(sql)) { - statement.execute(); - } + String payload = exchange.getIn().getBody(String.class); + if (dbConnection.isServerMinimumVersion(9, 0)) { + try (CallableStatement statement = dbConnection.prepareCall("{call pg_notify(?, ?)}")) { + statement.setString(1, endpoint.getChannel()); + statement.setString(2, payload); + statement.execute(); + } + } else { + String sql = String.format("NOTIFY %s, '%s'", endpoint.getChannel(), payload); + try (PreparedStatement statement = dbConnection.prepareStatement(sql)) { + statement.execute(); } - } catch (SQLException e) { - exchange.setException(e); } - callback.done(true); - return true; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 71721cf..145bd49 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -33,6 +33,7 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -45,7 +46,7 @@ import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; @UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging") -public class RabbitMQEndpoint extends DefaultEndpoint { +public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint { // header to indicate that the message body needs to be de-serialized public static final String SERIALIZE_HEADER = "CamelSerialize"; http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java index 5fb8e43..63ae6fa 100644 --- a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java +++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java @@ -19,6 +19,7 @@ package org.apache.camel.component.restlet; import java.util.List; import java.util.Map; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; @@ -40,7 +41,7 @@ import org.restlet.data.Method; */ @UriEndpoint(scheme = "restlet", title = "Restlet", syntax = "restlet:protocol:host:port/uriPattern", consumerClass = RestletConsumer.class, label = "rest", lenientProperties = true) -public class RestletEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { +public class RestletEndpoint extends DefaultEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware { private static final int DEFAULT_PORT = 80; private static final String DEFAULT_PROTOCOL = "http"; private static final String DEFAULT_HOST = "localhost"; http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java index bd99705..0bebcd2 100644 --- a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java +++ b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java @@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.MultipleConsumersSupport; @@ -37,7 +38,7 @@ import org.apache.camel.component.routebox.RouteboxEndpoint; import org.apache.camel.component.routebox.RouteboxProducer; import org.apache.camel.spi.BrowsableEndpoint; -public class RouteboxSedaEndpoint extends RouteboxEndpoint implements BrowsableEndpoint, MultipleConsumersSupport { +public class RouteboxSedaEndpoint extends RouteboxEndpoint implements AsyncEndpoint, BrowsableEndpoint, MultipleConsumersSupport { private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected; private volatile BlockingQueue<Exchange> queue; private volatile Set<RouteboxProducer> producers = new CopyOnWriteArraySet<RouteboxProducer>(); http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index 6a68a34..272e816 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -19,6 +19,7 @@ package org.apache.camel.component.sjms; import javax.jms.Message; import javax.jms.Session; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory; * This component uses plain JMS API where as the jms component uses Spring JMS. */ @UriEndpoint(scheme = "sjms", title = "Simple JMS", syntax = "sjms:destinationType:destinationName", consumerClass = SjmsConsumer.class, label = "messaging") -public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, HeaderFilterStrategyAware { +public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, MultipleConsumersSupport, HeaderFilterStrategyAware { protected final Logger logger = LoggerFactory.getLogger(getClass()); private boolean topic; http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java index f8b73b2..5e600f9 100644 --- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -49,7 +50,7 @@ import static org.fusesource.stomp.client.Constants.UNSUBSCRIBE; * The stomp component is used for communicating with Stomp compliant message brokers. */ @UriEndpoint(scheme = "stomp", title = "Stomp", syntax = "stomp:destination", consumerClass = StompConsumer.class, label = "messaging") -public class StompEndpoint extends DefaultEndpoint { +public class StompEndpoint extends DefaultEndpoint implements AsyncEndpoint { @UriPath(description = "Name of the queue") @Metadata(required = "true") private String destination; http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java index 8a19679..1123c29 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java @@ -23,6 +23,7 @@ import java.util.Map; import javax.net.ssl.SSLContext; import io.undertow.server.HttpServerExchange; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -48,7 +49,7 @@ import org.xnio.Options; */ @UriEndpoint(scheme = "undertow", title = "Undertow", syntax = "undertow:httpURI", consumerClass = UndertowConsumer.class, label = "http", lenientProperties = true) -public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { +public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware { private static final Logger LOG = LoggerFactory.getLogger(UndertowEndpoint.class); private UndertowComponent component; http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java index 7fbdccd..aeca5e6 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java @@ -18,6 +18,7 @@ package org.apache.camel.component.vertx; import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; +import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; @@ -32,7 +33,7 @@ import org.apache.camel.spi.UriPath; * The vertx component is used for sending and receive messages from a vertx event bus. */ @UriEndpoint(scheme = "vertx", title = "Vert.x", syntax = "vertx:address", consumerClass = VertxConsumer.class, label = "eventbus") -public class VertxEndpoint extends DefaultEndpoint implements MultipleConsumersSupport { +public class VertxEndpoint extends DefaultEndpoint implements AsyncEndpoint, MultipleConsumersSupport { @UriPath @Metadata(required = "true") private String address;