Repository: camel
Updated Branches:
  refs/heads/master 05b97d4bd -> fec035a51


CAMEL-9873: Component should provide detail if a consumer/producer is native 
async supported


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fec035a5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fec035a5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fec035a5

Branch: refs/heads/master
Commit: fec035a515b6e71d1d059bf319ce6f9b181c6a86
Parents: e3f391b
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Apr 18 09:59:30 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Apr 18 10:02:26 2016 +0200

----------------------------------------------------------------------
 .../component/scheduler/SchedulerEndpoint.java  |  3 +-
 .../camel/component/timer/TimerEndpoint.java    |  3 +-
 .../apache/camel/component/ahc/AhcEndpoint.java |  3 +-
 .../camel/component/amqp/AMQPEndpoint.java      |  3 +-
 .../camel/component/avro/AvroEndpoint.java      |  3 +-
 .../component/beanstalk/BeanstalkEndpoint.java  |  3 +-
 .../crypto/DigitalSignatureProducer.java        |  6 +--
 .../apache/camel/component/cxf/CxfEndpoint.java |  3 +-
 .../component/disruptor/DisruptorEndpoint.java  |  3 +-
 .../component/jetty8/JettyHttpEndpoint8.java    |  3 +-
 .../component/jetty9/JettyHttpEndpoint9.java    |  3 +-
 .../apache/camel/component/jms/JmsEndpoint.java |  3 +-
 .../camel/component/mqtt/MQTTEndpoint.java      |  3 +-
 .../component/netty/http/NettyHttpEndpoint.java |  3 +-
 .../camel/component/netty/NettyEndpoint.java    |  3 +-
 .../netty4/http/NettyHttpEndpoint.java          |  3 +-
 .../camel/component/netty4/NettyEndpoint.java   |  3 +-
 .../camel/component/paho/PahoProducer.java      |  2 -
 .../component/pgevent/PgEventProducer.java      | 40 ++++++++------------
 .../component/rabbitmq/RabbitMQEndpoint.java    |  3 +-
 .../component/restlet/RestletEndpoint.java      |  3 +-
 .../routebox/seda/RouteboxSedaEndpoint.java     |  3 +-
 .../camel/component/sjms/SjmsEndpoint.java      |  3 +-
 .../camel/component/stomp/StompEndpoint.java    |  3 +-
 .../component/undertow/UndertowEndpoint.java    |  3 +-
 .../camel/component/vertx/VertxEndpoint.java    |  3 +-
 26 files changed, 60 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java
 
b/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java
index 1430030..8f03af8 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/scheduler/SchedulerEndpoint.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.scheduler;
 
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -35,7 +34,7 @@ import org.apache.camel.spi.UriPath;
  * Also this component uses JDK ScheduledExecutorService. Where as the timer 
uses a JDK Timer.
  */
 @UriEndpoint(scheme = "scheduler", title = "Scheduler", syntax = 
"scheduler:name", consumerOnly = true, consumerClass = SchedulerConsumer.class, 
label = "core,scheduling")
-public class SchedulerEndpoint extends ScheduledPollEndpoint implements 
AsyncEndpoint {
+public class SchedulerEndpoint extends ScheduledPollEndpoint {
 
     @UriPath @Metadata(required = "true")
     private String name;

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
index e3c705d..9473bc7 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.timer;
 import java.util.Date;
 import java.util.Timer;
 
-import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.MultipleConsumersSupport;
@@ -41,7 +40,7 @@ import org.apache.camel.spi.UriPath;
  */
 @ManagedResource(description = "Managed TimerEndpoint")
 @UriEndpoint(scheme = "timer", title = "Timer", syntax = "timer:timerName", 
consumerOnly = true, consumerClass = TimerConsumer.class, label = 
"core,scheduling")
-public class TimerEndpoint extends DefaultEndpoint implements AsyncEndpoint, 
MultipleConsumersSupport {
+public class TimerEndpoint extends DefaultEndpoint implements 
MultipleConsumersSupport {
     @UriPath @Metadata(required = "true")
     private String timerName;
     @UriParam(defaultValue = "1000")

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
 
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
index 9a2c098..89b2c8e 100644
--- 
a/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
+++ 
b/components/camel-ahc/src/main/java/org/apache/camel/component/ahc/AhcEndpoint.java
@@ -22,6 +22,7 @@ import javax.net.ssl.SSLContext;
 
 import com.ning.http.client.AsyncHttpClient;
 import com.ning.http.client.AsyncHttpClientConfig;
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -39,7 +40,7 @@ import org.apache.camel.util.jsse.SSLContextParameters;
 * To call external HTTP services using <a 
href="http://github.com/sonatype/async-http-client">Async Http Client</a>.
  */
 @UriEndpoint(scheme = "ahc", title = "AHC", syntax = "ahc:httpUri", 
producerOnly = true, label = "http", lenientProperties = true)
-public class AhcEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware {
+public class AhcEndpoint extends DefaultEndpoint implements AsyncEndpoint, 
HeaderFilterStrategyAware {
 
     private AsyncHttpClient client;
     @UriPath @Metadata(required = "true")

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java
 
b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java
index ec6e95a..70d7a8a 100644
--- 
a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java
+++ 
b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPEndpoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.amqp;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.component.jms.JmsConsumer;
 import org.apache.camel.component.jms.JmsEndpoint;
 import org.apache.camel.spi.UriEndpoint;
@@ -28,6 +29,6 @@ import org.apache.camel.spi.UriEndpoint;
  */
 @UriEndpoint(scheme = "amqp", extendsScheme = "jms", title = "AMQP",
         syntax = "amqp:destinationType:destinationName", consumerClass = 
JmsConsumer.class, label = "messaging")
-public class AMQPEndpoint extends JmsEndpoint {
+public class AMQPEndpoint extends JmsEndpoint implements AsyncEndpoint {
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
 
b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
index 8ea5720..4117f8a 100644
--- 
a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
+++ 
b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.avro;
 import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -32,7 +33,7 @@ import org.apache.camel.spi.UriParam;
  * Working with Apache Avro for data serialization.
  */
 @UriEndpoint(scheme = "avro", title = "Avro", syntax = 
"avro:transport:host:port/messageName", consumerClass = AvroConsumer.class, 
label = "messaging,transformation")
-public abstract class AvroEndpoint extends DefaultEndpoint {
+public abstract class AvroEndpoint extends DefaultEndpoint implements 
AsyncEndpoint {
 
     @UriParam
     private AvroConfiguration configuration;

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
 
b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
index 3fe4f77..4864157 100644
--- 
a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
+++ 
b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.beanstalk;
 
 import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -37,7 +38,7 @@ import org.apache.camel.spi.UriPath;
  * The beanstalk component is used for job retrieval and post-processing of 
Beanstalk jobs.
  */
 @UriEndpoint(scheme = "beanstalk", title = "Beanstalk", syntax = 
"beanstalk:connectionSettings", consumerClass = BeanstalkConsumer.class, label 
= "messaging")
-public class BeanstalkEndpoint extends ScheduledPollEndpoint {
+public class BeanstalkEndpoint extends ScheduledPollEndpoint implements 
AsyncEndpoint {
     final ConnectionSettings conn;
 
     @UriPath(description = "Connection settings host:port/tube")

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java
 
b/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java
index b39cf13..342fa35 100644
--- 
a/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java
+++ 
b/components/camel-crypto/src/main/java/org/apache/camel/component/crypto/DigitalSignatureProducer.java
@@ -32,10 +32,6 @@ public class DigitalSignatureProducer extends 
DefaultProducer {
     }
 
     public void process(Exchange exchange) throws Exception {
-        try {
-            processor.process(exchange);
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
+        processor.process(exchange);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
index 2794100..b9d6749 100644
--- 
a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
+++ 
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
@@ -40,6 +40,7 @@ import javax.xml.ws.Provider;
 import javax.xml.ws.WebServiceProvider;
 import javax.xml.ws.handler.Handler;
 
+import org.apache.camel.AsyncEndpoint;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -113,7 +114,7 @@ import org.slf4j.LoggerFactory;
  * The cxf component is used for SOAP WebServices using Apache CXF.
  */
 @UriEndpoint(scheme = "cxf", title = "CXF", syntax = "cxf:beanId:address", 
consumerClass = CxfConsumer.class, label = "soap,webservice")
-public class CxfEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware, Service, Cloneable {
+public class CxfEndpoint extends DefaultEndpoint implements AsyncEndpoint, 
HeaderFilterStrategyAware, Service, Cloneable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CxfEndpoint.class);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
index ec758a8..ce23073 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 import com.lmax.disruptor.InsufficientCapacityException;
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory;
  */
 @ManagedResource(description = "Managed Disruptor Endpoint")
 @UriEndpoint(scheme = "disruptor,disruptor-vm", title = "Disruptor,Disruptor 
VM", syntax = "disruptor:name", consumerClass = DisruptorConsumer.class, label 
= "endpoint")
-public class DisruptorEndpoint extends DefaultEndpoint implements 
MultipleConsumersSupport {
+public class DisruptorEndpoint extends DefaultEndpoint implements 
AsyncEndpoint, MultipleConsumersSupport {
     public static final String DISRUPTOR_IGNORE_EXCHANGE = 
"disruptor.ignoreExchange";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DisruptorEndpoint.class);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java
 
b/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java
index c6d766f..215e499 100644
--- 
a/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java
+++ 
b/components/camel-jetty8/src/main/java/org/apache/camel/component/jetty8/JettyHttpEndpoint8.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.jetty8;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.component.jetty.JettyContentExchange;
 import org.apache.camel.component.jetty.JettyHttpComponent;
 import org.apache.camel.component.jetty.JettyHttpEndpoint;
@@ -30,7 +31,7 @@ import org.apache.camel.spi.UriEndpoint;
  */
 @UriEndpoint(scheme = "jetty", extendsScheme = "http", title = "Jetty",
         syntax = "jetty:httpUri", consumerClass = HttpConsumer.class, label = 
"http", lenientProperties = true)
-public class JettyHttpEndpoint8 extends JettyHttpEndpoint {
+public class JettyHttpEndpoint8 extends JettyHttpEndpoint implements 
AsyncEndpoint {
 
     public JettyHttpEndpoint8(JettyHttpComponent component, String uri, URI 
httpURL) throws URISyntaxException {
         super(component, uri, httpURL);

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java
 
b/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java
index add21e4..7f00a58 100644
--- 
a/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java
+++ 
b/components/camel-jetty9/src/main/java/org/apache/camel/component/jetty9/JettyHttpEndpoint9.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.jetty9;
 import java.net.URI;
 import java.net.URISyntaxException;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.component.jetty.JettyContentExchange;
 import org.apache.camel.component.jetty.JettyHttpComponent;
 import org.apache.camel.component.jetty.JettyHttpEndpoint;
@@ -31,7 +32,7 @@ import org.apache.camel.spi.UriEndpoint;
  */
 @UriEndpoint(scheme = "jetty", extendsScheme = "http", title = "Jetty 9",
         syntax = "jetty:httpUri", consumerClass = HttpConsumer.class, label = 
"http", lenientProperties = true)
-public class JettyHttpEndpoint9 extends JettyHttpEndpoint {
+public class JettyHttpEndpoint9 extends JettyHttpEndpoint implements 
AsyncEndpoint {
     private HttpBinding binding;
 
     public JettyHttpEndpoint9(JettyHttpComponent component, String uri, URI 
httpURL) throws URISyntaxException {

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index b1c0038..c671e75 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -30,6 +30,7 @@ import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.LoggingLevel;
@@ -69,7 +70,7 @@ import org.springframework.util.ErrorHandler;
  */
 @ManagedResource(description = "Managed JMS Endpoint")
 @UriEndpoint(scheme = "jms", title = "JMS", syntax = 
"jms:destinationType:destinationName", consumerClass = JmsConsumer.class, label 
= "messaging")
-public class JmsEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
+public class JmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, 
HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
     protected final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicInteger runningMessageListeners = new AtomicInteger();
     private boolean pubSubDomain;

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index d079778..71e333e 100644
--- 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -61,7 +62,7 @@ import org.slf4j.LoggerFactory;
  * Component for communicating with MQTT M2M message brokers using FuseSource 
MQTT Client.
  */
 @UriEndpoint(scheme = "mqtt", title = "MQTT", syntax = "mqtt:name", 
consumerClass = MQTTConsumer.class, label = "messaging,iot")
-public class MQTTEndpoint extends DefaultEndpoint {
+public class MQTTEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     private static final Logger LOG = 
LoggerFactory.getLogger(MQTTEndpoint.class);
 
     private static final int PUBLISH_MAX_RECONNECT_ATTEMPTS = 3;

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
index 93fba20..545591b 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.netty.http;
 
 import java.util.Map;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
         syntax = "netty-http:protocol:host:port/path", consumerClass = 
NettyHttpConsumer.class, label = "http", lenientProperties = true,
         excludeProperties = 
"textline,delimiter,autoAppendDelimiter,decoderMaxLineLength,encoding,allowDefaultCodec,udpConnectionlessSending,networkInterface"
                 + ",clientMode,reconnect,reconnectInterval,broadcast")
-public class NettyHttpEndpoint extends NettyEndpoint implements 
HeaderFilterStrategyAware {
+public class NettyHttpEndpoint extends NettyEndpoint implements AsyncEndpoint, 
HeaderFilterStrategyAware {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NettyHttpEndpoint.class);
     @UriParam

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
index b86bfbc..6d5fa6a 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
@@ -22,6 +22,7 @@ import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
 import javax.security.cert.X509Certificate;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -41,7 +42,7 @@ import org.jboss.netty.util.Timer;
  * Socket level networking using TCP or UDP with the Netty 3.x library.
  */
 @UriEndpoint(scheme = "netty", title = "Netty", syntax = 
"netty:protocol:host:port", consumerClass = NettyConsumer.class, label = 
"networking,tcp,udp")
-public class NettyEndpoint extends DefaultEndpoint {
+public class NettyEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     @UriParam
     private NettyConfiguration configuration;
     @UriParam(label = "advanced", javaType = 
"org.apache.camel.component.netty.NettyServerBootstrapConfiguration",

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
index 894a606..43adfcf 100644
--- 
a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
+++ 
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpEndpoint.java
@@ -20,6 +20,7 @@ import java.util.Map;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -44,7 +45,7 @@ import org.slf4j.LoggerFactory;
         syntax = "netty4-http:protocol:host:port/path", consumerClass = 
NettyHttpConsumer.class, label = "http", lenientProperties = true,
         excludeProperties = 
"textline,delimiter,autoAppendDelimiter,decoderMaxLineLength,encoding,allowDefaultCodec,udpConnectionlessSending,networkInterface"
                 + 
",clientMode,reconnect,reconnectInterval,useByteBuf,udpByteArrayCodec,broadcast")
-public class NettyHttpEndpoint extends NettyEndpoint implements 
HeaderFilterStrategyAware {
+public class NettyHttpEndpoint extends NettyEndpoint implements AsyncEndpoint, 
HeaderFilterStrategyAware {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NettyHttpEndpoint.class);
     @UriParam

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
index 04dbc11..b16e45c 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
@@ -26,6 +26,7 @@ import javax.security.cert.X509Certificate;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.ssl.SslHandler;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -41,7 +42,7 @@ import org.apache.camel.util.ObjectHelper;
  * Socket level networking using TCP or UDP with the Netty 4.x library.
  */
 @UriEndpoint(scheme = "netty4", title = "Netty4", syntax = 
"netty4:protocol:host:port", consumerClass = NettyConsumer.class, label = 
"networking,tcp,udp")
-public class NettyEndpoint extends DefaultEndpoint {
+public class NettyEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     @UriParam
     private NettyConfiguration configuration;
     @UriParam(label = "advanced", javaType = 
"org.apache.camel.component.netty4.NettyServerBootstrapConfiguration",

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
index cdd360a..777f52d 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
@@ -43,8 +43,6 @@ public class PahoProducer extends DefaultProducer {
         client.publish(topic, message);
     }
 
-   
-
     @Override
     public PahoEndpoint getEndpoint() {
         return (PahoEndpoint)super.getEndpoint();

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
index a586a22..6b21304 100644
--- 
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
+++ 
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventProducer.java
@@ -18,18 +18,16 @@ package org.apache.camel.component.pgevent;
 
 import java.sql.CallableStatement;
 import java.sql.PreparedStatement;
-import java.sql.SQLException;
 
 import com.impossibl.postgres.api.jdbc.PGConnection;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.impl.DefaultProducer;
 
 /**
  * The PgEvent producer.
  */
-public class PgEventProducer extends DefaultAsyncProducer {
+public class PgEventProducer extends DefaultProducer {
     private final PgEventEndpoint endpoint;
     private PGConnection dbConnection;
 
@@ -39,36 +37,28 @@ public class PgEventProducer extends DefaultAsyncProducer {
     }
 
     @Override
-    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+    public void process(Exchange exchange) throws Exception {
         try {
             if (dbConnection.isClosed()) {
                 dbConnection = endpoint.initJdbc();
             }
         } catch (Exception e) {
-            exchange.setException(new InvalidStateException("Database 
connection closed and could not be re-opened.", e));
-            callback.done(true);
-            return true;
+            throw new InvalidStateException("Database connection closed and 
could not be re-opened.", e);
         }
 
-        try {
-            String payload = exchange.getIn().getBody(String.class);
-            if (dbConnection.isServerMinimumVersion(9, 0)) {
-                try (CallableStatement statement = 
dbConnection.prepareCall("{call pg_notify(?, ?)}")) {
-                    statement.setString(1, endpoint.getChannel());
-                    statement.setString(2, payload);
-                    statement.execute();
-                }
-            } else {
-                String sql = String.format("NOTIFY %s, '%s'", 
endpoint.getChannel(), payload);
-                try (PreparedStatement statement = 
dbConnection.prepareStatement(sql)) {
-                    statement.execute();
-                }
+        String payload = exchange.getIn().getBody(String.class);
+        if (dbConnection.isServerMinimumVersion(9, 0)) {
+            try (CallableStatement statement = dbConnection.prepareCall("{call 
pg_notify(?, ?)}")) {
+                statement.setString(1, endpoint.getChannel());
+                statement.setString(2, payload);
+                statement.execute();
+            }
+        } else {
+            String sql = String.format("NOTIFY %s, '%s'", 
endpoint.getChannel(), payload);
+            try (PreparedStatement statement = 
dbConnection.prepareStatement(sql)) {
+                statement.execute();
             }
-        } catch (SQLException e) {
-            exchange.setException(e);
         }
-        callback.done(true);
-        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 71721cf..145bd49 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -33,6 +33,7 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -45,7 +46,7 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
 @UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging")
-public class RabbitMQEndpoint extends DefaultEndpoint {
+public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     // header to indicate that the message body needs to be de-serialized
     public static final String SERIALIZE_HEADER = "CamelSerialize";
 

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
index 5fb8e43..63ae6fa 100644
--- a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
+++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.restlet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -40,7 +41,7 @@ import org.restlet.data.Method;
  */
 @UriEndpoint(scheme = "restlet", title = "Restlet", syntax = "restlet:protocol:host:port/uriPattern",
         consumerClass = RestletConsumer.class, label = "rest", lenientProperties = true)
-public class RestletEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
+public class RestletEndpoint extends DefaultEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware {
     private static final int DEFAULT_PORT = 80;
     private static final String DEFAULT_PROTOCOL = "http";
     private static final String DEFAULT_HOST = "localhost";

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
index bd99705..0bebcd2 100644
--- a/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
+++ b/components/camel-routebox/src/main/java/org/apache/camel/component/routebox/seda/RouteboxSedaEndpoint.java
@@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.MultipleConsumersSupport;
@@ -37,7 +38,7 @@ import org.apache.camel.component.routebox.RouteboxEndpoint;
 import org.apache.camel.component.routebox.RouteboxProducer;
 import org.apache.camel.spi.BrowsableEndpoint;
 
-public class RouteboxSedaEndpoint extends RouteboxEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
+public class RouteboxSedaEndpoint extends RouteboxEndpoint implements AsyncEndpoint, BrowsableEndpoint, MultipleConsumersSupport {
     private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
     private volatile BlockingQueue<Exchange> queue;
     private volatile Set<RouteboxProducer> producers = new CopyOnWriteArraySet<RouteboxProducer>();

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 6a68a34..272e816 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.sjms;
 import javax.jms.Message;
 import javax.jms.Session;
 
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
  * This component uses plain JMS API where as the jms component uses Spring JMS.
  */
 @UriEndpoint(scheme = "sjms", title = "Simple JMS", syntax = "sjms:destinationType:destinationName", consumerClass = SjmsConsumer.class, label = "messaging")
-public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, HeaderFilterStrategyAware {
+public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, MultipleConsumersSupport, HeaderFilterStrategyAware {
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
     private boolean topic;

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
index f8b73b2..5e600f9 100644
--- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
+++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -49,7 +50,7 @@ import static org.fusesource.stomp.client.Constants.UNSUBSCRIBE;
  * The stomp component is used for communicating with Stomp compliant message brokers.
  */
 @UriEndpoint(scheme = "stomp", title = "Stomp", syntax = "stomp:destination", consumerClass = StompConsumer.class, label = "messaging")
-public class StompEndpoint extends DefaultEndpoint {
+public class StompEndpoint extends DefaultEndpoint implements AsyncEndpoint {
 
     @UriPath(description = "Name of the queue") @Metadata(required = "true")
     private String destination;

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
index 8a19679..1123c29 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import javax.net.ssl.SSLContext;
 
 import io.undertow.server.HttpServerExchange;
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -48,7 +49,7 @@ import org.xnio.Options;
  */
 @UriEndpoint(scheme = "undertow", title = "Undertow", syntax = "undertow:httpURI",
         consumerClass = UndertowConsumer.class, label = "http", lenientProperties = true)
-public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
+public class UndertowEndpoint extends DefaultEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(UndertowEndpoint.class);
     private UndertowComponent component;

http://git-wip-us.apache.org/repos/asf/camel/blob/fec035a5/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
index 7fbdccd..aeca5e6 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.vertx;
 
 import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.EventBus;
+import org.apache.camel.AsyncEndpoint;
 import org.apache.camel.Consumer;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
@@ -32,7 +33,7 @@ import org.apache.camel.spi.UriPath;
  * The vertx component is used for sending and receive messages from a vertx event bus.
  */
 @UriEndpoint(scheme = "vertx", title = "Vert.x", syntax = "vertx:address", consumerClass = VertxConsumer.class, label = "eventbus")
-public class VertxEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
+public class VertxEndpoint extends DefaultEndpoint implements AsyncEndpoint, MultipleConsumersSupport {
 
     @UriPath @Metadata(required = "true")
     private String address;

Reply via email to