CAMEL-7662 MQTTProducerTest fails once it is enabled. The assertion expects the MQTT publish/disconnect call to be submitted as an async task, i.e. enqueued in a dispatch queue.
Conflicts: components/camel-mqtt/pom.xml Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1fc9b6c0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1fc9b6c0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1fc9b6c0 Branch: refs/heads/camel-2.13.x Commit: 1fc9b6c068047346c793326d6810affe4afb1939 Parents: 9b790e5 Author: Tomohisa Igarashi <tm.igara...@gmail.com> Authored: Sun Nov 16 16:55:45 2014 +0900 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Mon Nov 17 12:23:55 2014 +0800 ---------------------------------------------------------------------- components/camel-mqtt/pom.xml | 3 -- .../camel/component/mqtt/MQTTEndpoint.java | 30 +++++++++++++------- 2 files changed, 20 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1fc9b6c0/components/camel-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/pom.xml b/components/camel-mqtt/pom.xml index f4ae747..cf1e43c 100644 --- a/components/camel-mqtt/pom.xml +++ b/components/camel-mqtt/pom.xml @@ -80,9 +80,6 @@ <artifactId>maven-surefire-plugin</artifactId> <configuration> <forkMode>perTest</forkMode> - <excludes> - <exclude>**/MQTTProducerTest.*</exclude> - </excludes> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/camel/blob/1fc9b6c0/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java index 12952bc..5940b74 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java +++ 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java @@ -27,6 +27,7 @@ import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.hawtdispatch.Task; import org.fusesource.mqtt.client.Callback; import org.fusesource.mqtt.client.CallbackConnection; import org.fusesource.mqtt.client.Listener; @@ -114,15 +115,19 @@ public class MQTTEndpoint extends DefaultEndpoint { protected void doStop() throws Exception { if (connection != null) { final Promise<Void> promise = new Promise<Void>(); - connection.disconnect(new Callback<Void>() { - public void onSuccess(Void value) { - promise.onSuccess(value); - } + connection.getDispatchQueue().execute(new Task() { + @Override + public void run() { + connection.disconnect(new Callback<Void>() { + public void onSuccess(Void value) { + promise.onSuccess(value); + } - public void onFailure(Throwable value) { - promise.onFailure(value); - } - }); + public void onFailure(Throwable value) { + promise.onFailure(value); + } + }); + }}); promise.await(configuration.getDisconnectWaitInSeconds(), TimeUnit.SECONDS); } super.doStop(); @@ -169,8 +174,13 @@ public class MQTTEndpoint extends DefaultEndpoint { return connected; } - void publish(String topic, byte[] payload, QoS qoS, boolean retain, Callback<Void> callback) throws Exception { - connection.publish(topic, payload, qoS, retain, callback); + void publish(final String topic, final byte[] payload, final QoS qoS, final boolean retain, final Callback<Void> callback) throws Exception { + connection.getDispatchQueue().execute(new Task() { + @Override + public void run() { + connection.publish(topic, payload, qoS, retain, callback); + } + }); } void addConsumer(MQTTConsumer consumer) {