Author: fhanik Date: Thu Jun 8 15:36:22 2006 New Revision: 412871 URL: http://svn.apache.org/viewvc?rev=412871&view=rev Log: Almost there; only debugging of the code and the algorithm is left.
Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.dia?rev=412871&r1=412870&r2=412871&view=diff ============================================================================== Binary files - no diff available. Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/doc/leader-election-message-arrives.jpg?rev=412871&r1=412870&r2=412871&view=diff ============================================================================== Binary files - no diff available. 
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=412871&r1=412870&r2=412871&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Thu Jun 8 15:36:22 2006 @@ -59,7 +59,7 @@ * If <code>heartbeat == true</code> then how often do we want this * heartbeat to run. default is one minute */ - protected long heartbeatSleeptime = 60*1000;//only run once a minute + protected long heartbeatSleeptime = 5*1000;//every 5 seconds /** * Internal heartbeat thread Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=412871&r1=412870&r2=412871&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Thu Jun 8 15:36:22 2006 @@ -185,10 +185,15 @@ public void startElection(boolean force) throws ChannelException { synchronized (electionMutex) { - if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them MemberImpl local = (MemberImpl)getLocalMember(false); MemberImpl[] others = (MemberImpl[])membership.getMembers(); - if ( others.length == 0 ) 
return; //the only member, no need for an election + if ( others.length == 0 ) { + this.viewId = new UniqueId(UUIDGenerator.randomUUID(false)); + this.view = new Membership(local,AbsoluteOrder.comp, true); + this.handleViewConf(this.createElectionMsg(local,others,local),local,view); + return; //the only member, no need for an election + } + if ( suggestedviewId != null ) return;//election already running, I'm not allowed to have two of them int prio = AbsoluteOrder.comp.compare(local,others[0]); MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view? if ( local.equals(leader) || force ) { @@ -225,9 +230,9 @@ super.sendMessage(new Member[] {next}, createData(msg, local), null); } - protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) { + protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { int next = Arrays.nextIndex(local,msg.getMembers()); - if ( AbsoluteOrder.comp.compare(local,msg.getLeader()) > 0 ) msg.leader = local; + msg.leader = msg.getMembers()[0]; if ( next >= 0 ) sendElectionMsg(local,(MemberImpl)msg.getMembers()[next],msg); } @@ -305,15 +310,29 @@ handleViewConf(msg,local,merged); } else { //membership change - + suggestedView = new Membership(local,AbsoluteOrder.comp,true); + suggestedviewId = msg.getId(); + Arrays.fill(suggestedView,merged.getMembers()); + msg.view = (MemberImpl[])merged.getMembers(); + sendElectionMsgToNextInline(local,msg); } } else { //leadership change + suggestedView = null; + suggestedviewId = null; + msg.view = (MemberImpl[])merged.getMembers(); + sendElectionMsgToNextInline(local,msg); } } protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { - + if ( local.equals(msg.getLeader()) ) { + //I am the new leader + startElection(false); + } else { + msg.view = (MemberImpl[])merged.getMembers(); + sendElectionMsgToNextInline(local,msg); + } 
} protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { @@ -326,6 +345,12 @@ suggestedView = null; suggestedviewId = null; } + + if (suggestedView != null && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0])<0 ) { + suggestedView = null; + suggestedviewId = null; + } + viewChange(viewId,view.getMembers()); if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { @@ -381,9 +406,9 @@ // OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE //============================================================================================================ public void start(int svc) throws ChannelException { - if ( membership == null ) setupMembership(); if (started)return; super.start(startsvc); + if ( membership == null ) setupMembership(); startElection(false); started = true; } @@ -392,8 +417,8 @@ try { halt(); if ( !started ) return; - super.stop(startsvc); started = false; + super.stop(startsvc); }finally { release(); } @@ -429,7 +454,7 @@ public void memberAdded(Member member,boolean elect) { try { - + if ( membership == null ) setupMembership(); if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member); try { if (started && elect) startElection(false); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java?rev=412871&r1=412870&r2=412871&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java Thu Jun 8 15:36:22 2006 @@ -77,7 +77,7 @@ */ public Membership(MemberImpl 
local, boolean includeLocal) { this.local = local; - this.addMember(local); + if ( includeLocal ) addMember(local); } public Membership(MemberImpl local) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java?rev=412871&r1=412870&r2=412871&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java Thu Jun 8 15:36:22 2006 @@ -54,7 +54,7 @@ private boolean soKeepAlive = false; private boolean ooBInline = true; private boolean soReuseAddress = true; - private boolean soLingerOn = true; + private boolean soLingerOn = false; private int soLingerTime = 3; private int soTrafficClass = 0x04 | 0x08 | 0x010; private boolean throwOnFailedAck = false; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=412871&r1=412870&r2=412871&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Thu Jun 8 15:36:22 2006 @@ -27,6 +27,7 @@ import org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.io.ChannelData; import org.apache.catalina.tribes.io.BufferPool; +import 
java.nio.channels.CancelledKeyException; /** * A worker thread class which can drain channels and echo-back the input. Each @@ -79,7 +80,9 @@ } catch (Exception e) { //this is common, since the sockets on the other //end expire after a certain time. - if ( e instanceof IOException ) { + if ( e instanceof CancelledKeyException ) { + //do nothing + } else if ( e instanceof IOException ) { //dont spew out stack traces for IO exceptions unless debug is enabled. if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e); else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed."); @@ -187,11 +190,15 @@ //acquire the interestOps mutex Object mutex = this.getPool().getInterestOpsMutex(); synchronized (mutex) { - // cycle the selector so this key is active again - key.selector().wakeup(); - // resume interest in OP_READ, OP_WRITE - int resumeOps = key.interestOps() | SelectionKey.OP_READ; - key.interestOps(resumeOps); + try { + // cycle the selector so this key is active again + key.selector().wakeup(); + // resume interest in OP_READ, OP_WRITE + int resumeOps = key.interestOps() | SelectionKey.OP_READ; + key.interestOps(resumeOps); + }catch ( Exception x ) { + log.error("Unable to cycle the selector, connection disconnected?",x); + } } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java?rev=412871&r1=412870&r2=412871&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java (original) +++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java Thu Jun 8 15:36:22 2006 @@ -37,7 +37,7 @@ * @version 1.0 */ public class PooledParallelSender extends PooledSender implements MultiPointSender { - protected boolean connected = false; + protected boolean connected = true; public PooledParallelSender() { super(); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=412871&r1=412870&r2=412871&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Thu Jun 8 15:36:22 2006 @@ -34,9 +34,9 @@ public static boolean contains(byte[] source, int srcoffset, byte[] key, int keyoffset, int length) { if ( srcoffset < 0 || srcoffset >= source.length) throw new ArrayIndexOutOfBoundsException("srcoffset is out of bounds."); if ( keyoffset < 0 || keyoffset >= key.length) throw new ArrayIndexOutOfBoundsException("keyoffset is out of bounds."); - if ( length >= (key.length-keyoffset) ) throw new ArrayIndexOutOfBoundsException("not enough data elements in the key, length is out of bounds."); + if ( length > (key.length-keyoffset) ) throw new ArrayIndexOutOfBoundsException("not enough data elements in the key, length is out of bounds."); //we don't have enough data to validate it - if ( length >= (source.length-srcoffset) ) return false; + if ( length > (source.length-srcoffset) ) return false; boolean match = true; int pos = keyoffset; for ( int i=srcoffset; match && i<length; i++ ) { @@ -116,7 +116,8 @@ } public static Member[] extract(Member[] all, Member[] remove) { - List list = 
java.util.Arrays.asList(all); + List alist = java.util.Arrays.asList(all); + ArrayList list = new ArrayList(alist); for (int i=0; i<remove.length; i++ ) list.remove(remove[i]); return (Member[])list.toArray(new Member[list.size()]); } Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java?rev=412871&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java Thu Jun 8 15:36:22 2006 @@ -0,0 +1,82 @@ +package org.apache.catalina.tribes.test.interceptors; + +import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator; +import junit.framework.TestCase; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.Member; +import junit.framework.TestSuite; +import junit.framework.TestResult; + +public class TestNonBlockingCoordinator extends TestCase { + + GroupChannel[] channels = null; + NonBlockingCoordinator[] coordinators = null; + int channelCount = 6; + Thread[] threads = null; + protected void setUp() throws Exception { + super.setUp(); + channels = new GroupChannel[channelCount]; + coordinators = new NonBlockingCoordinator[channelCount]; + threads = new Thread[channelCount]; + for ( int i=0; i<channelCount; i++ ) { + channels[i] = new GroupChannel(); + coordinators[i] = new NonBlockingCoordinator(); + channels[i].addInterceptor(coordinators[i]); + 
channels[i].addInterceptor(new TcpFailureDetector()); + final int j = i; + threads[i] = new Thread() { + public void run() { + try { + channels[j].start(Channel.DEFAULT); + Thread.sleep(50); + } catch (Exception x) { + x.printStackTrace(); + } + } + }; + } + for ( int i=0; i<channelCount; i++ ) threads[i].start(); + for ( int i=0; i<channelCount; i++ ) threads[i].join(); + } + + public void testCoord1() throws Exception { + for (int i=1; i<channelCount; i++ ) + assertEquals("Message count expected to be equal.",channels[i-1].getMembers().length,channels[i].getMembers().length); + + Member member = coordinators[0].getCoordinator(); + int cnt = 0; + while ( member == null && (cnt++ < 100 ) ) try {Thread.sleep(100); member = coordinators[0].getCoordinator();}catch ( Exception x){} + for (int i=0; i<channelCount; i++ ) super.assertEquals(member,coordinators[i].getCoordinator()); + System.out.println("Coordinator[1] is:"+member); + } + + public void testCoord2() throws Exception { + channels[0].stop(Channel.DEFAULT); + if ( channelCount > 3 ) channels[channelCount-1].start(Channel.DEFAULT); + Thread.sleep(1000); + System.out.println("Member count:"+channels[1].getMembers().length); + Member member = coordinators[1].getCoordinator(); + for (int i = 1; i < channelCount; i++)super.assertEquals(member, coordinators[i].getCoordinator()); + Thread.sleep(3000); + System.out.println("Coordinator[2] is:" + member); + + } + + protected void tearDown() throws Exception { + super.tearDown(); + for ( int i=0; i<channelCount; i++ ) { + channels[i].stop(Channel.DEFAULT); + } + } + + public static void main(String[] args) throws Exception { + TestSuite suite = new TestSuite(); + suite.addTestSuite(TestNonBlockingCoordinator.class); + suite.run(new TestResult()); + } + + + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]