Author: fhanik Date: Mon May 22 15:39:06 2006 New Revision: 408775 URL: http://svn.apache.org/viewvc?rev=408775&view=rev Log: Membership arrival and disappearance should never be locked cause the interceptor or app is holding on to the thread. These are rare events, hence we don't need a thread pool, instead fire off the events using a new thread.
Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=408775&r1=408774&r2=408775&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Mon May 22 15:39:06 2006 @@ -197,12 +197,6 @@ receiver.setDaemon(true); receiver.start(); valid = true; - long memberwait = sendFrequency*4; - if(log.isInfoEnabled()) - log.info("Sleeping for "+memberwait+" milliseconds to establish cluster membership"); - try {Thread.sleep(memberwait);}catch (InterruptedException ignore){} - if(log.isInfoEnabled()) - log.info("Done sleeping, membership established."); } if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) { if ( sender != null ) throw new IllegalStateException("McastService.send already running."); @@ -214,14 +208,26 @@ sender = new SenderThread(sendFrequency); sender.setDaemon(true); sender.start(); + //we have started the receiver, but not yet waited for membership to establish valid = true; } if (!valid) { throw new IllegalArgumentException("Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ"); } + //pause, once or twice + waitForMembers(level); startLevel = (startLevel | level); } + private void waitForMembers(int level) { + long memberwait = sendFrequency*4; + if(log.isInfoEnabled()) + log.info("Sleeping for "+memberwait+" milliseconds to establish cluster membership, start level:"+level); + try {Thread.sleep(memberwait);}catch (InterruptedException ignore){} + if(log.isInfoEnabled()) + log.info("Done sleeping, membership established, start level:"+level); + } + /** * Stops the service * @throws IOException if the service fails to disconnect from the sockets @@ -272,19 +278,27 @@ socket.receive(receivePacket); byte[] data = new byte[receivePacket.getLength()]; System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length); - MemberImpl m = MemberImpl.getMember(data); + final MemberImpl m = MemberImpl.getMember(data); if (log.isDebugEnabled()) log.debug("Mcast receive ping from member " + m); - - if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) { - if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m); - membership.removeMember(m); - service.memberDisappeared(m); - } else if (membership.memberAlive(m)) { - if (log.isDebugEnabled()) - log.debug("Mcast add member " + m); - service.memberAdded(m); - } //end if + Thread t = null; + if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) { + if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m); + membership.removeMember(m); + t = new Thread() { + public void run() { + service.memberDisappeared(m); + } + }; + } else if (membership.memberAlive(m)) { + if (log.isDebugEnabled()) log.debug("Mcast add member " + m); + t = new Thread() { + public void run() { + service.memberAdded(m); + } + }; + } //end if + if ( t != null ) t.start(); } catch (SocketTimeoutException x ) { //do nothing, this is normal, we don't want to block forever //since the receive thread is the same thread @@ -298,10 +312,16 @@ synchronized (expiredMutex) { MemberImpl[] expired = membership.expire(timeToExpiration); for (int i = 0; i < expired.length; i++) { + final MemberImpl member = expired[i]; if (log.isDebugEnabled()) log.debug("Mcast exipre member " + expired[i]); try { - service.memberDisappeared(expired[i]); + Thread t = new Thread() { + public void run() { + service.memberDisappeared(member); + } + }; + t.start(); } catch (Exception x) { log.error("Unable to process member disappeared message.", x); } Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java?rev=408775&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestMemberArrival.java Mon May 22 15:39:06 2006 @@ -0,0 +1,100 @@ +package org.apache.catalina.tribes.test.membership; + +import java.util.ArrayList; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.GroupChannel; +import junit.framework.TestCase; + +public class TestMemberArrival + extends TestCase { + private static int count = 10; + private ManagedChannel[] channels = new ManagedChannel[count]; + private TestMbrListener[] listeners = new TestMbrListener[count]; + + protected void setUp() throws Exception { + super.setUp(); + for (int i = 0; i < channels.length; i++) { + channels[i] = new GroupChannel(); + channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII")); + listeners[i] = new TestMbrListener( ("Listener-" + (i + 1))); + channels[i].addMembershipListener(listeners[i]); + + } + } + + public void clear() { + for (int i = 0; i < channels.length; i++) { + listeners[i].members.clear(); + } + } + + public void testMemberArrival() throws Exception { + //purpose of this test is to make sure that we have received all the members + //that we can expect before the start method returns + Thread[] threads = new Thread[channels.length]; + for (int i=0; i<channels.length; i++ ) { + final Channel channel = channels[i]; + Thread t = new Thread() { + public void run() { + try { + channel.start(Channel.DEFAULT); + }catch ( Exception x ) { + throw new RuntimeException(x); + } + } + }; + threads[i] = t; + } + for (int i=0; i<threads.length; i++ ) threads[i].start(); + for (int i=0; i<threads.length; i++ ) threads[i].join(); + System.out.println("All channels started."); + for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking member arrival length",channels.length-1,listeners[i].members.size()); + } + + protected void tearDown() throws Exception { + + for (int i = 0; i < channels.length; i++) { + try { + channels[i].stop(Channel.DEFAULT); + } catch (Exception ignore) {} + } + super.tearDown(); + } + + public class TestMbrListener + implements MembershipListener { + public String name = null; + public TestMbrListener(String name) { + this.name = name; + } + + public ArrayList members = new ArrayList(); + public void memberAdded(Member member) { + if (!members.contains(member)) { + members.add(member); + try { + System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]"); + } catch (Exception x) { + System.out.println(name + ":member added[unknown]"); + } + } + } + + public void memberDisappeared(Member member) { + if (members.contains(member)) { + members.remove(member); + try { + System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]"); + } catch (Exception x) { + System.out.println(name + ":member disappeared[unknown]"); + } + } + } + + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]