Author: fhanik Date: Wed May 17 12:56:22 2006 New Revision: 407359 URL: http://svn.apache.org/viewvc?rev=407359&view=rev Log: Implemented soft membership ping
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java?rev=407359&r1=407358&r2=407359&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/RpcChannel.java Wed May 17 12:56:22 2006 @@ -90,7 +90,7 @@ } } catch ( InterruptedException ix ) { Thread.currentThread().interrupted(); - throw new ChannelException(ix); + //throw new ChannelException(ix); }finally { responseMap.remove(key); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=407359&r1=407358&r2=407359&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Wed May 17 12:56:22 2006 @@ -16,7 +16,6 @@ package org.apache.catalina.tribes.tipis; -import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -33,10 +32,15 @@ import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.Response; +import org.apache.catalina.tribes.group.RpcCallback; +import org.apache.catalina.tribes.group.RpcChannel; import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.membership.MemberImpl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.catalina.tribes.Heartbeat; +import java.util.HashMap; import org.apache.catalina.tribes.group.*; /** @@ -51,7 +55,7 @@ * @author not attributable * @version 1.0 */ -public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener { +public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat { protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class); /** @@ -78,10 +82,12 @@ private transient byte[] mapContextName; private transient boolean stateTransferred = false; private transient Object stateMutex = new Object(); - private transient ArrayList mapMembers = new ArrayList(); + private transient HashMap mapMembers = new HashMap(); private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; private transient Object mapOwner; private transient ClassLoader[] externalLoaders; + protected transient int currentNode = 0; + private transient long accessTimeout = 5000; //------------------------------------------------------------------------------ @@ -149,6 +155,34 @@ } + private void ping(long timeout) throws ChannelException { + //send out a map membership message, only wait for the first reply + MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_INIT, + false, null, null, null, wrap(channel.getLocalMember(false))); + Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.ALL_REPLY, (channelSendOptions), (int)accessTimeout); + for (int i = 0; i < resp.length; i++) { + memberAlive(resp[i].getSource()); + }//for + + synchronized (mapMembers) { + Iterator it = mapMembers.entrySet().iterator(); + long now = System.currentTimeMillis(); + while ( it.hasNext() ) { + Map.Entry entry = (Map.Entry)it.next(); + long access = ((Long)entry.getValue()).longValue(); + if ( (now - access) > timeout ) memberDisappeared((Member)entry.getKey()); + } + }//synch + } + + private void memberAlive(Member member) { + synchronized (mapMembers) { + if (!mapMembers.containsKey(member)) { + mapMemberAdded(member); + } //end if + mapMembers.put(member, new Long(System.currentTimeMillis())); + } + } private void broadcast(int msgtype, boolean rpc) throws ChannelException { //send out a map membership message, only wait for the first reply @@ -190,21 +224,22 @@ //------------------------------------------------------------------------------ // GROUP COM INTERFACES //------------------------------------------------------------------------------ - public Member[] getMapMembers() { - synchronized (mapMembers) { - Member[] result = new Member[mapMembers.size()]; - mapMembers.toArray(result); + public Member[] getMapMembers(HashMap members) { + synchronized (members) { + Member[] result = new Member[members.size()]; + members.keySet().toArray(result); return result; } } + public Member[] getMapMembers() { + return getMapMembers(this.mapMembers); + } public Member[] getMapMembersExcl(Member[] exclude) { synchronized (mapMembers) { - ArrayList list = (ArrayList)mapMembers.clone(); + HashMap list = (HashMap)mapMembers.clone(); for (int i=0; i<exclude.length;i++) list.remove(exclude[i]); - Member[] result = new Member[list.size()]; - list.toArray(result); - return result; + return getMapMembers(list); } } @@ -375,6 +410,8 @@ mapmsg.deserialize(getExternalLoaders()); if (mapmsg.getMsgType() == MapMessage.MSG_START) { mapMemberAdded(mapmsg.getBackupNodes()[0]); + } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { + memberAlive(mapmsg.getBackupNodes()[0]); } } catch (IOException x ) { log.error("Unable to deserialize MapMessage.",x); @@ -469,8 +506,8 @@ boolean memberAdded = false; //select a backup node if we don't have one synchronized (mapMembers) { - if (!mapMembers.contains(member) ) { - mapMembers.add(member); + if (!mapMembers.containsKey(member) ) { + mapMembers.put(member, new Long(System.currentTimeMillis())); memberAdded = true; } } @@ -520,7 +557,7 @@ public void memberDisappeared(Member member) { boolean removed = false; synchronized (mapMembers) { - removed = mapMembers.remove(member); + removed = (mapMembers.remove(member) != null ); } Iterator i = super.entrySet().iterator(); while (i.hasNext()) { @@ -537,7 +574,6 @@ } //while } - protected int currentNode = 0; public int getNextBackupIndex() { int size = mapMembers.size(); if (mapMembers.size() == 0)return -1; @@ -557,6 +593,14 @@ } protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; + + public void heartbeat() { + try { + ping(accessTimeout); + }catch ( Exception x ) { + log.error("Unable to send AbstractReplicatedMap.ping message",x); + } + } //------------------------------------------------------------------------------ // METHODS TO OVERRIDE @@ -901,6 +945,10 @@ return channelSendOptions; } + public long getAccessTimeout() { + return accessTimeout; + } + public void setMapOwner(Object mapOwner) { this.mapOwner = mapOwner; } @@ -911,6 +959,10 @@ public void setChannelSendOptions(int channelSendOptions) { this.channelSendOptions = channelSendOptions; + } + + public void setAccessTimeout(long accessTimeout) { + this.accessTimeout = accessTimeout; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=407359&r1=407358&r2=407359&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original) +++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Wed May 17 12:56:22 2006 @@ -41,21 +41,6 @@ Code Tasks: =========================================== -46. Heartbeat Interface, to notify listeners as well - -44. Soft membership failure detection, ie if a webapp is stopped, but - the AbstractReplicatedMap doesn't broadcast a stop message - This is one potential solution: - 1. keep a static WeakHashMap of all map implementations running - so that we can share one heartbeat thread for timeouts - 2. everytime a message is received, update the last check time for that - member so that we don't need the thread to actively check - 3. when the thread wakes up, it will check maps that are outside - the valid range for check time, - 4. send a RPC message, if no reply, remove the map from itself - Other solution, use the TcpFailureDetector, catch send errors - - 45. McastServiceImpl.receive should have a SO_TIMEOUT so that we can check for members dropping on the same thread @@ -286,3 +271,18 @@ component, only the listener Notes: Completed. added in correct startup sequences. +46. Heartbeat Interface, to notify listeners as well +Notes: Implemented + +44. Soft membership failure detection, ie if a webapp is stopped, but + the AbstractReplicatedMap doesn't broadcast a stop message + This is one potential solution: + 1. keep a static WeakHashMap of all map implementations running + so that we can share one heartbeat thread for timeouts + 2. everytime a message is received, update the last check time for that + member so that we don't need the thread to actively check + 3. when the thread wakes up, it will check maps that are outside + the valid range for check time, + 4. send a RPC message, if no reply, remove the map from itself + Other solution, use the TcpFailureDetector, catch send errors +Notes: Implemented using a periodic ping in the AbstractReplicatedMap \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]