Repository: camel Updated Branches: refs/heads/master 38d51034a -> c6a5e030f
CAMEL-8432 MQTT wildcard ('+') subscription broken with thanks to Mark Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c6a5e030 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c6a5e030 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c6a5e030 Branch: refs/heads/master Commit: c6a5e030ffa30edd28a3daa0e91cc745037cd0eb Parents: 38d5103 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Wed Mar 4 20:26:44 2015 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Wed Mar 4 20:27:16 2015 +0800 ---------------------------------------------------------------------- .../camel/component/mqtt/MQTTComponent.java | 5 ++ .../camel/component/mqtt/MQTTBaseTest.java | 4 +- .../component/mqtt/MQTTConfigurationTest.java | 12 ++++ .../mqtt/MQTTConsumerWildcardTopicsTest.java | 69 ++++++++++++++++++++ 4 files changed, 89 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java index 7b03a83..6caa305 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java @@ -54,6 +54,11 @@ public class MQTTComponent extends UriEndpointComponent { return endpoint; } + @Override + public boolean useRawUri() { + return true; // to prevent MQTT "+" wildcard from being lost + } + public String getHost() { return host; } http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java index 54bf0d8..f535de2 100644 --- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java +++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTBaseTest.java @@ -28,9 +28,11 @@ public abstract class MQTTBaseTest extends CamelTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(MQTTBaseTest.class); protected static final String TEST_TOPIC = "ComponentTestTopic"; protected static final String TEST_TOPIC_2 = "AnotherTestTopic"; + protected static final String TEST_WILDCARD_TOPIC = "base/+/#"; protected static final String TEST_TOPICS = TEST_TOPIC + "," + TEST_TOPIC_2; + protected static final String TEST_TOPICS_WITH_WILDCARDS = TEST_TOPICS + "," + TEST_WILDCARD_TOPIC; protected BrokerService brokerService; - protected int numberOfMessages = 100; + protected int numberOfMessages = 10; public void setUp() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java index e66e6a1..bba2fa3 100644 --- a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java +++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConfigurationTest.java @@ -45,4 +45,16 @@ public class MQTTConfigurationTest extends MQTTBaseTest { assertEquals(mqttEndpoint.getConfiguration().getSubscribeTopicNames(), TEST_TOPICS); assertTrue(mqttEndpoint.getConfiguration().isByDefaultRetain()); } + + @Test + public void testWildcardSubscribeTopicsConfiguration() throws Exception { + Endpoint endpoint = context.getEndpoint("mqtt:todo?byDefaultRetain=true&qualityOfService=exactlyOnce&publishTopicName=" + TEST_TOPIC + "&subscribeTopicNames=" + TEST_TOPICS_WITH_WILDCARDS); + assertTrue("Endpoint not a MQTTEndpoint: " + endpoint, endpoint instanceof MQTTEndpoint); + MQTTEndpoint mqttEndpoint = (MQTTEndpoint) endpoint; + + assertEquals(mqttEndpoint.getConfiguration().getQoS(), QoS.EXACTLY_ONCE); + assertEquals(mqttEndpoint.getConfiguration().getPublishTopicName(), TEST_TOPIC); + assertEquals(mqttEndpoint.getConfiguration().getSubscribeTopicNames(), TEST_TOPICS_WITH_WILDCARDS); + assertTrue(mqttEndpoint.getConfiguration().isByDefaultRetain()); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/c6a5e030/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java new file mode 100644 index 0000000..8337320 --- /dev/null +++ b/components/camel-mqtt/src/test/java/org/apache/camel/component/mqtt/MQTTConsumerWildcardTopicsTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mqtt; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; +import org.junit.Test; + +public class MQTTConsumerWildcardTopicsTest extends MQTTBaseTest { + + private static final String[] PUBLISH_TOPICS = { + TEST_TOPIC, + TEST_TOPIC_2, + "base", // doesn't match wildcard + "base/foo", // matches + "base/foo/bar", // matches + "base/bat/data/baz/splat" // matches + }; + + @Test + public void testConsumeMultipleTopicsWithWildcards() throws Exception { + MQTT mqtt = new MQTT(); + BlockingConnection publisherConnection = mqtt.blockingConnection(); + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(numberOfMessages * (PUBLISH_TOPICS.length - 1)); + + publisherConnection.connect(); + String payload; + for (String topic : PUBLISH_TOPICS) { + for (int i = 0; i < numberOfMessages; i++) { + payload = "Topic " + topic + ", Message " + i; + publisherConnection.publish(topic, payload.getBytes(), QoS.AT_LEAST_ONCE, false); + } + } + + mock.await(5, TimeUnit.SECONDS); + mock.assertIsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() { + + return new RouteBuilder() { + public void configure() { + from("mqtt:bar?subscribeTopicNames=" + TEST_TOPICS_WITH_WILDCARDS) + .transform(body().convertToString()) + .to("mock:result"); + } + }; + } +}