Author: fhanik Date: Tue Jun 13 15:45:25 2006 New Revision: 413996 URL: http://svn.apache.org/viewvc?rev=413996&view=rev Log: Fine-tuned the test cases; we don't need long sleeps, since it should all go pretty quickly.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=413996&r1=413995&r2=413996&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue Jun 13 15:45:25 2006 @@ -241,8 +241,19 @@ protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { int next = Arrays.nextIndex(local,msg.getMembers()); + int current = next; msg.leader = msg.getMembers()[0]; - if ( next >= 0 ) sendElectionMsg(local,(MemberImpl)msg.getMembers()[next],msg); + boolean sent = false; + while ( !sent && current >= 0 ) { + try { + sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg); + sent = true; + }catch ( ChannelException x ) { + log.warn("Unable to send election message to:"+msg.getMembers()[current]); + current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers()); + if ( current == next ) throw x; + } + } } public Member getNextInLine(MemberImpl local, MemberImpl[] others) { @@ 
-479,7 +490,7 @@ if ( membership == null ) setupMembership(); if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member); try { - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add")); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")")); if (started && elect) startElection(false); }catch ( ChannelException x ) { log.error("Unable to start election when member was added.",x); @@ -495,7 +506,7 @@ membership.removeMember((MemberImpl)member); super.memberDisappeared(member); try { - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove")); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")")); if ( started && (isCoordinator() || member.equals(getCoordinator())) ) startElection(false); //to do, if a member disappears, only the coordinator can start @@ -696,7 +707,8 @@ } public void fireInterceptorEvent(InterceptorEvent event) { - System.out.println(event); + if (event instanceof CoordinationEvent && + ((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX) System.out.println(event); } public static class CoordinationEvent implements InterceptorEvent { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=413996&r1=413995&r2=413996&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Tue Jun 13 15:45:25 2006 @@ -191,11 
+191,13 @@ Object mutex = this.getPool().getInterestOpsMutex(); synchronized (mutex) { try { - // cycle the selector so this key is active again - key.selector().wakeup(); - // resume interest in OP_READ, OP_WRITE - int resumeOps = key.interestOps() | SelectionKey.OP_READ; - key.interestOps(resumeOps); + if ( key.isValid() ) { + // cycle the selector so this key is active again + key.selector().wakeup(); + // resume interest in OP_READ, OP_WRITE + int resumeOps = key.interestOps() | SelectionKey.OP_READ; + key.interestOps(resumeOps); + } }catch ( Exception x ) { try {key.selector().close();}catch ( Exception ignore){} log.error("Unable to cycle the selector, connection disconnected?",x); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java?rev=413996&r1=413995&r2=413996&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java Tue Jun 13 15:45:25 2006 @@ -48,6 +48,7 @@ try { sender.sendMessage(destination, message); }finally { + if ( !connected ) disconnect(); returnSender(sender); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java?rev=413996&r1=413995&r2=413996&view=diff ============================================================================== --- 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java Tue Jun 13 15:45:25 2006 @@ -1,21 +1,19 @@ package org.apache.catalina.tribes.test.interceptors; -import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator; -import junit.framework.TestCase; -import org.apache.catalina.tribes.group.GroupChannel; -import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.Member; -import junit.framework.TestSuite; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator; +import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; +import junit.framework.TestCase; import junit.framework.TestResult; -import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent; -import org.apache.catalina.tribes.ChannelInterceptor; +import junit.framework.TestSuite; public class TestNonBlockingCoordinator extends TestCase { GroupChannel[] channels = null; NonBlockingCoordinator[] coordinators = null; - int channelCount = 3; + int channelCount = 10; Thread[] threads = null; protected void setUp() throws Exception { System.out.println("Setup"); @@ -42,7 +40,7 @@ } for ( int i=0; i<channelCount; i++ ) threads[i].start(); for ( int i=0; i<channelCount; i++ ) threads[i].join(); - Thread.sleep(10000); + Thread.sleep(1000); } public void testCoord1() throws Exception { @@ -56,15 +54,23 @@ } - public void stestCoord2() throws Exception { + public void testCoord2() throws Exception { Member member = coordinators[1].getCoordinator(); System.out.println("Coordinator[2a] is:" + member); - System.out.println("Shutting down:"+channels[0].getLocalMember(true).toString()); - 
channels[0].stop(Channel.DEFAULT); + int index = -1; + for ( int i=0; i<channelCount; i++ ) { + if ( channels[i].getLocalMember(false).equals(member) ) { + System.out.println("Shutting down:" + channels[i].getLocalMember(true).toString()); + channels[i].stop(Channel.DEFAULT); + index = i; + } + } + int dead = index; Thread.sleep(1000); - System.out.println("Member count:"+channels[1].getMembers().length); - member = coordinators[1].getCoordinator(); - for (int i = 1; i < channelCount; i++)super.assertEquals(member, coordinators[i].getCoordinator()); + if ( index == 0 ) index = 1; else index = 0; + System.out.println("Member count:"+channels[index].getMembers().length); + member = coordinators[index].getCoordinator(); + for (int i = 1; i < channelCount; i++) if ( i != dead ) super.assertEquals(member, coordinators[i].getCoordinator()); System.out.println("Coordinator[2b] is:" + member); } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]