CAMEL-8231: camel-stomp should detect failure when sending.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/95b06f27 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/95b06f27 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/95b06f27 Branch: refs/heads/camel-2.14.x Commit: 95b06f27da31f676aee550423c6005f36ef8c107 Parents: 8ed7818 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Feb 15 09:40:21 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Feb 15 09:42:24 2015 +0100 ---------------------------------------------------------------------- .../camel/component/stomp/StompEndpoint.java | 20 ++++++++++++++++---- .../camel/component/stomp/StompProducer.java | 4 ++-- 2 files changed, 18 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/95b06f27/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java index d2fbb0e..b78e3dc 100644 --- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java @@ -19,9 +19,9 @@ package org.apache.camel.component.stomp; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.camel.AsyncCallback; import org.apache.camel.Consumer; import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; @@ -118,14 +118,26 @@ public class StompEndpoint extends DefaultEndpoint { connection.close(null); } - protected void send(Message message) { + protected void send(final Exchange exchange, final AsyncCallback callback) { final StompFrame frame = new StompFrame(SEND); frame.addHeader(DESTINATION, StompFrame.encodeHeader(destination)); - frame.content(utf8(message.getBody().toString())); + frame.content(utf8(exchange.getIn().getBody().toString())); + connection.getDispatchQueue().execute(new Task() { @Override public void run() { - connection.send(frame, null); + connection.send(frame, new Callback<Void>() { + @Override + public void onFailure(Throwable e) { + exchange.setException(e); + callback.done(false); + } + + @Override + public void onSuccess(Void v) { + callback.done(false); + } + }); } }); } http://git-wip-us.apache.org/repos/asf/camel/blob/95b06f27/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java index 74b8b63..81371b4 100644 --- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompProducer.java @@ -32,11 +32,11 @@ public class StompProducer extends DefaultAsyncProducer implements Processor { public boolean process(Exchange exchange, AsyncCallback callback) { try { - stompEndpoint.send(exchange.getIn()); + stompEndpoint.send(exchange, callback); + return false; } catch (Exception e) { exchange.setException(e); } - callback.done(true); return true; }