Repository: camel
Updated Branches:
  refs/heads/master db2b2c33e -> f686fcb0d


CAMEL-7662 MQTTProducerTest fails once enables it

The assertion expects MQTT publish/disconnect call to be submitted as a async 
task, i.e. enqueued in a dispatch queue


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f686fcb0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f686fcb0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f686fcb0

Branch: refs/heads/master
Commit: f686fcb0d3c1a97a561474eb75d934310b8f9e44
Parents: db2b2c3
Author: Tomohisa Igarashi <tm.igara...@gmail.com>
Authored: Sun Nov 16 16:55:45 2014 +0900
Committer: Tomohisa Igarashi <tm.igara...@gmail.com>
Committed: Sun Nov 16 18:57:30 2014 +0900

----------------------------------------------------------------------
 components/camel-mqtt/pom.xml                   |  2 --
 .../camel/component/mqtt/MQTTEndpoint.java      | 30 +++++++++++++-------
 2 files changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f686fcb0/components/camel-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mqtt/pom.xml b/components/camel-mqtt/pom.xml
index 240a0ce..3c7af31 100644
--- a/components/camel-mqtt/pom.xml
+++ b/components/camel-mqtt/pom.xml
@@ -80,8 +80,6 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <configuration>
                     <forkMode>perTest</forkMode>
-                    <!--CAMEL-7662 disabling the assertion this time-->
-                    <enableAssertions>false</enableAssertions>
                 </configuration>
             </plugin>
              <plugin>

http://git-wip-us.apache.org/repos/asf/camel/blob/f686fcb0/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
index 07014ad..664116f 100644
--- 
a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
+++ 
b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.Task;
 import org.fusesource.mqtt.client.Callback;
 import org.fusesource.mqtt.client.CallbackConnection;
 import org.fusesource.mqtt.client.Listener;
@@ -114,15 +115,19 @@ public class MQTTEndpoint extends DefaultEndpoint {
     protected void doStop() throws Exception {
         if (connection != null) {
             final Promise<Void> promise = new Promise<Void>();
-            connection.disconnect(new Callback<Void>() {
-                public void onSuccess(Void value) {
-                    promise.onSuccess(value);
-                }
+            connection.getDispatchQueue().execute(new Task() {
+                @Override
+                public void run() {
+                    connection.disconnect(new Callback<Void>() {
+                        public void onSuccess(Void value) {
+                            promise.onSuccess(value);
+                        }
 
-                public void onFailure(Throwable value) {
-                    promise.onFailure(value);
-                }
-            });
+                        public void onFailure(Throwable value) {
+                            promise.onFailure(value);
+                        }
+                    });
+            }});
             promise.await(configuration.getDisconnectWaitInSeconds(), 
TimeUnit.SECONDS);
         }
         super.doStop();
@@ -169,8 +174,13 @@ public class MQTTEndpoint extends DefaultEndpoint {
         return connected;
     }
  
-    void publish(String topic, byte[] payload, QoS qoS, boolean retain, 
Callback<Void> callback) throws Exception {
-        connection.publish(topic, payload, qoS, retain, callback);
+    void publish(final String topic, final byte[] payload, final QoS qoS, 
final boolean retain, final Callback<Void> callback) throws Exception {
+        connection.getDispatchQueue().execute(new Task() {
+            @Override
+            public void run() {
+                connection.publish(topic, payload, qoS, retain, callback);
+            }
+        });
     }
 
     void addConsumer(MQTTConsumer consumer) {

Reply via email to