CAMEL-6823: Fixed camel-stomp to allow configuring options from endpoint URIs.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aeaeb3e5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aeaeb3e5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aeaeb3e5 Branch: refs/heads/camel-2.12.x Commit: aeaeb3e5dfdaf71c67929b4b4622f9b87e672b2e Parents: 0ecc2f1 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Oct 12 12:47:32 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Oct 12 12:47:54 2013 +0200 ---------------------------------------------------------------------- .../camel/component/stomp/StompComponent.java | 9 ++- .../component/stomp/StompConfiguration.java | 24 +++---- .../camel/component/stomp/StompEndpoint.java | 12 ++-- .../camel/component/stomp/StompBaseTest.java | 6 +- .../component/stomp/StompConsumerUriTest.java | 69 ++++++++++++++++++++ 5 files changed, 100 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java index 165c24e..a7ac94b 100644 --- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompComponent.java @@ -23,13 +23,18 @@ import org.apache.camel.impl.DefaultComponent; public class StompComponent extends DefaultComponent { - StompConfiguration configuration = new StompConfiguration(); + private StompConfiguration configuration = new StompConfiguration(); @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> 
parameters) throws Exception { String destination = "/" + remaining.replaceAll(":", "/"); - StompEndpoint endpoint = new StompEndpoint(uri, this, getConfiguration(), destination); + // must copy config so we do not have side effects + StompConfiguration config = getConfiguration().copy(); + // allow to configure configuration from uri parameters + setProperties(config, parameters); + + StompEndpoint endpoint = new StompEndpoint(uri, this, config, destination); setProperties(endpoint, parameters); return endpoint; } http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java index 664215b..091c2f7 100644 --- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompConfiguration.java @@ -16,15 +16,25 @@ */ package org.apache.camel.component.stomp; -import org.fusesource.stomp.client.Stomp; +import org.apache.camel.RuntimeCamelException; -public class StompConfiguration { +public class StompConfiguration implements Cloneable { private String brokerURL = "tcp://localhost:61613"; private String login; private String passcode; - private Stomp stomp; + /** + * Returns a copy of this configuration + */ + public StompConfiguration copy() { + try { + StompConfiguration copy = (StompConfiguration) clone(); + return copy; + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } public String getBrokerURL() { return brokerURL; @@ -50,12 +60,4 @@ public class StompConfiguration { this.passcode = passcode; } - public Stomp getStomp() throws Exception { - if (stomp == 
null) { - stomp = new Stomp(brokerURL); - stomp.setLogin(login); - stomp.setPasscode(passcode); - } - return stomp; - } } http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java index 623dea0..d2fbb0e 100644 --- a/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java +++ b/components/camel-stomp/src/main/java/org/apache/camel/component/stomp/StompEndpoint.java @@ -30,6 +30,7 @@ import org.fusesource.hawtdispatch.Task; import org.fusesource.stomp.client.Callback; import org.fusesource.stomp.client.CallbackConnection; import org.fusesource.stomp.client.Promise; +import org.fusesource.stomp.client.Stomp; import org.fusesource.stomp.codec.StompFrame; import static org.fusesource.hawtbuf.UTF8Buffer.utf8; @@ -45,6 +46,7 @@ public class StompEndpoint extends DefaultEndpoint { private CallbackConnection connection; private StompConfiguration configuration; private String destination; + private Stomp stomp; private final List<StompConsumer> consumers = new CopyOnWriteArrayList<StompConsumer>(); @@ -70,7 +72,10 @@ public class StompEndpoint extends DefaultEndpoint { protected void doStart() throws Exception { final Promise<CallbackConnection> promise = new Promise<CallbackConnection>(); - configuration.getStomp().connectCallback(promise); + stomp = new Stomp(configuration.getBrokerURL()); + stomp.setLogin(configuration.getLogin()); + stomp.setPasscode(configuration.getPasscode()); + stomp.connectCallback(promise); connection = promise.await(); @@ -125,11 +130,6 @@ public class StompEndpoint extends DefaultEndpoint { }); } - @Override - protected String createEndpointUri() { - return 
super.createEndpointUri(); - } - void addConsumer(final StompConsumer consumer) { connection.getDispatchQueue().execute(new Task() { @Override http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java index 052ab75..3193c86 100644 --- a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java +++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompBaseTest.java @@ -24,12 +24,16 @@ public abstract class StompBaseTest extends CamelTestSupport { protected BrokerService brokerService; protected int numberOfMessages = 100; + protected int getPort() { + return 61613; + } + @Override public void setUp() throws Exception { brokerService = new BrokerService(); brokerService.setPersistent(false); brokerService.setAdvisorySupport(false); - brokerService.addConnector("stomp://localhost:61613?trace=true"); + brokerService.addConnector("stomp://localhost:" + getPort() + "?trace=true"); brokerService.start(); brokerService.waitUntilStarted(); super.setUp(); http://git-wip-us.apache.org/repos/asf/camel/blob/aeaeb3e5/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java new file mode 100644 index 0000000..a87e943 --- /dev/null +++ b/components/camel-stomp/src/test/java/org/apache/camel/component/stomp/StompConsumerUriTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stomp; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.fusesource.stomp.client.BlockingConnection; +import org.fusesource.stomp.client.Stomp; +import org.fusesource.stomp.codec.StompFrame; +import org.junit.Test; + +import static org.fusesource.hawtbuf.UTF8Buffer.utf8; +import static org.fusesource.stomp.client.Constants.DESTINATION; +import static org.fusesource.stomp.client.Constants.MESSAGE_ID; +import static org.fusesource.stomp.client.Constants.SEND; + +public class StompConsumerUriTest extends StompBaseTest { + + @Override + protected int getPort() { + return 61614; + } + + @Test + public void testConsume() throws Exception { + Stomp stomp = new Stomp("tcp://localhost:" + getPort()); + final BlockingConnection producerConnection = stomp.connectBlocking(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(numberOfMessages); + + for (int i = 0; i < numberOfMessages; i++) { + StompFrame frame = new StompFrame(SEND); + frame.addHeader(DESTINATION, StompFrame.encodeHeader("/queue/test")); + 
frame.addHeader(MESSAGE_ID, StompFrame.encodeHeader("msg:" + i)); + frame.content(utf8("Important Message " + i)); + producerConnection.send(frame); + } + + mock.await(5, TimeUnit.SECONDS); + mock.assertIsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("stomp:queue:test?brokerURL=tcp://localhost:61614") + .transform(body().convertToString()) + .to("mock:result"); + } + }; + } +}