Author: azeez Date: Fri Nov 18 18:00:55 2011 New Revision: 1203771 URL: http://svn.apache.org/viewvc?rev=1203771&view=rev Log: Introduced messaging functionality at the Cluster level. Messages can be sent in RPC or non-RPC mode. This functionality is needed since there are use cases where one member needs to send custom messages to other members in the cluster.
Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java?rev=1203771&r1=1203770&r2=1203771&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java Fri Nov 18 18:00:55 2011 @@ -22,12 +22,7 @@ package org.apache.axis2.clustering.trib import org.apache.axiom.om.OMAttribute; import org.apache.axiom.om.OMElement; import org.apache.axis2.AxisFault; -import org.apache.axis2.clustering.ClusteringAgent; -import org.apache.axis2.clustering.ClusteringConstants; -import org.apache.axis2.clustering.ClusteringFault; -import org.apache.axis2.clustering.MembershipListener; -import org.apache.axis2.clustering.MembershipScheme; -import org.apache.axis2.clustering.RequestBlockingHandler; +import org.apache.axis2.clustering.*; import org.apache.axis2.clustering.control.ControlCommand; import org.apache.axis2.clustering.control.GetConfigurationCommand; import org.apache.axis2.clustering.control.GetStateCommand; @@ -48,8 +43,10 @@ import org.apache.axis2.engine.DispatchP import org.apache.axis2.engine.Phase; import org.apache.catalina.tribes.Channel; import 
org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ErrorHandler; import org.apache.catalina.tribes.ManagedChannel; import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.group.Response; import org.apache.catalina.tribes.group.RpcChannel; @@ -83,7 +80,14 @@ public class TribesClusteringAgent imple private final HashMap<String, Parameter> parameters; private ManagedChannel channel; + /** + * RpcChannel used for cluster initialization interactions + */ private RpcChannel rpcInitChannel; + /** + * RpcChannel used for RPC messaging interactions + */ + private RpcChannel rpcMessagingChannel; private ConfigurationContext configurationContext; private Axis2ChannelListener axis2ChannelListener; private ChannelSender channelSender; @@ -104,6 +108,7 @@ public class TribesClusteringAgent imple private final Map<String, GroupManagementAgent> groupManagementAgents = new HashMap<String, GroupManagementAgent>(); private boolean clusterManagementMode; + private RpcMessagingHandler rpcMessagingHandler; public TribesClusteringAgent() { parameters = new HashMap<String, Parameter>(); @@ -166,10 +171,19 @@ public class TribesClusteringAgent imple // picks it up. 
Each RPC is given a UUID, hence can correlate the request-response pair rpcInitRequestHandler = new RpcInitializationRequestHandler(configurationContext); rpcInitChannel = - new RpcChannel(TribesUtil.getRpcInitChannelId(domain), - channel, rpcInitRequestHandler); + new RpcChannel(TribesUtil.getRpcInitChannelId(domain), channel, + rpcInitRequestHandler); if (log.isDebugEnabled()) { - log.debug("Created RPC Channel for domain " + new String(domain)); + log.debug("Created RPC Init Channel for domain " + new String(domain)); + } + + // Initialize RpcChannel used for messaging + rpcMessagingHandler = new RpcMessagingHandler(configurationContext); + rpcMessagingChannel = + new RpcChannel(TribesUtil.getRpcMessagingChannelId(domain), channel, + rpcMessagingHandler); + if (log.isDebugEnabled()) { + log.debug("Created RPC Messaging Channel for domain " + new String(domain)); } setMaximumRetries(); @@ -231,6 +245,48 @@ public class TribesClusteringAgent imple } } + public List<ClusteringCommand> sendMessage(ClusteringMessage message, + boolean isRpcMessage) throws ClusteringFault { + List<ClusteringCommand> responseList = new ArrayList<ClusteringCommand>(); + Member[] members = primaryMembershipManager.getMembers(); + if (members.length == 0) { + return responseList; + } + if (isRpcMessage) { + try { + Response[] responses = rpcMessagingChannel.send(members, message, RpcChannel.ALL_REPLY, + Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, + 10000); + for (Response response : responses) { + responseList.add((ClusteringCommand)response.getMessage()); + } + } catch (ChannelException e) { + String msg = "Error occurred while sending RPC message to cluster."; + log.error(msg, e); + throw new ClusteringFault(msg, e); + } + } else { + try { + channel.send(members, message, 10000, new ErrorHandler(){ + public void handleError(ChannelException e, UniqueId uniqueId) { + log.error("Sending failed " + uniqueId, e ); + } + + public void handleCompletion(UniqueId uniqueId) { + 
if(log.isDebugEnabled()){ + log.debug("Sending successful " + uniqueId); + } + } + }); + } catch (ChannelException e) { + String msg = "Error occurred while sending message to cluster."; + log.error(msg, e); + throw new ClusteringFault(msg, e); + } + } + return responseList; + } + private void setMemberInfo() throws ClusteringFault { Properties memberInfo = new Properties(); AxisConfiguration axisConfig = configurationContext.getAxisConfiguration(); @@ -602,7 +658,7 @@ public class TribesClusteringAgent imple * Get some information from a neighbour. This information will be used by this node to * initialize itself * <p/> - * rpcChannel is The utility for sending RPC style messages to the channel + * rpcInitChannel is The utility for sending RPC style messages to the channel * * @param command The control command to send * @throws ClusteringFault If initialization code failed on this node @@ -717,6 +773,7 @@ public class TribesClusteringAgent imple if (channel != null) { try { channel.removeChannelListener(rpcInitChannel); + channel.removeChannelListener(rpcMessagingChannel); channel.removeChannelListener(axis2ChannelListener); channel.stop(Channel.DEFAULT); } catch (ChannelException e) { @@ -736,6 +793,9 @@ public class TribesClusteringAgent imple if (rpcInitRequestHandler != null) { rpcInitRequestHandler.setConfigurationContext(configurationContext); } + if (rpcMessagingHandler!= null) { + rpcMessagingHandler.setConfigurationContext(configurationContext); + } if (axis2ChannelListener != null) { axis2ChannelListener.setConfigurationContext(configurationContext); } Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java?rev=1203771&r1=1203770&r2=1203771&view=diff ============================================================================== --- 
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java Fri Nov 18 18:00:55 2011 @@ -26,6 +26,11 @@ public final class TribesConstants { public static final String RPC_INIT_CHANNEL = "rpc.init.channel"; /** + * The ID of the RPC messaging channel + */ + public static final String RPC_MESSAGING_CHANNEL = "rpc.msg.channel"; + + /** * The ID of the RPC membership message channel. This channel is only used when WKA * membership discovery mechanism is used */ Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=1203771&r1=1203770&r2=1203771&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Fri Nov 18 18:00:55 2011 @@ -101,6 +101,10 @@ public class TribesUtil { return (new String(domain) + ":" + TribesConstants.RPC_INIT_CHANNEL).getBytes(); } + public static byte[] getRpcMessagingChannelId(byte[] domain) { + return (new String(domain) + ":" + TribesConstants.RPC_MESSAGING_CHANNEL).getBytes(); + } + public static boolean isInDomain(Member member, byte[] domain) { return Arrays.equals(domain, member.getDomain()); } Modified: axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java?rev=1203771&r1=1203770&r2=1203771&view=diff 
============================================================================== --- axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java (original) +++ axis/axis2/java/core/trunk/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java Fri Nov 18 18:00:55 2011 @@ -184,4 +184,15 @@ public interface ClusteringAgent extends * @return the domains of this ClusteringAgent */ Set<String> getDomains(); + + + /** + * Send a message to all members in this member's primary cluster + * + * @param msg The message to be sent + * @param isRpcMessage Indicates whether the message has to be sent in RPC mode + * @return A list of responses if the message is sent in RPC mode + * @throws ClusteringFault If an error occurs while sending the message + */ + List<ClusteringCommand> sendMessage(ClusteringMessage msg, boolean isRpcMessage) throws ClusteringFault; } \ No newline at end of file