Author: azeez
Date: Fri Nov 18 18:00:55 2011
New Revision: 1203771

URL: http://svn.apache.org/viewvc?rev=1203771&view=rev
Log:
Introduced messging functionality at the Cluster level. Messages can be sent in 
RPC or non-RPC mode. This functionality is needed since there are usecases 
where one member needs to send custom messages to other members in the cluster.

Modified:
    
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
    
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
    
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
    
axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java

Modified: 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
URL: 
http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java?rev=1203771&r1=1203770&r2=1203771&view=diff
==============================================================================
--- 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
 (original)
+++ 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
 Fri Nov 18 18:00:55 2011
@@ -22,12 +22,7 @@ package org.apache.axis2.clustering.trib
 import org.apache.axiom.om.OMAttribute;
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.clustering.ClusteringAgent;
-import org.apache.axis2.clustering.ClusteringConstants;
-import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.MembershipListener;
-import org.apache.axis2.clustering.MembershipScheme;
-import org.apache.axis2.clustering.RequestBlockingHandler;
+import org.apache.axis2.clustering.*;
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
@@ -48,8 +43,10 @@ import org.apache.axis2.engine.DispatchP
 import org.apache.axis2.engine.Phase;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ErrorHandler;
 import org.apache.catalina.tribes.ManagedChannel;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.UniqueId;
 import org.apache.catalina.tribes.group.GroupChannel;
 import org.apache.catalina.tribes.group.Response;
 import org.apache.catalina.tribes.group.RpcChannel;
@@ -83,7 +80,14 @@ public class TribesClusteringAgent imple
 
     private final HashMap<String, Parameter> parameters;
     private ManagedChannel channel;
+    /**
+     * RpcChannel used for cluster initialization interactions
+     */
     private RpcChannel rpcInitChannel;
+    /**
+     * RpcChannel used for RPC messaging interactions
+     */
+    private RpcChannel rpcMessagingChannel;
     private ConfigurationContext configurationContext;
     private Axis2ChannelListener axis2ChannelListener;
     private ChannelSender channelSender;
@@ -104,6 +108,7 @@ public class TribesClusteringAgent imple
     private final Map<String, GroupManagementAgent> groupManagementAgents =
             new HashMap<String, GroupManagementAgent>();
     private boolean clusterManagementMode;
+    private RpcMessagingHandler rpcMessagingHandler;
 
     public TribesClusteringAgent() {
         parameters = new HashMap<String, Parameter>();
@@ -166,10 +171,19 @@ public class TribesClusteringAgent imple
         // picks it up. Each RPC is given a UUID, hence can correlate the 
request-response pair
         rpcInitRequestHandler = new 
RpcInitializationRequestHandler(configurationContext);
         rpcInitChannel =
-                new RpcChannel(TribesUtil.getRpcInitChannelId(domain),
-                               channel, rpcInitRequestHandler);
+                new RpcChannel(TribesUtil.getRpcInitChannelId(domain), channel,
+                               rpcInitRequestHandler);
         if (log.isDebugEnabled()) {
-            log.debug("Created RPC Channel for domain " + new String(domain));
+            log.debug("Created RPC Init Channel for domain " + new 
String(domain));
+        }
+
+        // Initialize RpcChannel used for messaging
+        rpcMessagingHandler = new RpcMessagingHandler(configurationContext);
+        rpcMessagingChannel =
+                new RpcChannel(TribesUtil.getRpcMessagingChannelId(domain), 
channel,
+                               rpcMessagingHandler);
+        if (log.isDebugEnabled()) {
+            log.debug("Created RPC Messaging Channel for domain " + new 
String(domain));
         }
 
         setMaximumRetries();
@@ -231,6 +245,48 @@ public class TribesClusteringAgent imple
         }
     }
 
+    public List<ClusteringCommand> sendMessage(ClusteringMessage message,
+                                               boolean isRpcMessage) throws 
ClusteringFault {
+        List<ClusteringCommand> responseList = new 
ArrayList<ClusteringCommand>();
+        Member[] members = primaryMembershipManager.getMembers();
+        if (members.length == 0) {
+            return responseList;
+        }
+        if (isRpcMessage) {
+            try {
+                Response[] responses = rpcMessagingChannel.send(members, 
message, RpcChannel.ALL_REPLY,
+                                                                
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK,
+                                                                10000);
+                for (Response response : responses) {
+                    responseList.add((ClusteringCommand)response.getMessage());
+                }
+            } catch (ChannelException e) {
+                String msg = "Error occurred while sending RPC message to 
cluster.";
+                log.error(msg, e);
+                throw new ClusteringFault(msg, e);
+            }
+        } else {
+            try {
+                channel.send(members, message, 10000, new ErrorHandler(){
+                    public void handleError(ChannelException e, UniqueId 
uniqueId) {
+                        log.error("Sending failed " + uniqueId, e );
+                    }
+
+                    public void handleCompletion(UniqueId uniqueId) {
+                        if(log.isDebugEnabled()){
+                            log.debug("Sending successful " + uniqueId);
+                        }
+                    }
+                });
+            } catch (ChannelException e) {
+                String msg = "Error occurred while sending message to 
cluster.";
+                log.error(msg, e);
+                throw new ClusteringFault(msg, e);
+            }
+        }
+        return responseList;
+    }
+
     private void setMemberInfo() throws ClusteringFault {
         Properties memberInfo = new Properties();
         AxisConfiguration axisConfig = 
configurationContext.getAxisConfiguration();
@@ -602,7 +658,7 @@ public class TribesClusteringAgent imple
      * Get some information from a neighbour. This information will be used by 
this node to
      * initialize itself
      * <p/>
-     * rpcChannel is The utility for sending RPC style messages to the channel
+     * rpcInitChannel is The utility for sending RPC style messages to the 
channel
      *
      * @param command The control command to send
      * @throws ClusteringFault If initialization code failed on this node
@@ -717,6 +773,7 @@ public class TribesClusteringAgent imple
         if (channel != null) {
             try {
                 channel.removeChannelListener(rpcInitChannel);
+                channel.removeChannelListener(rpcMessagingChannel);
                 channel.removeChannelListener(axis2ChannelListener);
                 channel.stop(Channel.DEFAULT);
             } catch (ChannelException e) {
@@ -736,6 +793,9 @@ public class TribesClusteringAgent imple
         if (rpcInitRequestHandler != null) {
             
rpcInitRequestHandler.setConfigurationContext(configurationContext);
         }
+        if (rpcMessagingHandler!= null) {
+            rpcMessagingHandler.setConfigurationContext(configurationContext);
+        }
         if (axis2ChannelListener != null) {
             axis2ChannelListener.setConfigurationContext(configurationContext);
         }

Modified: 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
URL: 
http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java?rev=1203771&r1=1203770&r2=1203771&view=diff
==============================================================================
--- 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
 (original)
+++ 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
 Fri Nov 18 18:00:55 2011
@@ -26,6 +26,11 @@ public final class TribesConstants {
     public static final String RPC_INIT_CHANNEL = "rpc.init.channel";
 
     /**
+     * The ID of the RPC messaging channel
+     */
+    public static final String RPC_MESSAGING_CHANNEL = "rpc.msg.channel";
+
+    /**
      * The ID of the RPC membership message channel. This channel is only used 
when WKA
      * membership discovery mechanism is used
      */

Modified: 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: 
http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=1203771&r1=1203770&r2=1203771&view=diff
==============================================================================
--- 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
 (original)
+++ 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
 Fri Nov 18 18:00:55 2011
@@ -101,6 +101,10 @@ public class TribesUtil {
         return (new String(domain) + ":" + 
TribesConstants.RPC_INIT_CHANNEL).getBytes();
     }
 
+    public static byte[] getRpcMessagingChannelId(byte[] domain) {
+        return (new String(domain) + ":" + 
TribesConstants.RPC_MESSAGING_CHANNEL).getBytes();
+    }
+
     public static boolean isInDomain(Member member, byte[] domain) {
         return Arrays.equals(domain, member.getDomain());
     }

Modified: 
axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
URL: 
http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java?rev=1203771&r1=1203770&r2=1203771&view=diff
==============================================================================
--- 
axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
 (original)
+++ 
axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
 Fri Nov 18 18:00:55 2011
@@ -184,4 +184,15 @@ public interface ClusteringAgent extends
      * @return the domains of this ClusteringAgent
      */
     Set<String> getDomains();
+
+
+    /**
+     * Send a message to all members in this member's primary cluster
+     *
+     * @param msg  The message to be sent
+     * @param isRpcMessage Indicates whether the message has to be sent in RPC 
mode
+     * @return A list of responses if the message is sent in RPC mode
+     * @throws ClusteringFault If an error occurs while sending the message
+     */
+    List<ClusteringCommand> sendMessage(ClusteringMessage msg, boolean 
isRpcMessage) throws ClusteringFault;
 }
\ No newline at end of file


Reply via email to