Repository: camel Updated Branches: refs/heads/master 25c2a0f11 -> 969e7a33d
Component docs Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6a96fda0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6a96fda0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6a96fda0 Branch: refs/heads/master Commit: 6a96fda074c35e639a9bf4405c1674c695ce7891 Parents: 25c2a0f Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Jun 15 13:38:09 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Jun 15 14:04:41 2015 +0200 ---------------------------------------------------------------------- .../camel/component/mqtt/MQTTComponent.java | 9 + .../camel/component/mqtt/MQTTConfiguration.java | 173 +++++++++++++++---- .../camel/component/mqtt/MQTTEndpoint.java | 8 +- 3 files changed, 157 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/6a96fda0/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java index 6caa305..9a287ea 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTComponent.java @@ -63,6 +63,9 @@ public class MQTTComponent extends UriEndpointComponent { return host; } + /** + * The URI of the MQTT broker to connect too - this component also supports SSL - e.g. ssl://127.0.0.1:8883 + */ public void setHost(String host) { this.host = host; } @@ -71,6 +74,9 @@ public class MQTTComponent extends UriEndpointComponent { return userName; } + /** + * Username to be used for authentication against the MQTT broker + */ public void setUserName(String userName) { this.userName = userName; } @@ -79,6 +85,9 @@ public class MQTTComponent extends UriEndpointComponent { return password; } + /** + * Password to be used for authentication against the MQTT broker + */ public void setPassword(String password) { this.password = password; } http://git-wip-us.apache.org/repos/asf/camel/blob/6a96fda0/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java index 9bccf2e..e09bce8 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTConfiguration.java @@ -23,7 +23,7 @@ import javax.net.ssl.SSLContext; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; -import org.fusesource.hawtbuf.UTF8Buffer; + import org.fusesource.hawtdispatch.DispatchQueue; import org.fusesource.hawtdispatch.transport.TcpTransport; import org.fusesource.mqtt.client.MQTT; @@ -36,7 +36,7 @@ public class MQTTConfiguration extends MQTT { public static final String MQTT_PUBLISH_TOPIC = "CamelMQTTPublishTopic"; // inherited options from MQTT - @UriParam + @UriParam(defaultValue = "tcp://127.0.0.1:1883") URI host; @UriParam URI localAddress; @@ -70,18 +70,38 @@ public class MQTTConfiguration extends MQTT { long connectAttemptsMax = -1; @UriParam Tracer tracer; + @UriParam + String clientId; + @UriParam + boolean cleanSession; + @UriParam + short keepAlive; + @UriParam + String willTopic; + @UriParam + String willMessage; + @UriParam(defaultValue = "AT_MOST_ONCE") + QoS willQos = QoS.AT_MOST_ONCE; + @UriParam + QoS willRetain; + @UriParam(defaultValue = "3.1") + String version; /** * These a properties that are looked for in an Exchange - to publish to */ + @UriParam(defaultValue = "MQTTTopicPropertyName") private String mqttTopicPropertyName = "MQTTTopicPropertyName"; + @UriParam(defaultValue = "MQTTRetain") private String mqttRetainPropertyName = "MQTTRetain"; + @UriParam(defaultValue = "MQTTQos") private String mqttQosPropertyName = "MQTTQos"; /** * These are set on the Endpoint - together with properties inherited from MQTT */ @UriParam + @Deprecated private String subscribeTopicName = ""; @UriParam private String subscribeTopicNames = ""; @@ -95,14 +115,17 @@ public class MQTTConfiguration extends MQTT { private int sendWaitInSeconds = 5; @UriParam private boolean byDefaultRetain; - @UriParam - private QoS qos = QoS.AT_LEAST_ONCE; + @UriParam(enums = "AT_MOST_ONCE,AT_LEAST_ONCE,EXACTLY_ONCE", defaultValue = "AT_LEAST_ONCE") private String qualityOfService = QoS.AT_LEAST_ONCE.name(); + private QoS qos = QoS.AT_LEAST_ONCE; public String getQualityOfService() { return qualityOfService; } + /** + * Quality of service level to use for topics. + */ public void setQualityOfService(String qualityOfService) { this.qos = getQoS(qualityOfService); this.qualityOfService = qualityOfService; @@ -112,10 +135,15 @@ public class MQTTConfiguration extends MQTT { return qos; } + @Deprecated public String getSubscribeTopicName() { return subscribeTopicName; } + /** + * The name of the Topic to subscribe to for messages. + */ + @Deprecated public void setSubscribeTopicName(String subscribeTopicName) { this.subscribeTopicName = subscribeTopicName; } @@ -124,6 +152,14 @@ public class MQTTConfiguration extends MQTT { return subscribeTopicNames; } + /** + * A comma-delimited list of Topics to subscribe to for messages. + * Note that each item of this list can contain MQTT wildcards ('\+' and/or '#'), in order to subscribe + * to topics matching a certain pattern within a hierarchy. + * For example, '\+' is a wildcard for all topics at a level within the hierarchy, + * so if a broker has topics "topics/one" and "topics/two", then "topics/\+" can be used to subscribe to both. + * A caveat to consider here is that if the broker adds "topics/three", the route would also begin to receive messages from that topic. + */ public void setSubscribeTopicNames(String subscribeTopicNames) { this.subscribeTopicNames = subscribeTopicNames; } @@ -132,6 +168,9 @@ public class MQTTConfiguration extends MQTT { return publishTopicName; } + /** + * The default Topic to publish messages on + */ public void setPublishTopicName(String publishTopicName) { this.publishTopicName = publishTopicName; } @@ -156,6 +195,10 @@ public class MQTTConfiguration extends MQTT { return mqttRetainPropertyName; } + /** + * The property name to look for on an Exchange for an individual published message. + * If this is set (expects a Boolean value) - then the retain property will be set on the message sent to the MQTT message broker. + */ public void setMqttRetainPropertyName(String mqttRetainPropertyName) { this.mqttRetainPropertyName = mqttRetainPropertyName; } @@ -164,6 +207,10 @@ public class MQTTConfiguration extends MQTT { return mqttQosPropertyName; } + /** + * The property name to look for on an Exchange for an individual published message. + * If this is set (one of AtMostOnce, AtLeastOnce or ExactlyOnce ) - then that QoS will be set on the message sent to the MQTT message broker. + */ public void setMqttQosPropertyName(String mqttQosPropertyName) { this.mqttQosPropertyName = mqttQosPropertyName; } @@ -172,6 +219,9 @@ public class MQTTConfiguration extends MQTT { return connectWaitInSeconds; } + /** + * Delay in seconds the Component will wait for a connection to be established to the MQTT broker + */ public void setConnectWaitInSeconds(int connectWaitInSeconds) { this.connectWaitInSeconds = connectWaitInSeconds; } @@ -180,6 +230,9 @@ public class MQTTConfiguration extends MQTT { return disconnectWaitInSeconds; } + /** + * The number of seconds the Component will wait for a valid disconnect on stop() from the MQTT broker + */ public void setDisconnectWaitInSeconds(int disconnectWaitInSeconds) { this.disconnectWaitInSeconds = disconnectWaitInSeconds; } @@ -188,6 +241,9 @@ public class MQTTConfiguration extends MQTT { return sendWaitInSeconds; } + /** + * The maximum time the Component will wait for a receipt from the MQTT broker to acknowledge a published message before throwing an exception + */ public void setSendWaitInSeconds(int sendWaitInSeconds) { this.sendWaitInSeconds = sendWaitInSeconds; } @@ -196,6 +252,9 @@ public class MQTTConfiguration extends MQTT { return byDefaultRetain; } + /** + * The default retain policy to be used on messages sent to the MQTT broker + */ public void setByDefaultRetain(boolean byDefaultRetain) { this.byDefaultRetain = byDefaultRetain; } @@ -223,56 +282,68 @@ public class MQTTConfiguration extends MQTT { super.setTracer(tracer); } + /** + * Use to set the client Id of the session. + * This is what an MQTT server uses to identify a session where setCleanSession(false); is being used. + * The id must be 23 characters or less. Defaults to auto generated id (based on your socket address, port and timestamp). + */ @Override public void setClientId(String clientId) { super.setClientId(clientId); } + /** + * Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true. + */ @Override - public void setClientId(UTF8Buffer clientId) { - super.setClientId(clientId); + public void setCleanSession(boolean cleanSession) { + super.setCleanSession(cleanSession); } + /** + * Configures the Keep Alive timer in seconds. Defines the maximum time interval between messages received from a client. + * It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout. + */ @Override public void setKeepAlive(short keepAlive) { super.setKeepAlive(keepAlive); } + /** + * Password to be used for authentication against the MQTT broker + */ @Override public void setPassword(String password) { super.setPassword(password); } - @Override - public void setPassword(UTF8Buffer password) { - super.setPassword(password); - } - + /** + * Username to be used for authentication against the MQTT broker + */ @Override public void setUserName(String userName) { super.setUserName(userName); } - @Override - public void setUserName(UTF8Buffer userName) { - super.setUserName(userName); - } - + /** + * The Will message to send. Defaults to a zero length message. + */ @Override public void setWillMessage(String willMessage) { super.setWillMessage(willMessage); } - @Override - public void setWillMessage(UTF8Buffer willMessage) { - super.setWillMessage(willMessage); - } - + /** + * Sets the quality of service to use for the Will message. Defaults to QoS.AT_MOST_ONCE. + */ @Override public void setWillQos(QoS willQos) { super.setWillQos(willQos); } + /** + * Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version. + */ @Override public void setVersion(String version) { super.setVersion(version); @@ -283,22 +354,23 @@ public class MQTTConfiguration extends MQTT { return super.getVersion(); } + /** + * Set to true if you want the Will to be published with the retain option. + */ @Override public void setWillRetain(boolean willRetain) { super.setWillRetain(willRetain); } + /** + * If set the server will publish the client's Will message to the specified topics if the client has an unexpected disconnection. + */ @Override public void setWillTopic(String willTopic) { super.setWillTopic(willTopic); } @Override - public void setWillTopic(UTF8Buffer willTopic) { - super.setWillTopic(willTopic); - } - - @Override public Executor getBlockingExecutor() { return super.getBlockingExecutor(); } @@ -323,6 +395,9 @@ public class MQTTConfiguration extends MQTT { return super.getLocalAddress(); } + /** + * The local InetAddress and port to use + */ @Override public void setLocalAddress(String localAddress) throws URISyntaxException { super.setLocalAddress(localAddress); @@ -338,6 +413,10 @@ public class MQTTConfiguration extends MQTT { return super.getMaxReadRate(); } + /** + * Sets the maximum bytes per second that this transport will receive data at. + * This setting throttles reads so that the rate is not exceeded. Defaults to 0 which disables throttling. + */ @Override public void setMaxReadRate(int maxReadRate) { super.setMaxReadRate(maxReadRate); @@ -348,6 +427,10 @@ public class MQTTConfiguration extends MQTT { return super.getMaxWriteRate(); } + /** + * Sets the maximum bytes per second that this transport will send data at. + * This setting throttles writes so that the rate is not exceeded. Defaults to 0 which disables throttling. + */ @Override public void setMaxWriteRate(int maxWriteRate) { super.setMaxWriteRate(maxWriteRate); @@ -358,6 +441,9 @@ public class MQTTConfiguration extends MQTT { return super.getReceiveBufferSize(); } + /** + * Sets the size of the internal socket receive buffer. Defaults to 65536 (64k) + */ @Override public void setReceiveBufferSize(int receiveBufferSize) { super.setReceiveBufferSize(receiveBufferSize); @@ -368,16 +454,17 @@ public class MQTTConfiguration extends MQTT { return super.getHost(); } - @Override - public void setHost(String host, int port) throws URISyntaxException { - super.setHost(host, port); - } - + /** + * The URI of the MQTT broker to connect too - this component also supports SSL - e.g. ssl://127.0.0.1:8883 + */ @Override public void setHost(String host) throws URISyntaxException { super.setHost(host); } + /** + * The URI of the MQTT broker to connect too - this component also supports SSL - e.g. ssl://127.0.0.1:8883 + */ @Override public void setHost(URI host) { super.setHost(host); @@ -388,6 +475,9 @@ public class MQTTConfiguration extends MQTT { return super.getSendBufferSize(); } + /** + * Sets the size of the internal socket send buffer. Defaults to 65536 (64k) + */ @Override public void setSendBufferSize(int sendBufferSize) { super.setSendBufferSize(sendBufferSize); @@ -408,6 +498,10 @@ public class MQTTConfiguration extends MQTT { return super.getTrafficClass(); } + /** + * Sets traffic class or type-of-service octet in the IP header for packets sent from the transport. + * Defaults to 8 which means the traffic should be optimized for throughput. + */ @Override public void setTrafficClass(int trafficClass) { super.setTrafficClass(trafficClass); @@ -428,6 +522,10 @@ public class MQTTConfiguration extends MQTT { return super.getConnectAttemptsMax(); } + /** + * The maximum number of reconnect attempts before an error is reported back to the client on the first attempt + * by the client to connect to a server. Set to -1 to use unlimited attempts. Defaults to -1. + */ @Override public void setConnectAttemptsMax(long connectAttemptsMax) { super.setConnectAttemptsMax(connectAttemptsMax); @@ -438,6 +536,10 @@ public class MQTTConfiguration extends MQTT { return super.getReconnectAttemptsMax(); } + /** + * The maximum number of reconnect attempts before an error is reported back to the client after a server + * connection had previously been established. Set to -1 to use unlimited attempts. Defaults to -1. + */ @Override public void setReconnectAttemptsMax(long reconnectAttemptsMax) { super.setReconnectAttemptsMax(reconnectAttemptsMax); @@ -448,6 +550,9 @@ public class MQTTConfiguration extends MQTT { return super.getReconnectBackOffMultiplier(); } + /** + * The Exponential backoff be used between reconnect attempts. Set to 1 to disable exponential backoff. Defaults to 2. + */ @Override public void setReconnectBackOffMultiplier(double reconnectBackOffMultiplier) { super.setReconnectBackOffMultiplier(reconnectBackOffMultiplier); @@ -458,6 +563,9 @@ public class MQTTConfiguration extends MQTT { return super.getReconnectDelay(); } + /** + * How long to wait in ms before the first reconnect attempt. Defaults to 10. + */ @Override public void setReconnectDelay(long reconnectDelay) { super.setReconnectDelay(reconnectDelay); @@ -468,6 +576,9 @@ public class MQTTConfiguration extends MQTT { return super.getReconnectDelayMax(); } + /** + * The maximum amount of time in ms to wait between reconnect attempts. Defaults to 30,000. + */ @Override public void setReconnectDelayMax(long reconnectDelayMax) { super.setReconnectDelayMax(reconnectDelayMax); http://git-wip-us.apache.org/repos/asf/camel/blob/6a96fda0/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java index cfd2eb9..92c8d17 100644 --- a/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java +++ b/components/camel-mqtt/src/main/java/org/apache/camel/component/mqtt/MQTTEndpoint.java @@ -52,12 +52,13 @@ public class MQTTEndpoint extends DefaultEndpoint { private static final int PUBLISH_MAX_RECONNECT_ATTEMPTS = 3; private CallbackConnection connection; + private volatile boolean connected; + private final List<MQTTConsumer> consumers = new CopyOnWriteArrayList<MQTTConsumer>(); + @UriPath @Metadata(required = "true") private String name; @UriParam private final MQTTConfiguration configuration; - private volatile boolean connected; - private final List<MQTTConsumer> consumers = new CopyOnWriteArrayList<MQTTConsumer>(); public MQTTEndpoint(String uri, MQTTComponent component, MQTTConfiguration properties) { super(uri, component); @@ -84,6 +85,9 @@ public class MQTTEndpoint extends DefaultEndpoint { return name; } + /** + * A logical name to use which is not the topic name. + */ public void setName(String name) { this.name = name; }