Repository: camel
Updated Branches:
  refs/heads/camel-2.13.x 34242af15 -> a6d75c580


CAMEL-7706: Fixed camel-jgroups to disconnect shared JChannel correctly, added 
unit test to verify


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a6d75c58
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a6d75c58
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a6d75c58

Branch: refs/heads/camel-2.13.x
Commit: a6d75c580073582bfbe432f3c700b17745358f6c
Parents: 34242af
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Fri Aug 15 12:30:46 2014 -0700
Committer: Dhiraj Bokde <dhira...@yahoo.com>
Committed: Mon Aug 18 07:35:41 2014 -0700

----------------------------------------------------------------------
 .../component/jgroups/JGroupsConsumer.java      |  8 ++-
 .../component/jgroups/JGroupsEndpoint.java      | 29 +++++++-
 .../component/jgroups/JGroupsProducer.java      | 11 ++--
 .../jgroups/JGroupsSharedChannelTest.java       | 69 ++++++++++++++++++++
 4 files changed, 110 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a6d75c58/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
index ee6f4ba..a2f6815 100644
--- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
+++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
@@ -30,9 +30,12 @@ public class JGroupsConsumer extends DefaultConsumer {
     private final String clusterName;
 
     private final CamelJGroupsReceiver receiver;
+    private final JGroupsEndpoint endpoint;
 
     public JGroupsConsumer(JGroupsEndpoint endpoint, Processor processor, Channel channel, String clusterName) {
         super(endpoint, processor);
+
+        this.endpoint = endpoint;
         this.channel = channel;
         this.clusterName = clusterName;
 
@@ -44,13 +47,14 @@ public class JGroupsConsumer extends DefaultConsumer {
         super.doStart();
         log.debug("Connecting receiver: {} to the cluster: {}.", receiver, clusterName);
         channel.setReceiver(receiver);
-        channel.connect(clusterName);
+        endpoint.connect();
     }
 
     @Override
     protected void doStop() throws Exception {
         log.debug("Closing connection to cluster: {} from receiver: {}.", clusterName, receiver);
-        channel.disconnect();
+        channel.setReceiver(null);
+        endpoint.disconnect();
         super.doStop();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a6d75c58/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java
index 23af26c..606a8de 100644
--- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java
+++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsEndpoint.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.jgroups;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -26,6 +28,8 @@ import org.jgroups.Channel;
 import org.jgroups.JChannel;
 import org.jgroups.Message;
 import org.jgroups.View;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JGroupsEndpoint extends DefaultEndpoint {
 
@@ -37,7 +41,10 @@ public class JGroupsEndpoint extends DefaultEndpoint {
 
     public static final String HEADER_JGROUPS_CHANNEL_ADDRESS = "JGROUPS_CHANNEL_ADDRESS";
 
+    private static final Logger LOG = LoggerFactory.getLogger(JGroupsEndpoint.class);
+
     private Channel channel;
+    private AtomicInteger connectCount = new AtomicInteger(0);
 
     private Channel resolvedChannel;
 
@@ -103,6 +110,7 @@ public class JGroupsEndpoint extends DefaultEndpoint {
 
     @Override
     protected void doStop() throws Exception {
+        LOG.trace("Closing JGroups Channel {}", getEndpointUri());
         resolvedChannel.close();
         super.doStop();
     }
@@ -117,6 +125,26 @@ public class JGroupsEndpoint extends DefaultEndpoint {
         return new JChannel();
     }
 
+    /**
+     * Connect shared channel, called by producer and consumer.
+     * @throws Exception
+     */
+    public void connect() throws Exception {
+        connectCount.incrementAndGet();
+        LOG.trace("Connecting JGroups Channel {}", getEndpointUri());
+        resolvedChannel.connect(clusterName);
+    }
+
+    /**
+     * Disconnect shared channel, called by producer and consumer.
+     */
+    public void disconnect() {
+        if (connectCount.decrementAndGet() == 0) {
+            LOG.trace("Disconnecting JGroups Channel {}", getEndpointUri());
+            resolvedChannel.disconnect();
+        }
+    }
+
     private boolean resolveEnableViewMessages() {
         if (enableViewMessages != null) {
             resolvedEnableViewMessages = enableViewMessages;
@@ -171,5 +199,4 @@ public class JGroupsEndpoint extends DefaultEndpoint {
     public void setResolvedEnableViewMessages(boolean resolvedEnableViewMessages) {
         this.resolvedEnableViewMessages = resolvedEnableViewMessages;
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a6d75c58/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java
index b134320..f49ef82 100644
--- a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java
+++ b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsProducer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.jgroups;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
 import org.jgroups.Address;
@@ -30,14 +29,18 @@ public class JGroupsProducer extends DefaultProducer {
 
     // Producer settings
 
+    private final JGroupsEndpoint endpoint;
+
     private final Channel channel;
 
     private final String clusterName;
 
     // Constructor
 
-    public JGroupsProducer(Endpoint endpoint, Channel channel, String clusterName) {
+    public JGroupsProducer(JGroupsEndpoint endpoint, Channel channel, String clusterName) {
         super(endpoint);
+
+        this.endpoint = endpoint;
         this.channel = channel;
         this.clusterName = clusterName;
     }
@@ -47,12 +50,12 @@ public class JGroupsProducer extends DefaultProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        channel.connect(clusterName);
+        endpoint.connect();
     }
 
     @Override
     protected void doStop() throws Exception {
-        channel.disconnect();
+        endpoint.disconnect();
         super.doStop();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a6d75c58/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/JGroupsSharedChannelTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/JGroupsSharedChannelTest.java b/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/JGroupsSharedChannelTest.java
new file mode 100644
index 0000000..3edb57f
--- /dev/null
+++ b/components/camel-jgroups/src/test/java/org/apache/camel/component/jgroups/JGroupsSharedChannelTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jgroups;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Test for shared channel in JGroups endpoint
+ */
+public class JGroupsSharedChannelTest extends CamelTestSupport {
+
+    private static final String JGROUPS_SHARED_CHANNEL = "jgroups:sharedChannel";
+    private static final String DIRECT_PRODUCER = "direct:producer";
+    private static final String MOCK_CONSUMER = "mock:consumer";
+    private static final String PRODUCER_ROUTE = "producerRoute";
+    private static final String CONSUMER_ROUTE = "consumerRoute";
+    private static final String TEST_MESSAGE = "Test Message";
+
+    @Test
+    public void testStopStartProducer() throws Exception {
+        context().stopRoute(PRODUCER_ROUTE);
+        context().startRoute(PRODUCER_ROUTE);
+        testSendReceive();
+    }
+
+    @Test
+    public void testStopStartConsumer() throws Exception {
+        context().stopRoute(CONSUMER_ROUTE);
+        template().sendBody(DIRECT_PRODUCER, TEST_MESSAGE);
+        context().startRoute(CONSUMER_ROUTE);
+        testSendReceive();
+    }
+
+    private void testSendReceive() throws InterruptedException {
+        template().sendBody(DIRECT_PRODUCER, TEST_MESSAGE);
+        final MockEndpoint mockEndpoint = getMockEndpoint(MOCK_CONSUMER);
+        mockEndpoint.expectedMinimumMessageCount(1);
+        mockEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(DIRECT_PRODUCER).routeId(PRODUCER_ROUTE).to(JGROUPS_SHARED_CHANNEL);
+
+                from(JGROUPS_SHARED_CHANNEL).routeId(CONSUMER_ROUTE).to(MOCK_CONSUMER);
+            }
+        };
+    }
+}

Reply via email to