Author: fhanik
Date: Mon May 22 15:39:06 2006
New Revision: 408775

URL: http://svn.apache.org/viewvc?rev=408775&view=rev
Log:
Membership arrival and disappearance should never be locked because the 
interceptor or app is holding on to the thread.
These are rare events, hence we don't need a thread pool; instead, we fire off 
the events using a new thread.

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=408775&r1=408774&r2=408775&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
 Mon May 22 15:39:06 2006
@@ -197,12 +197,6 @@
             receiver.setDaemon(true);
             receiver.start();
             valid = true;
-            long memberwait = sendFrequency*4;
-            if(log.isInfoEnabled())
-                log.info("Sleeping for "+memberwait+" milliseconds to 
establish cluster membership");
-            try {Thread.sleep(memberwait);}catch (InterruptedException 
ignore){}
-            if(log.isInfoEnabled())
-                log.info("Done sleeping, membership established.");
         } 
         if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
             if ( sender != null ) throw new 
IllegalStateException("McastService.send already running.");
@@ -214,14 +208,26 @@
             sender = new SenderThread(sendFrequency);
             sender.setDaemon(true);
             sender.start();
+            //we have started the receiver, but not yet waited for membership 
to establish
             valid = true;
         } 
         if (!valid) {
             throw new IllegalArgumentException("Invalid start level. Only 
acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
         }
+        //pause, once or twice
+        waitForMembers(level);
         startLevel = (startLevel | level);
     }
 
+    private void waitForMembers(int level) {
+        long memberwait = sendFrequency*4;
+        if(log.isInfoEnabled())
+            log.info("Sleeping for "+memberwait+" milliseconds to establish 
cluster membership, start level:"+level);
+        try {Thread.sleep(memberwait);}catch (InterruptedException ignore){}
+        if(log.isInfoEnabled())
+            log.info("Done sleeping, membership established, start 
level:"+level);
+    }
+
     /**
      * Stops the service
      * @throws IOException if the service fails to disconnect from the sockets
@@ -272,19 +278,27 @@
             socket.receive(receivePacket);
             byte[] data = new byte[receivePacket.getLength()];
             System.arraycopy(receivePacket.getData(), 
receivePacket.getOffset(), data, 0, data.length);
-            MemberImpl m = MemberImpl.getMember(data);
+            final MemberImpl m = MemberImpl.getMember(data);
             if (log.isDebugEnabled())
                 log.debug("Mcast receive ping from member " + m);
-
-            if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) {
-                if (log.isDebugEnabled()) log.debug("Member has shutdown:" + 
m);
-                membership.removeMember(m);
-                service.memberDisappeared(m);
-            } else if (membership.memberAlive(m)) {
-                if (log.isDebugEnabled())
-                    log.debug("Mcast add member " + m);
-                service.memberAdded(m);
-            } //end if
+                Thread t = null;
+                if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) {
+                    if (log.isDebugEnabled()) log.debug("Member has shutdown:" 
+ m);
+                    membership.removeMember(m);
+                    t = new Thread() {
+                        public void run() {
+                            service.memberDisappeared(m);
+                        }
+                    };
+                } else if (membership.memberAlive(m)) {
+                    if (log.isDebugEnabled()) log.debug("Mcast add member " + 
m);
+                    t = new Thread() {
+                        public void run() {
+                            service.memberAdded(m);
+                        }
+                    };
+                } //end if
+            if ( t != null ) t.start();
         } catch (SocketTimeoutException x ) { 
             //do nothing, this is normal, we don't want to block forever
             //since the receive thread is the same thread
@@ -298,10 +312,16 @@
         synchronized (expiredMutex) {
             MemberImpl[] expired = membership.expire(timeToExpiration);
             for (int i = 0; i < expired.length; i++) {
+                final MemberImpl member = expired[i];
                 if (log.isDebugEnabled())
                     log.debug("Mcast exipre  member " + expired[i]);
                 try {
-                    service.memberDisappeared(expired[i]);
+                    Thread t = new Thread() {
+                        public void run() {
+                            service.memberDisappeared(member);
+                        }
+                    };
+                    t.start();
                 } catch (Exception x) {
                     log.error("Unable to process member disappeared message.", 
x);
                 }

Added: 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java?rev=408775&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
 Mon May 22 15:39:06 2006
@@ -0,0 +1,100 @@
+package org.apache.catalina.tribes.test.membership;
+
+import java.util.ArrayList;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+import junit.framework.TestCase;
+
+public class TestMemberArrival
+    extends TestCase {
+    private static int count = 10;
+    private ManagedChannel[] channels = new ManagedChannel[count];
+    private TestMbrListener[] listeners = new TestMbrListener[count];
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        for (int i = 0; i < channels.length; i++) {
+            channels[i] = new GroupChannel();
+            channels[i].getMembershipService().setPayload( ("Channel-" + (i + 
1)).getBytes("ASCII"));
+            listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
+            channels[i].addMembershipListener(listeners[i]);
+
+        }
+    }
+
+    public void clear() {
+        for (int i = 0; i < channels.length; i++) {
+            listeners[i].members.clear();
+        }
+    }
+
+    public void testMemberArrival() throws Exception {
+        //purpose of this test is to make sure that we have received all the 
members
+        //that we can expect before the start method returns
+        Thread[] threads = new Thread[channels.length];
+        for (int i=0; i<channels.length; i++ ) {
+            final Channel channel = channels[i];
+            Thread t = new Thread() {
+                public void run() {
+                    try {
+                        channel.start(Channel.DEFAULT);
+                    }catch ( Exception x ) {
+                        throw new RuntimeException(x);
+                    }
+                }
+            };
+            threads[i] = t;
+        }
+        for (int i=0; i<threads.length; i++ ) threads[i].start();
+        for (int i=0; i<threads.length; i++ ) threads[i].join();
+        System.out.println("All channels started.");
+        for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking 
member arrival length",channels.length-1,listeners[i].members.size());
+    }
+
+    protected void tearDown() throws Exception {
+
+        for (int i = 0; i < channels.length; i++) {
+            try {
+                channels[i].stop(Channel.DEFAULT);
+            } catch (Exception ignore) {}
+        }
+        super.tearDown();
+    }
+
+    public class TestMbrListener
+        implements MembershipListener {
+        public String name = null;
+        public TestMbrListener(String name) {
+            this.name = name;
+        }
+
+        public ArrayList members = new ArrayList();
+        public void memberAdded(Member member) {
+            if (!members.contains(member)) {
+                members.add(member);
+                try {
+                    System.out.println(name + ":member added[" + new 
String(member.getPayload(), "ASCII") + "; 
Thread:"+Thread.currentThread().getName()+"]");
+                } catch (Exception x) {
+                    System.out.println(name + ":member added[unknown]");
+                }
+            }
+        }
+
+        public void memberDisappeared(Member member) {
+            if (members.contains(member)) {
+                members.remove(member);
+                try {
+                    System.out.println(name + ":member disappeared[" + new 
String(member.getPayload(), "ASCII") + "; 
Thread:"+Thread.currentThread().getName()+"]");
+                } catch (Exception x) {
+                    System.out.println(name + ":member disappeared[unknown]");
+                }
+            }
+        }
+
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to