Repository: camel Updated Branches: refs/heads/master f01ab0513 -> f88d6dda3
Created Paho component. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f88d6dda Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f88d6dda Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f88d6dda Branch: refs/heads/master Commit: f88d6dda3fcb7f2d948de280dfa2353b06585f8e Parents: f01ab05 Author: Henryk Konsek <hekon...@gmail.com> Authored: Sat Mar 14 20:45:43 2015 +0100 Committer: Henryk Konsek <hekon...@gmail.com> Committed: Sat Mar 14 20:45:43 2015 +0100 ---------------------------------------------------------------------- components/camel-paho/pom.xml | 78 +++++++++ .../camel/component/paho/PahoComponent.java | 33 ++++ .../camel/component/paho/PahoConsumer.java | 78 +++++++++ .../camel/component/paho/PahoEndpoint.java | 169 +++++++++++++++++++ .../camel/component/paho/PahoPersistence.java | 23 +++ .../camel/component/paho/PahoProducer.java | 47 ++++++ .../services/org/apache/camel/component/paho | 1 + .../camel/component/paho/PahoComponentTest.java | 121 +++++++++++++ components/pom.xml | 1 + 9 files changed, 551 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f88d6dda/components/camel-paho/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-paho/pom.xml b/components/camel-paho/pom.xml new file mode 100644 index 0000000..7245efc --- /dev/null +++ b/components/camel-paho/pom.xml @@ -0,0 +1,78 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.16-SNAPSHOT</version> + </parent> + + <artifactId>camel-paho</artifactId> + <packaging>bundle</packaging> + <name>Camel :: Paho</name> + <description>Camel Eclipse Paho support</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.paho.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=paho</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>org.eclipse.paho.client.mqttv3</artifactId> + <version>1.0.2</version> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-mqtt</artifactId> + <version>${activemq-version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <repositories> + <repository> + <id>eclipse-paho</id> + <url>https://repo.eclipse.org/content/repositories/paho-releases</url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/f88d6dda/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java new file mode 100644 index 0000000..98e80b0 --- /dev/null +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoComponent.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.paho; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.impl.DefaultComponent; + +public class PahoComponent extends DefaultComponent { + + @Override + protected Endpoint createEndpoint(String uri, String s1, Map<String, Object> options) throws Exception { + PahoEndpoint pahoEndpoint = new PahoEndpoint(uri, this); + setProperties(pahoEndpoint, options); + return pahoEndpoint; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f88d6dda/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java new file mode 100644 index 0000000..c4d2386 --- /dev/null +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.paho; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.ExchangeBuilder; +import org.apache.camel.impl.DefaultConsumer; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class PahoConsumer extends DefaultConsumer { + + public PahoConsumer(Endpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + String topic = getEndpoint().getTopic(); + getEndpoint().getClient().subscribe(topic); + getEndpoint().getClient().setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + Exchange exchange = ExchangeBuilder.anExchange(getEndpoint().getCamelContext()).withBody(message.getPayload()).build(); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + + } + }); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + + } + }); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + if (getEndpoint().getClient().isConnected()) { + String topic = getEndpoint().getTopic(); + getEndpoint().getClient().unsubscribe(topic); + } + } + + @Override + public PahoEndpoint getEndpoint() { + return (PahoEndpoint) super.getEndpoint(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f88d6dda/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java new file mode 100644 index 0000000..b1380dd --- /dev/null +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.paho; + +import static java.lang.System.nanoTime; + +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; +import static org.apache.camel.component.paho.PahoPersistence.MEMORY; + +public class PahoEndpoint extends DefaultEndpoint { + + // Configuration members + + private String clientId = "camel-" + nanoTime(); + + private String brokerUrl = "tcp://localhost:1883"; + + private String topic; + + private int qos = 2; + + private PahoPersistence persistence = MEMORY; + + // Collaboration members + + private MqttConnectOptions connectOptions; + + // Auto-configuration members + + private MqttClient client; + + public PahoEndpoint(String uri, Component component) { + super(uri, component); + if (topic == null) { + topic = uri.substring(7); + } + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + client = new MqttClient(getBrokerUrl(), getClientId(), resolvePersistence()); + client.connect(resolveMqttConnectOptions()); + } + + @Override + protected void doStop() throws Exception { + if (getClient().isConnected()) { + getClient().disconnect(); + } + super.doStop(); + } + + @Override + public Producer createProducer() throws Exception { + return new PahoProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new PahoConsumer(this, processor); + } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + public PahoComponent getComponent() { + return (PahoComponent) super.getComponent(); + } + + // Resolvers + + protected MqttClientPersistence resolvePersistence() { + return persistence == MEMORY ? new MemoryPersistence() : new MqttDefaultFilePersistence(); + } + + protected MqttConnectOptions resolveMqttConnectOptions() { + if (connectOptions != null) { + return connectOptions; + } + return new MqttConnectOptions(); + } + + // Configuration getters & setters + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getBrokerUrl() { + return brokerUrl; + } + + public void setBrokerUrl(String brokerUrl) { + this.brokerUrl = brokerUrl; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public int getQos() { + return qos; + } + + public void setQos(int qos) { + this.qos = qos; + } + + // Auto-configuration getters & setters + + public PahoPersistence getPersistence() { + return persistence; + } + + public void setPersistence(PahoPersistence persistence) { + this.persistence = persistence; + } + + public MqttClient getClient() { + return client; + } + + public void setClient(MqttClient client) { + this.client = client; + } + + public MqttConnectOptions getConnectOptions() { + return connectOptions; + } + + public void setConnectOptions(MqttConnectOptions connOpts) { + this.connectOptions = connOpts; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f88d6dda/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoPersistence.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoPersistence.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoPersistence.java new file mode 100644 index 0000000..0cd8603 --- /dev/null +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoPersistence.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.paho; + +public enum PahoPersistence { + + FILE, MEMORY + +} http://git-wip-us.apache.org/repos/asf/camel/blob/f88d6dda/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java new file mode 100644 index 0000000..b585831 --- /dev/null +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.paho; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class PahoProducer extends DefaultProducer { + + public PahoProducer(PahoEndpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + MqttClient client = getEndpoint().getClient(); + String topic = getEndpoint().getTopic(); + int qos = getEndpoint().getQos(); + byte[] payload = exchange.getIn().getBody(byte[].class); + + MqttMessage message = new MqttMessage(payload); + message.setQos(qos); + client.publish(topic, message); + } + + @Override + public PahoEndpoint getEndpoint() { + return (PahoEndpoint) super.getEndpoint(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f88d6dda/components/camel-paho/src/main/resources/META-INF/services/org/apache/camel/component/paho ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/main/resources/META-INF/services/org/apache/camel/component/paho b/components/camel-paho/src/main/resources/META-INF/services/org/apache/camel/component/paho new file mode 100644 index 0000000..2b88b5a --- /dev/null +++ b/components/camel-paho/src/main/resources/META-INF/services/org/apache/camel/component/paho @@ -0,0 +1 @@ +class=org.apache.camel.component.paho.PahoComponent \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f88d6dda/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java new file mode 100644 index 0000000..07a5d5c --- /dev/null +++ b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoComponentTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.paho; + +import org.apache.activemq.broker.BrokerService; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.junit.Test; + +public class PahoComponentTest extends CamelTestSupport { + + MqttConnectOptions connectOptions = new MqttConnectOptions(); + + @EndpointInject(uri = "mock:test") + MockEndpoint mock; + + BrokerService broker; + + int mqttPort = AvailablePortFinder.getNextAvailable(); + + @Override + protected boolean useJmx() { + return false; + } + + @Override + public void doPreSetup() throws Exception { + super.doPreSetup(); + broker = new BrokerService(); + broker.setPersistent(false); + broker.addConnector("mqtt://localhost:" + mqttPort); + broker.start(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + broker.stop(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:test").to("paho:queue?brokerUrl=tcp://localhost:" + mqttPort); + + from("paho:queue?brokerUrl=tcp://localhost:" + mqttPort).to("mock:test"); + + from("paho:persistenceTest?persistence=FILE&brokerUrl=tcp://localhost:" + mqttPort).to("mock:persistenceTest"); + + from("direct:connectOptions").to("paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:" + mqttPort); + } + }; + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + registry.bind("connectOptions", connectOptions); + return registry; + } + + // Tests + + @Test + public void shouldReadMessageFromMqtt() throws InterruptedException { + // Given + String msg = "msg"; + mock.expectedBodiesReceived(msg); + + // When + template.sendBody("mock:test", msg); + + // Then + mock.assertIsSatisfied(); + } + + @Test + public void shouldNotReadMessageFromUnregisteredTopic() throws InterruptedException { + // Given + mock.expectedMessageCount(0); + + // When + template.sendBody("paho:someRandomQueue?brokerUrl=tcp://localhost:" + mqttPort, "msg"); + + // Then + mock.assertIsSatisfied(); + } + + @Test + public void shouldUseConnectionOptionsFromRegistry() { + // Given + PahoEndpoint pahoWithConnectOptionsFromRegistry = getMandatoryEndpoint( + "paho:registryConnectOptions?connectOptions=#connectOptions&brokerUrl=tcp://localhost:" + mqttPort, + PahoEndpoint.class); + + // Then + assertSame(connectOptions, pahoWithConnectOptionsFromRegistry.resolveMqttConnectOptions()); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/f88d6dda/components/pom.xml ---------------------------------------------------------------------- diff --git a/components/pom.xml b/components/pom.xml index 5c808dd..e5bc573 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -170,6 +170,7 @@ <module>camel-olingo2</module> <module>camel-openshift</module> <module>camel-optaplanner</module> + <module>camel-paho</module> <module>camel-paxlogging</module> <module>camel-pgevent</module> <module>camel-printer</module>