Repository: camel Updated Branches: refs/heads/master 3d46316cd -> 368be192d
[Paho] Added MqttConnectOptions autodetection. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/368be192 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/368be192 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/368be192 Branch: refs/heads/master Commit: 368be192d18c34e35ba4ee8ec0bab9b91f25a7dc Parents: 3d46316 Author: Henryk Konsek <hekon...@gmail.com> Authored: Thu Mar 19 22:40:10 2015 +0100 Committer: Henryk Konsek <hekon...@gmail.com> Committed: Thu Mar 19 22:40:10 2015 +0100 ---------------------------------------------------------------------- .../apache/camel/component/paho/PahoEndpoint.java | 15 +++++++++++++++ .../camel/component/paho/PahoComponentTest.java | 11 +++++++++++ 2 files changed, 26 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/368be192/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java index a83380d..1c2e46a 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.paho; +import java.util.Set; + import static java.lang.System.nanoTime; import org.apache.camel.Component; @@ -32,12 +34,16 @@ import org.eclipse.paho.client.mqttv3.MqttClientPersistence; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.camel.component.paho.PahoPersistence.MEMORY; @UriEndpoint(scheme = "paho", consumerClass = PahoConsumer.class, label = "messaging", syntax = "paho:topic") public class PahoEndpoint extends DefaultEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(PahoEndpoint.class); + // Constants private static final String DEFAULT_BROKER_URL = "tcp://localhost:1883"; @@ -119,6 +125,15 @@ public class PahoEndpoint extends DefaultEndpoint { if (connectOptions != null) { return connectOptions; } + Set<MqttConnectOptions> connectOptions = getCamelContext().getRegistry().findByType(MqttConnectOptions.class); + if (connectOptions.size() == 1) { + LOG.info("Single MqttConnectOptions instance found in the registry. It will be used by the endpoint."); + return connectOptions.iterator().next(); + } else if (connectOptions.size() > 1) { + LOG.warn("Found {} instances of the MqttConnectOptions in the registry. None of these will be used by the endpoint. " + + "Please use 'connectOptions' endpoint option to select one.", + connectOptions.size()); + } return new MqttConnectOptions(); } http://git-wip-us.apache.org/repos/asf/camel/blob/368be192/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java index 43674da..905f6fd 100644 --- a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java @@ -121,6 +121,17 @@ public class PahoComponentTest extends CamelTestSupport { } @Test + public void shouldAutomaticallyUseConnectionOptionsFromRegistry() { + // Given + PahoEndpoint pahoWithConnectOptionsFromRegistry = getMandatoryEndpoint( + "paho:registryConnectOptions?brokerUrl=tcp://localhost:" + mqttPort, + PahoEndpoint.class); + + // Then + assertSame(connectOptions, pahoWithConnectOptionsFromRegistry.resolveMqttConnectOptions()); + } + + @Test public void shouldKeepOriginalMessageInHeader() throws InterruptedException { // Given final String msg = "msg";