Author: fhanik Date: Fri May 19 15:42:51 2006 New Revision: 407923 URL: http://svn.apache.org/viewvc?rev=407923&view=rev Log: Added in absolute order utility. Added in the complete test for the tcp failure detector
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java?rev=407923&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java Fri May 19 15:42:51 2006 @@ -0,0 +1,99 @@ +package org.apache.catalina.tribes.group; + +import org.apache.catalina.tribes.Member; +import java.util.Comparator; +import java.util.Arrays; + +/** + * <p>Title: Membership - Absolute Order</p> + * + * <p>Description: A simple, yet agreeable and efficient way of ordering members</p> + * <p> + * Ordering members can serve as a basis for electing a leader or coordinating efforts.<br> + * This is stinky simple, it works on the basis of the <code>Member</code> interface + * and orders members in the following format: + * + * <ol> + * <li>IP comparison - byte by byte, lower byte higher rank</li> + * <li>IPv4 addresses rank higher than IPv6, ie the lesser number of bytes, the higher rank</li> + * <li>Port comparison - lower port, higher rank</li> + * <li>UniqueId comparison- byte by byte, lower byte higher rank</li> + * </ol> + * + * </p> + * + * @author Filip Hanik + * @version 1.0 + * @see org.apache.catalina.tribes.Member + */ +public class AbsoluteOrder { + protected static AbsoluteComparator comp = new AbsoluteComparator(); + + protected AbsoluteOrder() { + super(); + } + + + + public static void absoluteOrder(Member[] members) { + Arrays.sort(members,comp); + } + + + public static class AbsoluteComparator implements Comparator { + public int compare(Object o1, Object o2) { + if ( !((o1 instanceof Member) && (o2 instanceof Member)) ) return 0; + return compareMembers((Member)o1,(Member)o2); + } + + public int compareMembers(Member m1, Member m2) { + int result = compareIps(m1,m2); + if ( result == 0 ) result = comparePorts(m1,m2); + if ( result == 0 ) result = compareIds(m1,m2); + return result; + } + + public int compareIps(Member m1, Member m2) { + return compareBytes(m1.getHost(),m2.getHost()); + } + + public int comparePorts(Member m1, Member m2) { + return compareInts(m1.getPort(),m2.getPort()); + } + + public int compareIds(Member m1, Member m2) { + return compareBytes(m1.getUniqueId(),m2.getUniqueId()); + } + + protected int compareBytes(byte[] d1, byte[] d2) { + int result = 0; + if ( d1.length == d2.length ) { + for (int i=0; (result==0) && (i<d1.length); i++) { + result = compareBytes(d1[i],d2[i]); + } + } else if ( d1.length < d2.length) { + result = -1; + } else { + result = 1; + } + return result; + } + + protected int compareBytes(byte b1, byte b2) { + return compareInts((int)b1,(int)b2); + } + + protected int compareInts(int b1, int b2) { + int result = 0; + if ( b1 == b2 ) { + + } else if ( b1 < b2) { + result = -1; + } else { + result = 1; + } + return result; + } + } + +} \ No newline at end of file Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=407923&r1=407922&r2=407923&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Fri May 19 15:42:51 2006 @@ -117,12 +117,12 @@ removeSuspects.remove(member); } else { //if we add it here, then add it upwards too - if ( membership.memberAlive((MemberImpl)member) ) { + if ( membership.getMember((MemberImpl)member) == null) { //check to see if it is alive if (memberAlive(member)) { + membership.memberAlive((MemberImpl)member); super.memberAdded(member); } else { - membership.removeMember((MemberImpl)member); addSuspects.put(member, new Long(System.currentTimeMillis())); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=407923&r1=407922&r2=407923&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri May 19 15:42:51 2006 @@ -178,6 +178,8 @@ socket.setTimeToLive(mcastTTL); } } + + /** * Start the service Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java?rev=407923&r1=407922&r2=407923&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java Fri May 19 15:42:51 2006 @@ -34,8 +34,10 @@ super.setUp(); channel1 = new GroupChannel(); channel2 = new GroupChannel(); - mbrlist1 = new TestMbrListener(); - mbrlist2 = new TestMbrListener(); + channel1.getMembershipService().setPayload("Channel-1".getBytes("ASCII")); + channel2.getMembershipService().setPayload("Channel-2".getBytes("ASCII")); + mbrlist1 = new TestMbrListener("Channel-1"); + mbrlist2 = new TestMbrListener("Channel-2"); tcpFailureDetector1 = new TcpFailureDetector(); tcpFailureDetector2 = new TcpFailureDetector(); channel1.addInterceptor(tcpFailureDetector1); @@ -44,12 +46,19 @@ channel2.addMembershipListener(mbrlist2); } + public void clear() { + mbrlist1.members.clear(); + mbrlist2.members.clear(); + } public void testTcpSendFailureMemberDrop() throws Exception { + System.out.println("testTcpSendFailureMemberDrop()"); + clear(); channel1.start(channel1.DEFAULT); channel2.start(channel2.DEFAULT); - channel2.stop(channel2.SND_RX_SEQ); + Thread.sleep(1000); assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); + channel2.stop(channel2.SND_RX_SEQ); ByteMessage msg = new ByteMessage(new byte[1024]); try { channel1.send(channel1.getMembers(), msg, 0); @@ -62,9 +71,27 @@ channel2.stop(Channel.DEFAULT); } + public void testTcpFailureMemberAdd() throws Exception { + System.out.println("testTcpFailureMemberAdd()"); + clear(); + channel1.start(channel1.DEFAULT); + channel2.start(channel2.SND_RX_SEQ); + channel2.start(channel2.SND_TX_SEQ); + channel2.start(channel2.MBR_RX_SEQ); + channel2.stop(channel2.SND_RX_SEQ); + channel2.start(channel2.MBR_TX_SEQ); + Thread.sleep(1000); + assertEquals("Expecting member count to not be equal",mbrlist1.members.size()+1,mbrlist2.members.size()); + channel1.stop(Channel.DEFAULT); + channel2.stop(Channel.DEFAULT); + } + public void testTcpMcastFail() throws Exception { + System.out.println("testTcpMcastFail()"); + clear(); channel1.start(channel1.DEFAULT); channel2.start(channel2.DEFAULT); + Thread.sleep(1000); assertEquals("Expecting member count to be equal",mbrlist1.members.size(),mbrlist2.members.size()); channel2.stop(channel2.MBR_TX_SEQ); ByteMessage msg = new ByteMessage(new byte[1024]); @@ -91,13 +118,31 @@ } public class TestMbrListener implements MembershipListener { + public String name = null; + public TestMbrListener(String name) { + this.name = name; + } public ArrayList members = new ArrayList(); public void memberAdded(Member member) { - if ( !members.contains(member) ) members.add(member); + if ( !members.contains(member) ) { + members.add(member); + try{ + System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "]"); + }catch ( Exception x ) { + System.out.println(name + ":member added[unknown]"); + } + } } public void memberDisappeared(Member member) { - if ( members.contains(member) ) members.remove(member); + if ( members.contains(member) ) { + members.remove(member); + try{ + System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "]"); + }catch ( Exception x ) { + System.out.println(name + ":member disappeared[unknown]"); + } + } } } Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=407923&r1=407922&r2=407923&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original) +++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Fri May 19 15:42:51 2006 @@ -42,6 +42,15 @@ Code Tasks: =========================================== +9. CoordinatorInterceptor - manages the selection of a cluster coordinator + just had a brilliant idea, if GroupChannel keeps its own view of members, + the coordinator interceptor can hold on to the member added/disappared event + It can also intercept down going messages if the coordinator disappeared + while a new coordinator is chosen + It can also intercept down going messages for members disappeared that the + calling app not yet knows about, to avoid a ChannelException + The coordinator is needed because of the mixup when two channels startup instantly + 48. Periodic refresh of the replicated map (primary ->backup) 47. Delta(session) versioning. increase version number each time, easier to keep maps in sync @@ -136,15 +145,6 @@ (This is useful when synchronized=false and waitForAck=false, to improve parallel processing, but you want to have all messages sent in parallel and don't return until all have been processed on the remote end.) - -9. CoordinatorInterceptor - manages the selection of a cluster coordinator - just had a brilliant idea, if GroupChannel keeps its own view of members, - the coordinator interceptor can hold on to the member added/disappared event - It can also intercept down going messages if the coordinator disappeared - while a new coordinator is chosen - It can also intercept down going messages for members disappeared that the - calling app not yet knows about, to avoid a ChannelException - 10. Xa2PhaseCommitInterceptor - make sure the message doesn't reach the receiver unless all members got it --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]