Repository: camel Updated Branches: refs/heads/camel-2.13.x 34242af15 -> a6d75c580
CAMEL-7706: Fixed camel-jgroups to disconnect shared JChannel correctly, added unit test to verify Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a6d75c58 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a6d75c58 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a6d75c58 Branch: refs/heads/camel-2.13.x Commit: a6d75c580073582bfbe432f3c700b17745358f6c Parents: 34242af Author: Dhiraj Bokde <dhira...@yahoo.com> Authored: Fri Aug 15 12:30:46 2014 -0700 Committer: Dhiraj Bokde <dhira...@yahoo.com> Committed: Mon Aug 18 07:35:41 2014 -0700 ---------------------------------------------------------------------- .../component/jgroups/JGroupsConsumer.java | 8 ++- .../component/jgroups/JGroupsEndpoint.java | 29 +++++++- .../component/jgroups/JGroupsProducer.java | 11 ++-- .../jgroups/JGroupsSharedChannelTest.java | 69 ++++++++++++++++++++ 4 files changed, 110 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a6d75c58/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java index ee6f4ba..a2f6815 100644 --- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java +++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java @@ -30,9 +30,12 @@ public class JGroupsConsumer extends DefaultConsumer { private final String clusterName; private final CamelJGroupsReceiver receiver; + private final JGroupsEndpoint endpoint; public JGroupsConsumer(JGroupsEndpoint endpoint, Processor processor, Channel channel, String clusterName) { super(endpoint, processor); + + this.endpoint = endpoint; this.channel = channel; this.clusterName = clusterName; @@ -44,13 +47,14 @@ public class JGroupsConsumer extends DefaultConsumer { super.doStart(); log.debug("Connecting receiver: {} to the cluster: {}.", receiver, clusterName); channel.setReceiver(receiver); - channel.connect(clusterName); + endpoint.connect(); } @Override protected void doStop() throws Exception { log.debug("Closing connection to cluster: {} from receiver: {}.", clusterName, receiver); - channel.disconnect(); + channel.setReceiver(null); + endpoint.disconnect(); super.doStop(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/a6d75c58/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java index 23af26c..606a8de 100644 --- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java +++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.jgroups; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -26,6 +28,8 @@ import org.jgroups.Channel; import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.View; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JGroupsEndpoint extends DefaultEndpoint { @@ -37,7 +41,10 @@ public class JGroupsEndpoint extends DefaultEndpoint { public static final String HEADER_JGROUPS_CHANNEL_ADDRESS = "JGROUPS_CHANNEL_ADDRESS"; + private static final Logger LOG = LoggerFactory.getLogger(JGroupsEndpoint.class); + private Channel channel; + private AtomicInteger connectCount = new AtomicInteger(0); private Channel resolvedChannel; @@ -103,6 +110,7 @@ public class JGroupsEndpoint extends DefaultEndpoint { @Override protected void doStop() throws Exception { + LOG.trace("Closing JGroups Channel {}", getEndpointUri()); resolvedChannel.close(); super.doStop(); } @@ -117,6 +125,26 @@ public class JGroupsEndpoint extends DefaultEndpoint { return new JChannel(); } + /** + * Connect shared channel, called by producer and consumer. + * @throws Exception + */ + public void connect() throws Exception { + connectCount.incrementAndGet(); + LOG.trace("Connecting JGroups Channel {}", getEndpointUri()); + resolvedChannel.connect(clusterName); + } + + /** + * Disconnect shared channel, called by producer and consumer. + */ + public void disconnect() { + if (connectCount.decrementAndGet() == 0) { + LOG.trace("Disconnecting JGroups Channel {}", getEndpointUri()); + resolvedChannel.disconnect(); + } + } + private boolean resolveEnableViewMessages() { if (enableViewMessages != null) { resolvedEnableViewMessages = enableViewMessages; @@ -171,5 +199,4 @@ public class JGroupsEndpoint extends DefaultEndpoint { public void setResolvedEnableViewMessages(boolean resolvedEnableViewMessages) { this.resolvedEnableViewMessages = resolvedEnableViewMessages; } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/a6d75c58/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java index b134320..f49ef82 100644 --- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java +++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.jgroups; -import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; import org.jgroups.Address; @@ -30,14 +29,18 @@ public class JGroupsProducer extends DefaultProducer { // Producer settings + private final JGroupsEndpoint endpoint; + private final Channel channel; private final String clusterName; // Constructor - public JGroupsProducer(Endpoint endpoint, Channel channel, String clusterName) { + public JGroupsProducer(JGroupsEndpoint endpoint, Channel channel, String clusterName) { super(endpoint); + + this.endpoint = endpoint; this.channel = channel; this.clusterName = clusterName; } @@ -47,12 +50,12 @@ public class JGroupsProducer extends DefaultProducer { @Override protected void doStart() throws Exception { super.doStart(); - channel.connect(clusterName); + endpoint.connect(); } @Override protected void doStop() throws Exception { - channel.disconnect(); + endpoint.disconnect(); super.doStop(); } http://git-wip-us.apache.org/repos/asf/camel/blob/a6d75c58/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/JGroupsSharedChannelTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/JGroupsSharedChannelTest.java b/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/JGroupsSharedChannelTest.java new file mode 100644 index 0000000..3edb57f --- /dev/null +++ b/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/JGroupsSharedChannelTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jgroups; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Test for shared channel in JGroups endpoint + */ +public class JGroupsSharedChannelTest extends CamelTestSupport { + + private static final String JGROUPS_SHARED_CHANNEL = "jgroups:sharedChannel"; + private static final String DIRECT_PRODUCER = "direct:producer"; + private static final String MOCK_CONSUMER = "mock:consumer"; + private static final String PRODUCER_ROUTE = "producerRoute"; + private static final String CONSUMER_ROUTE = "consumerRoute"; + private static final String TEST_MESSAGE = "Test Message"; + + @Test + public void testStopStartProducer() throws Exception { + context().stopRoute(PRODUCER_ROUTE); + context().startRoute(PRODUCER_ROUTE); + testSendReceive(); + } + + @Test + public void testStopStartConsumer() throws Exception { + context().stopRoute(CONSUMER_ROUTE); + template().sendBody(DIRECT_PRODUCER, TEST_MESSAGE); + context().startRoute(CONSUMER_ROUTE); + testSendReceive(); + } + + private void testSendReceive() throws InterruptedException { + template().sendBody(DIRECT_PRODUCER, TEST_MESSAGE); + final MockEndpoint mockEndpoint = getMockEndpoint(MOCK_CONSUMER); + mockEndpoint.expectedMinimumMessageCount(1); + mockEndpoint.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(DIRECT_PRODUCER).routeId(PRODUCER_ROUTE).to(JGROUPS_SHARED_CHANNEL); + + from(JGROUPS_SHARED_CHANNEL).routeId(CONSUMER_ROUTE).to(MOCK_CONSUMER); + } + }; + } +}