Author: azeez
Date: Tue Feb 14 11:23:12 2012
New Revision: 1243880

URL: http://svn.apache.org/viewvc?rev=1243880&view=rev
Log:
This fixes the issue of state replication failing if the classes of the replicated objects are contained within service archive & module archive files. We had to take control of the Tribes message deserialization mechanism at the channel level so that we can set the relevant classloaders before deserialization.

Added: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java?rev=1243880&r1=1243879&r2=1243880&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java Tue Feb 14 11:23:12 2012 @@ -27,9 +27,6 @@ import org.apache.axis2.clustering.manag import org.apache.axis2.clustering.state.DefaultStateManager; import org.apache.axis2.clustering.state.StateClusteringCommand; import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.description.AxisModule; -import org.apache.axis2.description.AxisServiceGroup; -import org.apache.axis2.engine.AxisConfiguration; import org.apache.catalina.tribes.ByteMessage; import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.Member; @@ -40,9 +37,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; /** * This is the Tribes channel 
listener which is used for listening on the channels, receiving @@ -93,23 +87,11 @@ public class Axis2ChannelListener implem */ public void messageReceived(Serializable msg, Member sender) { try { - AxisConfiguration configuration = configurationContext.getAxisConfiguration(); - List<ClassLoader> classLoaders = new ArrayList<ClassLoader>(); - classLoaders.add(configuration.getSystemClassLoader()); - classLoaders.add(getClass().getClassLoader()); - for (Iterator iter = configuration.getServiceGroups(); iter.hasNext();) { - AxisServiceGroup group = (AxisServiceGroup) iter.next(); - classLoaders.add(group.getServiceGroupClassLoader()); - } - for(Object obj: configuration.getModules().values()){ - AxisModule module = (AxisModule) obj; - classLoaders.add(module.getModuleClassLoader()); - } byte[] message = ((ByteMessage) msg).getMessage(); msg = XByteBuffer.deserialize(message, 0, message.length, - classLoaders.toArray(new ClassLoader[classLoaders.size()])); + ClassLoaderUtil.getClassLoaders(configurationContext)); } catch (Exception e) { String errMsg = "Cannot deserialize received message"; log.error(errMsg, e); Added: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java?rev=1243880&view=auto ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java (added) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java Tue Feb 14 11:23:12 2012 @@ -0,0 +1,105 @@ +/* +* Copyright (c) 2005-2011, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +* +* WSO2 Inc. 
licenses this file to you under the Apache License, +* Version 2.0 (the "License"); you may not use this file except +* in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.axis2.clustering.tribes; + +import org.apache.axis2.context.ConfigurationContext; +import org.apache.catalina.tribes.ByteMessage; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.RemoteProcessException; +import org.apache.catalina.tribes.UniqueId; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.RpcChannel; +import org.apache.catalina.tribes.group.RpcMessage; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.util.Logs; + +import java.io.Serializable; + +/** + * Represents a Tribes GroupChannel. 
The difference between + * org.apache.catalina.tribes.group.GroupChannel & this class is that the proper classloaders + * are set before message deserialization + */ +public class Axis2GroupChannel extends GroupChannel{ + + private ConfigurationContext configurationContext; + + public Axis2GroupChannel(ConfigurationContext configurationContext) { + this.configurationContext = configurationContext; + } + + @Override + public void messageReceived(ChannelMessage msg) { + if ( msg == null ) return; + try { + if ( Logs.MESSAGES.isTraceEnabled() ) { + Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ + " from "+msg.getAddress().getName()); + } + + Serializable fwd; + if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) { + fwd = new ByteMessage(msg.getMessage().getBytes()); + } else { + try { + fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, + msg.getMessage().getLength(), + ClassLoaderUtil.getClassLoaders(configurationContext)); + }catch (Exception sx) { + log.error("Unable to deserialize message:"+msg,sx); + return; + } + } + if ( Logs.MESSAGES.isTraceEnabled() ) { + Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd); + } + + //get the actual member with the correct alive time + Member source = msg.getAddress(); + boolean rx = false; + boolean delivered = false; + for (Object channelListener1 : channelListeners) { + ChannelListener channelListener = (ChannelListener) channelListener1; + if (channelListener != null && channelListener.accept(fwd, source)) { + channelListener.messageReceived(fwd, source); + delivered = true; + //if the message was accepted by an RPC channel, that channel + //is responsible for returning the reply, otherwise we send an absence reply + if (channelListener instanceof RpcChannel) rx = true; + } + }//for + if ((!rx) && (fwd instanceof 
RpcMessage)) { + //if we have a message that requires a response, + //but none was given, send back an immediate one + sendNoRpcChannelReply((RpcMessage)fwd,source); + } + if ( Logs.MESSAGES.isTraceEnabled() ) { + Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId())); + } + + } catch ( Exception x ) { + //this could be the channel listener throwing an exception, we should log it + //as a warning. + if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x); + throw new RemoteProcessException("Exception:"+x.getMessage(),x); + } + } +} Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=1243880&r1=1243879&r2=1243880&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Tue Feb 14 11:23:12 2012 @@ -65,6 +65,7 @@ public class ChannelSender implements Me channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | + Channel.SEND_OPTIONS_BYTE_MESSAGE | TribesConstants.MSG_ORDER_OPTION | TribesConstants.AT_MOST_ONCE_OPTION | additionalOptions); @@ -72,6 +73,7 @@ public class ChannelSender implements Me channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_ASYNCHRONOUS | TribesConstants.MSG_ORDER_OPTION | + Channel.SEND_OPTIONS_BYTE_MESSAGE | TribesConstants.AT_MOST_ONCE_OPTION | additionalOptions); } @@ -119,7 +121,8 @@ public class ChannelSender implements Me try { channel.send(new Member[]{channel.getLocalMember(true)}, toByteMessage(msg), - Channel.SEND_OPTIONS_USE_ACK); + Channel.SEND_OPTIONS_USE_ACK | 
+ Channel.SEND_OPTIONS_BYTE_MESSAGE); if (log.isDebugEnabled()) { log.debug("Sent " + msg + " to self"); } @@ -134,6 +137,7 @@ public class ChannelSender implements Me channel.send(new Member[]{member}, toByteMessage(cmd), Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | + Channel.SEND_OPTIONS_BYTE_MESSAGE | TribesConstants.MSG_ORDER_OPTION | TribesConstants.AT_MOST_ONCE_OPTION); if (log.isDebugEnabled()) { Added: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java?rev=1243880&view=auto ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java (added) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java Tue Feb 14 11:23:12 2012 @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2005-2011, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +* +* WSO2 Inc. licenses this file to you under the Apache License, +* Version 2.0 (the "License"); you may not use this file except +* in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.axis2.clustering.tribes; + +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.description.AxisModule; +import org.apache.axis2.description.AxisServiceGroup; +import org.apache.axis2.engine.AxisConfiguration; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * A util for manipulating classloaders to be used while serializing & deserializing Tribes messages + */ +public class ClassLoaderUtil { + + public static ClassLoader[] getClassLoaders(ConfigurationContext configurationContext) { + AxisConfiguration configuration = configurationContext.getAxisConfiguration(); + List<ClassLoader> classLoaders = new ArrayList<ClassLoader>(); + classLoaders.add(configuration.getSystemClassLoader()); + classLoaders.add(ClassLoaderUtil.class.getClassLoader()); + for (Iterator iter = configuration.getServiceGroups(); iter.hasNext(); ) { + AxisServiceGroup group = (AxisServiceGroup) iter.next(); + classLoaders.add(group.getServiceGroupClassLoader()); + } + for (Object obj : configuration.getModules().values()) { + AxisModule module = (AxisModule) obj; + classLoaders.add(module.getModuleClassLoader()); + } + return classLoaders.toArray(new ClassLoader[classLoaders.size()]); + } +} Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java?rev=1243880&r1=1243879&r2=1243880&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java Tue Feb 14 11:23:12 2012 @@ -156,7 +156,7 @@ public class TribesClusteringAgent imple 
addRequestBlockingHandlerToInFlows(); primaryMembershipManager = new MembershipManager(configurationContext); - channel = new GroupChannel(); + channel = new Axis2GroupChannel(configurationContext); channel.setHeartbeat(true); channelSender = new ChannelSender(channel, primaryMembershipManager, synchronizeAllMembers()); axis2ChannelListener = @@ -225,7 +225,7 @@ public class TribesClusteringAgent imple ClusteringContextListener contextListener = new ClusteringContextListener(channelSender); configurationContext.addContextListener(contextListener); } - + configurationContext.getAxisConfiguration().addObservers(new TribesAxisObserver()); configurationContext. setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true"); log.info("Cluster initialization completed."); @@ -680,7 +680,7 @@ public class TribesClusteringAgent imple primaryMembershipManager.getLongestLivingMember() : // First try to get from the longest member alive primaryMembershipManager.getRandomMember(); // Else get from a random member String memberHost = TribesUtil.getName(member); - log.info("Trying to send intialization request to " + memberHost); + log.info("Trying to send initialization request to " + memberHost); try { if (!sentMembersList.contains(memberHost)) { Response[] responses; @@ -688,7 +688,8 @@ public class TribesClusteringAgent imple responses = rpcInitChannel.send(new Member[]{member}, command, RpcChannel.FIRST_REPLY, - Channel.SEND_OPTIONS_ASYNCHRONOUS, + Channel.SEND_OPTIONS_ASYNCHRONOUS | + Channel.SEND_OPTIONS_BYTE_MESSAGE, 10000); if (responses.length == 0) { try {