Author: fhanik
Date: Wed May 17 09:39:59 2006
New Revision: 407309

URL: http://svn.apache.org/viewvc?rev=407309&view=rev
Log:
Added test case for the TCP failure detector

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=407309&r1=407308&r2=407309&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
 Wed May 17 09:39:59 2006
@@ -31,6 +31,8 @@
 import org.apache.catalina.tribes.membership.Membership;
 import org.apache.catalina.tribes.membership.MemberImpl;
 import java.util.Iterator;
+import org.apache.catalina.tribes.ChannelException.FaultyMember;
+import org.apache.catalina.tribes.RemoteProcessException;
 
 /**
  * <p>Title: A perfect failure detector </p>
@@ -42,9 +44,11 @@
  * is not getting enough time to update its table, members can be &quot;timed 
out&quot;
  * This failure detector will intercept the memberDisappeared message(unless 
its a true shutdown message)
  * and connect to the member using TCP.
- * 
- * NOT YET COMPLETE
- *    
+ * </p>
+ * <p>
+ * The TcpFailureDetector works in two ways. <br>
+ * 1. It intercepts memberDisappeared events
+ * 2. It catches send errors 
  * </p>
  *
  * @author Filip Hanik
@@ -75,7 +79,18 @@
     protected HashMap suspect = new HashMap();
     
     public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
-        super.sendMessage(destination,msg,payload);
+        try {
+            super.sendMessage(destination, msg, payload);
+        }catch ( ChannelException cx ) {
+            FaultyMember[] mbrs = cx.getFaultyMembers();
+            for ( int i=0; i<mbrs.length; i++ ) {
+                if ( mbrs[i].getCause()!=null &&  
+                     (!(mbrs[i].getCause() instanceof RemoteProcessException)) 
) {//RemoteProcessException's are ok
+                    this.memberDisappeared(mbrs[i].getMember());
+                }//end if
+            }//for
+            throw cx;
+        }
     }
 
     public void messageReceived(ChannelMessage msg) {
@@ -99,9 +114,8 @@
             //previously marked suspect, system below picked up the member 
again
             suspect.remove(member);
         } else {
-            //not correct, this could make the membership out of sync
-            membership.addMember((MemberImpl)member);
-            super.memberAdded(member);
+            //if we add it here, then add it upwards too
+            if ( membership.memberAlive((MemberImpl)member))  
super.memberAdded(member);
         }
     }
 
@@ -144,8 +158,13 @@
         Member[] members = super.getMembers();
         for ( int i=0; i<members.length; i++ ) {
             if ( membership.memberAlive((MemberImpl)members[i]) ) {
-                log.warn("Member added, even though we werent 
notified:"+members[i]);
-                super.memberAdded(members[i]);
+                //we don't have this one in our membership, check to see if 
he/she is alive
+                if ( memberAlive(members[i]) ) {
+                    log.warn("Member added, even though we werent 
notified:"+members[i]);
+                    super.memberAdded(members[i]);
+                } else {
+                    membership.removeMember((MemberImpl)members[i]);
+                }//end if
             }//end if
         }//for
         

Added: 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java?rev=407309&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
 Wed May 17 09:39:59 2006
@@ -0,0 +1,105 @@
+package org.apache.catalina.tribes.test.membership;
+
+import junit.framework.*;
+import org.apache.catalina.tribes.group.interceptors.*;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.Member;
+import java.util.ArrayList;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ManagedChannel;
+
+/**
+ * <p>Title: </p> 
+ * 
+ * <p>Description: </p> 
+ * 
+ * <p>Copyright: Copyright (c) 2005</p> 
+ * 
+ * <p>Company: </p>
+ * 
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestTcpFailureDetector extends TestCase {
+    private TcpFailureDetector tcpFailureDetector1 = null;
+    private TcpFailureDetector tcpFailureDetector2 = null;
+    private ManagedChannel channel1 = null;
+    private ManagedChannel channel2 = null;
+    private TestMbrListener mbrlist1 = null;
+    private TestMbrListener mbrlist2 = null;
+    protected void setUp() throws Exception {
+        super.setUp();
+        channel1 = new GroupChannel();
+        channel2 = new GroupChannel();
+        mbrlist1 = new TestMbrListener();
+        mbrlist2 = new TestMbrListener();
+        tcpFailureDetector1 = new TcpFailureDetector();
+        tcpFailureDetector2 = new TcpFailureDetector();
+        channel1.addInterceptor(tcpFailureDetector1);
+        channel2.addInterceptor(tcpFailureDetector2);
+        channel1.addMembershipListener(mbrlist1);
+        channel2.addMembershipListener(mbrlist2);
+    }
+    
+    
+    public void testTcpSendFailureMemberDrop() throws Exception {
+        channel1.start(channel1.DEFAULT);
+        channel2.start(channel2.DEFAULT);
+        channel2.stop(channel2.SND_RX_SEQ);
+        assertEquals("Expecting member count to be 
equal",mbrlist1.members.size(),mbrlist2.members.size());
+        ByteMessage msg = new ByteMessage(new byte[1024]);
+        try {
+            channel1.send(channel1.getMembers(), msg, 0);
+            assertEquals("Message send should have failed.",true,false);
+        } catch ( ChannelException x ) {
+            
+        }
+        assertEquals("Expecting member count to not be 
equal",mbrlist1.members.size()+1,mbrlist2.members.size());
+        channel1.stop(Channel.DEFAULT);
+        channel2.stop(Channel.DEFAULT);
+    }
+    
+    public void testTcpMcastFail() throws Exception {
+        channel1.start(channel1.DEFAULT);
+        channel2.start(channel2.DEFAULT);
+        assertEquals("Expecting member count to be 
equal",mbrlist1.members.size(),mbrlist2.members.size());
+        channel2.stop(channel2.MBR_TX_SEQ);
+        ByteMessage msg = new ByteMessage(new byte[1024]);
+        try {
+            Thread.sleep(5000);
+            assertEquals("Expecting member count to be 
equal",mbrlist1.members.size(),mbrlist2.members.size());
+            channel1.send(channel1.getMembers(), msg, 0);
+        } catch ( ChannelException x ) {
+            assertEquals("Message send should have succeeded.",true,false);
+        }
+        channel1.stop(Channel.DEFAULT);
+        channel2.stop(Channel.DEFAULT);
+    }
+
+
+    protected void tearDown() throws Exception {
+        tcpFailureDetector1 = null;
+        tcpFailureDetector2 = null;
+        try { channel1.stop(Channel.DEFAULT);}catch (Exception ignore){}
+        channel1 = null;
+        try { channel2.stop(Channel.DEFAULT);}catch (Exception ignore){}
+        channel2 = null;
+        super.tearDown();
+    }
+    
+    public class TestMbrListener implements MembershipListener {
+        public ArrayList members = new ArrayList();
+        public void memberAdded(Member member) {
+            if ( !members.contains(member) ) members.add(member);
+        }
+        
+        public void memberDisappeared(Member member) {
+            if ( members.contains(member) ) members.remove(member);
+        }
+        
+    }
+
+}

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=407309&r1=407308&r2=407309&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Wed May 17 09:39:59 2006
@@ -43,17 +43,6 @@
 45. McastServiceImpl.receive should have a SO_TIMEOUT so that we can check
     for members dropping on the same thread
 
-44. Soft membership failure detection, ie if a webapp is stopped, but
-    the AbstractReplicatedMap doesn't broadcast a stop message
-    This is one potential solution:
-    1. keep a static WeakHashMap of all map implementations running
-       so that we can share one heartbeat thread for timeouts
-    2. everytime a message is received, update the last check time for that
-       member so that we don't need the thread to actively check
-    3. when the thread wakes up, it will check maps that are outside
-       the valid range for check time, 
-    4. send a RPC message, if no reply, remove the map from itself
-
 41. Build a tipi that is a soft membership
 
 38. Make the AbstractReplicatedMap accept non serializable elements, but just 
don't replicate them
@@ -279,4 +268,18 @@
 43. Silent member, node discovery.
     Add in the ability to start up tribes, but don't start the membership 
broadcast
     component, only the listener
-Notes: Completed. added in correct startup sequences.
\ No newline at end of file
+Notes: Completed. added in correct startup sequences.
+
+44. Soft membership failure detection, ie if a webapp is stopped, but
+    the AbstractReplicatedMap doesn't broadcast a stop message
+    This is one potential solution:
+    1. keep a static WeakHashMap of all map implementations running
+       so that we can share one heartbeat thread for timeouts
+    2. everytime a message is received, update the last check time for that
+       member so that we don't need the thread to actively check
+    3. when the thread wakes up, it will check maps that are outside
+       the valid range for check time, 
+    4. send a RPC message, if no reply, remove the map from itself
+    Other solution, use the TcpFailureDetector, catch send errors 
+Notes: The TcpFailureDetector will add this functionality by intercepting 
+       a send failure and promote that as a member disappeared
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to