CAMEL-7662 MQTTProducerTest fails once enables it

The assertion expects MQTT publish/disconnect call to be submitted as a async 
task, i.e. enqueued in a dispatch queue

Conflicts:
        components/camel-mqtt/pom.xml


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1fc9b6c0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1fc9b6c0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1fc9b6c0

Branch: refs/heads/camel-2.13.x
Commit: 1fc9b6c068047346c793326d6810affe4afb1939
Parents: 9b790e5
Author: Tomohisa Igarashi <tm.igara...@gmail.com>
Authored: Sun Nov 16 16:55:45 2014 +0900
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Mon Nov 17 12:23:55 2014 +0800

----------------------------------------------------------------------
 components/camel-mqtt/pom.xml                   |  3 --
 .../camel/component/mqtt/MQTTEndpoint.java      | 30 +++++++++++++-------
 2 files changed, 20 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1fc9b6c0/components/camel-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/pom.xml b/components/camel-mqtt/pom.xml
index f4ae747..cf1e43c 100644
--- a/components/camel-mqtt/pom.xml
+++ b/components/camel-mqtt/pom.xml
@@ -80,9 +80,6 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <configuration>
                     <forkMode>perTest</forkMode>
-                    <excludes>
-                        <exclude>**/MQTTProducerTest.*</exclude>
-                    </excludes>
                 </configuration>
             </plugin>
              <plugin>

http://git-wip-us.apache.org/repos/asf/camel/blob/1fc9b6c0/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 12952bc..5940b74 100644
--- 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.Task;
 import org.fusesource.mqtt.client.Callback;
 import org.fusesource.mqtt.client.CallbackConnection;
 import org.fusesource.mqtt.client.Listener;
@@ -114,15 +115,19 @@ public class MQTTEndpoint extends DefaultEndpoint {
     protected void doStop() throws Exception {
         if (connection != null) {
             final Promise<Void> promise = new Promise<Void>();
-            connection.disconnect(new Callback<Void>() {
-                public void onSuccess(Void value) {
-                    promise.onSuccess(value);
-                }
+            connection.getDispatchQueue().execute(new Task() {
+                @Override
+                public void run() {
+                    connection.disconnect(new Callback<Void>() {
+                        public void onSuccess(Void value) {
+                            promise.onSuccess(value);
+                        }
 
-                public void onFailure(Throwable value) {
-                    promise.onFailure(value);
-                }
-            });
+                        public void onFailure(Throwable value) {
+                            promise.onFailure(value);
+                        }
+                    });
+            }});
             promise.await(configuration.getDisconnectWaitInSeconds(), 
TimeUnit.SECONDS);
         }
         super.doStop();
@@ -169,8 +174,13 @@ public class MQTTEndpoint extends DefaultEndpoint {
         return connected;
     }
  
-    void publish(String topic, byte[] payload, QoS qoS, boolean retain, 
Callback<Void> callback) throws Exception {
-        connection.publish(topic, payload, qoS, retain, callback);
+    void publish(final String topic, final byte[] payload, final QoS qoS, 
final boolean retain, final Callback<Void> callback) throws Exception {
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                connection.publish(topic, payload, qoS, retain, callback);
+            }
+        });
     }
 
     void addConsumer(MQTTConsumer consumer) {

Reply via email to