Author: fhanik Date: Tue May 23 13:23:49 2006 New Revision: 409000 URL: http://svn.apache.org/viewvc?rev=409000&view=rev Log: Slowly implementing to match the state diagram
Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia?rev=409000&r1=408999&r2=409000&view=diff ============================================================================== Binary files - no diff available. Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg?rev=409000&r1=408999&r2=409000&view=diff ============================================================================== Binary files - no diff available. Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=409000&r1=408999&r2=409000&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue May 23 13:23:49 2006 @@ -30,6 +30,7 @@ import org.apache.catalina.tribes.Channel; import java.util.HashMap; import java.util.LinkedHashMap; +import org.apache.catalina.tribes.membership.*; /** * <p>Title: NonBlockingCoordinator</p> @@ -132,6 +133,17 @@ protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20}; /** + * Alive message + */ + protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46, + -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111, + 74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26, + 119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55}; + /** + * Time to wait for coordination timeout + */ + protected long waitForCoordMsgTimeout = 5000; + /** * Our current view */ protected Membership view = null; @@ -163,17 +175,11 @@ } public void start(int svc) throws ChannelException { - try { - halt(); - if ( started ) return; - super.start(startsvc); - started = true; - - }finally { - release(); - } - //coordination can happen before this line of code executes - Member local = getLocalMember(false); + if ( membership == null ) setupMembership(); + if (started)return; + super.start(startsvc); + startElection(false); + started = true; } public void stop(int svc) throws ChannelException { @@ -191,78 +197,82 @@ return (Membership)rotatingViews.get(id); } - public void elect() { + public void startElection(boolean force) throws ChannelException { synchronized (electionMutex) { - try { - Member[] mbrs = super.getMembers(); - //no members, exit - if ( mbrs.length == 0 ) return; - AbsoluteOrder.absoluteOrder(mbrs); - MemberImpl local = (MemberImpl)getLocalMember(false); - //I'm not the higest, exit - if ( !local.equals(mbrs[0]) ) return; - //I'm already running an election - if ( suggestedviewId != null ) return; - //create a suggestedview - suggestedviewId = new UniqueId(UUIDGenerator.randomUUID(true)); - Membership suggestedview = new Membership((MemberImpl)local,AbsoluteOrder.comp); - rotatingViews.put(suggestedviewId,suggestedview); - suggestedview.addMember((MemberImpl)local); - Arrays.fill(suggestedview,mbrs); - suggestedviewId = new UniqueId(UUIDGenerator.randomUUID(true)); - CoordinationMessage msg = new CoordinationMessage(local,local,suggestedview.getMembers(),suggestedviewId,COORD_REQUEST); - for (int i=0; i<mbrs.length; i++ ) { - try { - sendMessage(msg,mbrs[i]); - break; - } catch ( ChannelException x ) { - log.error("Unable to send election message, trying next node.",x); - } + if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them + MemberImpl local = (MemberImpl)getLocalMember(false); + MemberImpl[] others = (MemberImpl[])membership.getMembers(); + if ( others.length == 0 ) return; //the only member, no need for an election + int prio = AbsoluteOrder.comp.compare(local,others[0]); + MemberImpl leader = ( prio < 0 )?local:others[0]; + if ( local.equals(leader) || force ) sendElectionMsg(local,leader,others); + else { + try { + electionMutex.wait(waitForCoordMsgTimeout); + }catch ( InterruptedException x ) { + Thread.currentThread().interrupted(); } - halt(); - } finally { - //dont release, election running - //release will happen on processCoordMessage - } + if ( rotatingViews.size() == 0 ) { + //no message arrived, send the coord msg + startElection(true); + } + }//end if + } + } + + protected void sendElectionMsg(MemberImpl local, MemberImpl leader, MemberImpl[] others) throws ChannelException { + synchronized (electionMutex) { + if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them + Membership m = new Membership(local,AbsoluteOrder.comp); + m.addMember(local); + Arrays.fill(m,others); + MemberImpl[] mbrs = m.getMembers(); + CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST); + suggestedviewId = msg.getId(); + rotatingViews.put(suggestedviewId, msg); + super.sendMessage(new Member[] {others[0]}, createData(msg, local), null); } } + public ChannelData createData(CoordinationMessage msg, MemberImpl local) { + ChannelData data = new ChannelData(true); + data.setAddress(local); + data.setMessage(msg.getBuffer()); + data.setOptions(Channel.SEND_OPTIONS_USE_ACK); + data.setTimestamp(System.currentTimeMillis()); + return data; + } + protected void viewChange(UniqueId viewId, Member[] view) { //invoke any listeners } + protected boolean alive(Member mbr) { + return TcpFailureDetector.memberAlive(mbr, + COORD_ALIVE, + false, + false, + waitForCoordMsgTimeout, + waitForCoordMsgTimeout, + getOptionFlag()); + } + + protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) { + + MemberImpl local = (MemberImpl)getLocalMember(false); + Membership merged = new Membership(local,AbsoluteOrder.comp); + Arrays.fill(merged,msg.getMembers()); + Arrays.fill(merged,getMembers()); + Member[] diff = Arrays.diff(merged,membership,local); + for ( int i=0; i<diff.length; i++ ) { + if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]); + } + return merged; + } + protected void processCoordMessage(CoordinationMessage msg, Member sender) { synchronized (electionMutex) { - MemberImpl local = (MemberImpl) getLocalMember(false); - if (suggestedviewId != null) { - //we are running our own election - if (suggestedviewId.equals(msg.getId())) { - //we received our own token - Membership suggestedview = getView(msg.getId()); - Member[] suggested = suggestedview.getMembers(); - Member[] received = msg.getMembers(); - if (Arrays.sameMembers(suggested,received) ) { - //we completed the loop - view = suggestedview; - viewId = suggestedviewId; - suggestedviewId = null; - rotatingViews.remove(viewId); - suggestedview.reset(); - viewChange(viewId,view.getMembers()); - release(); - } else { - //view or leadership changed - if ( !local.equals(msg.getLeader()) ) { - //leadership changed - } else { - //leadership didn't change - //circulate it again - } - } - } - } else { - - } + msg.timestamp = System.currentTimeMillis(); } } @@ -289,7 +299,9 @@ } public void messageReceived(ChannelMessage msg) { - if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) { + if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) { + //ignore message, its an alive message + } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) { processCoordMessage(new CoordinationMessage(msg.getMessage()),msg.getAddress()); } else { super.messageReceived(msg); @@ -302,23 +314,31 @@ public void memberAdded(Member member) { try { - if ( membership == null ) setupMembership(); + if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member); - halt(); + try { + if (started) startElection(false); + }catch ( ChannelException x ) { + log.error("Unable to start election when member was added.",x); + } }finally { - release(); } } public void memberDisappeared(Member member) { try { - halt(); - if ( started ) elect(); + + membership.removeMember((MemberImpl)member); + super.memberDisappeared(member); + try { + if (started) startElection(false); + }catch ( ChannelException x ) { + log.error("Unable to start election when member was removed.",x); + } }finally { - release(); } - super.memberDisappeared(member); + } public void heartbeat() { @@ -329,7 +349,7 @@ * has members */ public boolean hasMembers() { - if ( membership == null ) setupMembership(); + return membership.hasMembers(); } @@ -338,8 +358,8 @@ * @return all members or empty array */ public Member[] getMembers() { - if ( membership == null ) setupMembership(); - throw new UnsupportedOperationException("Not yet implemented"); + + return membership.getMembers(); } /** @@ -348,8 +368,8 @@ * @return Member */ public Member getMember(Member mbr) { - if ( membership == null ) setupMembership(); - throw new UnsupportedOperationException("Not yet implemented"); + + return membership.getMember(mbr); } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=409000&r1=408999&r2=409000&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Tue May 23 13:23:49 2006 @@ -58,7 +58,7 @@ private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class ); - protected static byte[] testMessage = new byte[] { + protected static byte[] TCP_FAIL_DETECT = new byte[] { 79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20, 125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74, 55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50, @@ -100,8 +100,8 @@ boolean process = true; if ( okToProcess(msg.getOptions()) ) { //check to see if it is a testMessage, if so, process = false - process = ( (msg.getMessage().getLength() != testMessage.length) || - (!Arrays.equals(testMessage,msg.getMessage().getBytes()) ) ); + process = ( (msg.getMessage().getLength() != TCP_FAIL_DETECT.length) || + (!Arrays.equals(TCP_FAIL_DETECT,msg.getMessage().getBytes()) ) ); }//end if //ignore the message, it doesnt have the flag set @@ -218,8 +218,14 @@ } - protected boolean memberAlive(Member mbr) { + return memberAlive(mbr,TCP_FAIL_DETECT,performSendTest,performReadTest,readTestTimeout,connectTimeout,getOptionFlag()); + } + + protected static boolean memberAlive(Member mbr, byte[] msgData, + boolean sendTest, boolean readTest, + long readTimeout, long conTimeout, + int optionFlag) { //could be a shutdown notification if ( Arrays.equals(mbr.getPayload(),Member.SHUTDOWN_PAYLOAD) ) return false; @@ -227,20 +233,20 @@ try { InetAddress ia = InetAddress.getByAddress(mbr.getHost()); InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort()); - socket.setSoTimeout((int)readTestTimeout); - socket.connect(addr, (int) connectTimeout); - if ( performSendTest ) { + socket.setSoTimeout((int)readTimeout); + socket.connect(addr, (int) conTimeout); + if ( sendTest ) { ChannelData data = new ChannelData(true); data.setAddress(mbr); - data.setMessage(new XByteBuffer(testMessage,false)); + data.setMessage(new XByteBuffer(msgData,false)); data.setTimestamp(System.currentTimeMillis()); - int options = getOptionFlag() | Channel.SEND_OPTIONS_BYTE_MESSAGE; - if ( performReadTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK); + int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE; + if ( readTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK); else options = (options & (~Channel.SEND_OPTIONS_USE_ACK)); data.setOptions(options); byte[] message = XByteBuffer.createDataPackage(data); socket.getOutputStream().write(message); - if ( performReadTest ) { + if ( readTest ) { int length = socket.getInputStream().read(message); return length > 0; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=409000&r1=408999&r2=409000&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Tue May 23 13:23:49 2006 @@ -101,6 +101,17 @@ for (int i=0; i<m.length; i++ ) mbrship.addMember((MemberImpl)m[i]); } + public static Member[] diff(Membership complete, Membership local, MemberImpl ignore) { + ArrayList result = new ArrayList(); + MemberImpl[] comp = complete.getMembers(); + for ( int i=0; i<comp.length; i++ ) { + if ( ignore!=null && ignore.equals(comp[i]) ) continue; + if ( local.getMember(comp[i]) == null ) result.add(comp[i]); + } + return (MemberImpl[])result.toArray(new MemberImpl[result.size()]); + } + + } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]