Repository: camel Updated Branches: refs/heads/master db2b2c33e -> f686fcb0d
CAMEL-7662 MQTTProducerTest fails once enables it The assertion expects MQTT publish/disconnect call to be submitted as a async task, i.e. enqueued in a dispatch queue Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f686fcb0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f686fcb0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f686fcb0 Branch: refs/heads/master Commit: f686fcb0d3c1a97a561474eb75d934310b8f9e44 Parents: db2b2c3 Author: Tomohisa Igarashi <tm.igara...@gmail.com> Authored: Sun Nov 16 16:55:45 2014 +0900 Committer: Tomohisa Igarashi <tm.igara...@gmail.com> Committed: Sun Nov 16 18:57:30 2014 +0900 ---------------------------------------------------------------------- components/camel-mqtt/pom.xml | 2 -- .../camel/component/mqtt/MQTTEndpoint.java | 30 +++++++++++++------- 2 files changed, 20 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f686fcb0/components/camel-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/pom.xml b/components/camel-mqtt/pom.xml index 240a0ce..3c7af31 100644 --- a/components/camel-mqtt/pom.xml +++ b/components/camel-mqtt/pom.xml @@ -80,8 +80,6 @@ <artifactId>maven-surefire-plugin</artifactId> <configuration> <forkMode>perTest</forkMode> - <!--CAMEL-7662 disabling the assertion this time--> - <enableAssertions>false</enableAssertions> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/camel/blob/f686fcb0/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java index 07014ad..664116f 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java @@ -27,6 +27,7 @@ import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.hawtdispatch.Task; import org.fusesource.mqtt.client.Callback; import org.fusesource.mqtt.client.CallbackConnection; import org.fusesource.mqtt.client.Listener; @@ -114,15 +115,19 @@ public class MQTTEndpoint extends DefaultEndpoint { protected void doStop() throws Exception { if (connection != null) { final Promise<Void> promise = new Promise<Void>(); - connection.disconnect(new Callback<Void>() { - public void onSuccess(Void value) { - promise.onSuccess(value); - } + connection.getDispatchQueue().execute(new Task() { + @Override + public void run() { + connection.disconnect(new Callback<Void>() { + public void onSuccess(Void value) { + promise.onSuccess(value); + } - public void onFailure(Throwable value) { - promise.onFailure(value); - } - }); + public void onFailure(Throwable value) { + promise.onFailure(value); + } + }); + }}); promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS); } super.doStop(); @@ -169,8 +174,13 @@ public class MQTTEndpoint extends DefaultEndpoint { return connected; } - void publish(String topic, byte[] payload, QoS qoS, boolean retain, Callback<Void> callback) throws Exception { - connection.publish(topic, payload, qoS, retain, callback); + void publish(final String topic, final byte[] payload, final QoS qoS, final boolean retain, final Callback<Void> callback) throws Exception { + connection.getDispatchQueue().execute(new Task() { + @Override + public void run() { + connection.publish(topic, payload, qoS, retain, callback); + } + }); } void addConsumer(MQTTConsumer consumer) {