Author: fhanik Date: Wed Jun 14 10:47:00 2006 New Revision: 414323 URL: http://svn.apache.org/viewvc?rev=414323&view=rev Log: Further enhancements, looking pretty good
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=414323&r1=414322&r2=414323&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Wed Jun 14 10:47:00 2006 @@ -192,7 +192,14 @@ return; //the only member, no need for an election } if ( suggestedviewId != null ) { - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running")); + + if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 && Arrays.diff(suggestedView,view,local).length == 0) { + suggestedviewId = null; + suggestedView = null; + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view")); + } else { + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running")); + } return; //election already running, I'm not allowed to have two of them } if ( view != null && Arrays.diff(view,membership,local).length == 0 && Arrays.diff(membership,view,local).length == 0) { @@ -218,10 +225,11 @@ } if ( suggestedviewId == null && (!coordMsgReceived.get())) { //no message arrived, send the coord msg - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out.")); - startElection(true); +// fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out.")); +// startElection(true); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out.")); } else { - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned")); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message")); } }//end if @@ -307,7 +315,6 @@ } protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException { -// synchronized (electionMutex) { coordMsgReceived.set(true); msg.timestamp = System.currentTimeMillis(); Membership merged = mergeOnArrive(msg,sender); @@ -440,7 +447,6 @@ // OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE //============================================================================================================ public void start(int svc) throws ChannelException { -// synchronized (electionMutex) { if (membership == null) setupMembership(); if (started)return; fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start")); @@ -449,7 +455,6 @@ if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start")); startElection(false); -// } } public void stop(int svc) throws ChannelException { @@ -527,9 +532,8 @@ super.memberDisappeared(member); try { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")")); - if ( started && (isCoordinator() || member.equals(getCoordinator())) ) - startElection(false); - //to do, if a member disappears, only the coordinator can start + if ( started && (isCoordinator() || isHighest()) ) + startElection(true); //to do, if a member disappears, only the coordinator can start }catch ( ChannelException x ) { log.error("Unable to start election when member was removed.",x); } @@ -537,13 +541,32 @@ } } + public boolean isHighest() { + Member local = getLocalMember(false); + if ( membership.getMembers().length == 0 ) return true; + else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0; + } + public boolean isCoordinator() { Member coord = getCoordinator(); return coord != null && getLocalMember(false).equals(coord); } public void heartbeat() { - super.heartbeat(); + try { + MemberImpl local = (MemberImpl)getLocalMember(false); + if ( view != null && (Arrays.diff(view,membership,local).length != 0 || Arrays.diff(membership,view,local).length != 0) ) { + if ( isHighest() ) { + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this, + "Heartbeat found inconsistency, restart election")); + startElection(true); + } + } + } catch ( Exception x ){ + log.error("Unable to perform heartbeat.",x); + } finally { + super.heartbeat(); + } } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=414323&r1=414322&r2=414323&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Wed Jun 14 10:47:00 2006 @@ -111,39 +111,45 @@ }//messageReceived - public synchronized void memberAdded(Member member) { + public void memberAdded(Member member) { if ( membership == null ) setupMembership(); - if ( removeSuspects.containsKey(member) ) { - //previously marked suspect, system below picked up the member again - removeSuspects.remove(member); - } else { - //if we add it here, then add it upwards too - if ( membership.getMember((MemberImpl)member) == null) { + boolean notify = false; + synchronized (membership) { + if (removeSuspects.containsKey(member)) { + //previously marked suspect, system below picked up the member again + removeSuspects.remove(member); + } else if (membership.getMember( (MemberImpl) member) == null){ + //if we add it here, then add it upwards too //check to see if it is alive if (memberAlive(member)) { - membership.memberAlive((MemberImpl)member); - super.memberAdded(member); + membership.memberAlive( (MemberImpl) member); + notify = true; } else { addSuspects.put(member, new Long(System.currentTimeMillis())); } - } + } } + if ( notify ) super.memberAdded(member); } - public synchronized void memberDisappeared(Member member) { + public void memberDisappeared(Member member) { if ( membership == null ) setupMembership(); + boolean notify = false; boolean shutdown = Arrays.equals(member.getPayload(),Member.SHUTDOWN_PAYLOAD); if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify."); - //check to see if the member really is gone - //if the payload is not a shutdown message - if ( shutdown || !memberAlive(member) ) { - //not correct, we need to maintain the map - membership.removeMember((MemberImpl)member); - super.memberDisappeared(member); - } else { - //add the member as suspect - removeSuspects.put(member, new Long(System.currentTimeMillis())); + synchronized (membership) { + //check to see if the member really is gone + //if the payload is not a shutdown message + if (shutdown || !memberAlive(member)) { + //not correct, we need to maintain the map + membership.removeMember( (MemberImpl) member); + notify = true; + } else { + //add the member as suspect + removeSuspects.put(member, new Long(System.currentTimeMillis())); + } } + if ( notify ) super.memberDisappeared(member); } public boolean hasMembers() { @@ -165,47 +171,48 @@ return super.getLocalMember(incAlive); } - public synchronized void heartbeat() { + public void heartbeat() { try { if (membership == null) setupMembership(); - //update all alive times - Member[] members = super.getMembers(); - for (int i = 0; members != null && i < members.length; i++) { - if (membership.memberAlive( (MemberImpl) members[i])) { - //we don't have this one in our membership, check to see if he/she is alive - if (memberAlive(members[i])) { - log.warn("Member added, even though we werent notified:" + members[i]); - super.memberAdded(members[i]); - } else { - membership.removeMember( (MemberImpl) members[i]); + synchronized (membership) { + //update all alive times + Member[] members = super.getMembers(); + for (int i = 0; members != null && i < members.length; i++) { + if (membership.memberAlive( (MemberImpl) members[i])) { + //we don't have this one in our membership, check to see if he/she is alive + if (memberAlive(members[i])) { + log.warn("Member added, even though we werent notified:" + members[i]); + super.memberAdded(members[i]); + } else { + membership.removeMember( (MemberImpl) members[i]); + } //end if } //end if - } //end if - } //for + } //for - //check suspect members if they are still alive, - //if not, simply issue the memberDisappeared message - MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]); - for (int i = 0; i < keys.length; i++) { - MemberImpl m = (MemberImpl) keys[i]; - if (membership.getMember(m) != null && (!memberAlive(m))) { - membership.removeMember(m); - super.memberDisappeared(m); - removeSuspects.remove(m); - } //end if - } + //check suspect members if they are still alive, + //if not, simply issue the memberDisappeared message + MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]); + for (int i = 0; i < keys.length; i++) { + MemberImpl m = (MemberImpl) keys[i]; + if (membership.getMember(m) != null && (!memberAlive(m))) { + membership.removeMember(m); + super.memberDisappeared(m); + removeSuspects.remove(m); + } //end if + } - //check add suspects members if they are alive now, - //if they are, simply issue the memberAdded message - keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]); - for (int i = 0; i < keys.length; i++) { - MemberImpl m = (MemberImpl) keys[i]; - if ( membership.getMember(m) == null && (memberAlive(m))) { - membership.memberAlive(m); - super.memberAdded(m); - addSuspects.remove(m); - } //end if + //check add suspects members if they are alive now, + //if they are, simply issue the memberAdded message + keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]); + for (int i = 0; i < keys.length; i++) { + MemberImpl m = (MemberImpl) keys[i]; + if ( membership.getMember(m) == null && (memberAlive(m))) { + membership.memberAlive(m); + super.memberAdded(m); + addSuspects.remove(m); + } //end if + } } - }catch ( Exception x ) { log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x); } finally { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=414323&r1=414322&r2=414323&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java Wed Jun 14 10:47:00 2006 @@ -177,10 +177,11 @@ }catch ( IOException x) { retries--; if ( retries <= 0 ) { - log.info("Unable to bind server socket to "+addr+" throwing error."); + log.info("Unable to bind server socket to:"+addr+" throwing error."); throw x; } portstart++; + try {Thread.sleep(25);}catch( InterruptedException ti){Thread.currentThread().interrupted();} retries = bind(socket,portstart,retries); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java?rev=414323&r1=414322&r2=414323&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java Wed Jun 14 10:47:00 2006 @@ -55,9 +55,10 @@ public void start() throws IOException { try { setPool(new ThreadPool(new Object(),getMaxThreads(),getMinThreads(),this)); - } catch (Exception e) { - log.error("ThreadPool can initilzed. Listener not started", e); - return; + } catch (Exception x) { + log.fatal("ThreadPool can initilzed. Listener not started", x); + if ( x instanceof IOException ) throw (IOException)x; + else throw new IOException(x.getMessage()); } try { getBind(); @@ -67,6 +68,8 @@ t.start(); } catch (Exception x) { log.fatal("Unable to start cluster receiver", x); + if ( x instanceof IOException ) throw (IOException)x; + else throw new IOException(x.getMessage()); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=414323&r1=414322&r2=414323&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Wed Jun 14 10:47:00 2006 @@ -83,12 +83,13 @@ * @throws Exception * @see org.apache.catalina.tribes.ClusterReceiver#start() */ - public void start() { + public void start() throws IOException { try { setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); - } catch (Exception e) { - log.error("ThreadPool can initilzed. Listener not started", e); - return; + } catch (Exception x) { + log.fatal("ThreadPool can initilzed. Listener not started", x); + if ( x instanceof IOException ) throw (IOException)x; + else throw new IOException(x.getMessage()); } try { getBind(); @@ -98,6 +99,8 @@ t.start(); } catch (Exception x) { log.fatal("Unable to start cluster receiver", x); + if ( x instanceof IOException ) throw (IOException)x; + else throw new IOException(x.getMessage()); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java?rev=414323&r1=414322&r2=414323&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java Wed Jun 14 10:47:00 2006 @@ -21,7 +21,9 @@ static int CHANNEL_COUNT = 5; static int SCREEN_WIDTH = 120; static long SLEEP_TIME = 10; + static int CLEAR_SCREEN = 30; static boolean MULTI_THREAD = false; + static boolean[] VIEW_EVENTS = new boolean[255]; StringBuffer statusLine = new StringBuffer(); Status[] status = null; BufferedReader reader = null; @@ -36,7 +38,7 @@ public void clearScreen() { StringBuffer buf = new StringBuffer(700); - for (int i=0; i<30; i++ ) buf.append("\n"); + for (int i=0; i<CLEAR_SCREEN; i++ ) buf.append("\n"); System.out.println(buf); } @@ -165,20 +167,39 @@ - - - + public static void setEvents(String events) { + java.util.Arrays.fill(VIEW_EVENTS,false); + StringTokenizer t = new StringTokenizer(events,","); + while (t.hasMoreTokens() ) { + int idx = Integer.parseInt(t.nextToken()); + VIEW_EVENTS[idx] = true; + } + } + public static void main(String[] args) throws Exception { System.out.println("Usage:"); - System.out.println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo [channel-count multi-thread]"); + System.out.println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo -c channel-count(int) -t multi-thread(true|false) -s sleep-time(ms) -sc clear-screen(int) -p view_events_csv(1,2,5,7)"); System.out.println("Example:"); System.out.println("\tjava o.a.c.t.d.CoordinationDemo -> starts demo single threaded start/stop with 5 channels"); - System.out.println("\tjava o.a.c.t.d.CoordinationDemo 10 -> starts demo single threaded start/stop with 10 channels"); - System.out.println("\tjava o.a.c.t.d.CoordinationDemo 7 true-> starts demo multi threaded start/stop with 7 channels"); + System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 10 -> starts demo single threaded start/stop with 10 channels"); + System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 7 -t true -s 1000 -sc 50-> starts demo multi threaded start/stop with 7 channels and 1 second sleep time between events and 50 lines to clear screen"); + System.out.println("\tjava o.a.c.t.d.CoordinationDemo -t true -p 12 -> starts demo multi threaded start/stop with 5 channels and only prints the EVT_CONF_RX event"); System.out.println(); - - if ( args.length >= 1 ) CHANNEL_COUNT = Integer.parseInt(args[0]); - if ( args.length >= 2 ) MULTI_THREAD = true; + java.util.Arrays.fill(VIEW_EVENTS,true); + + for (int i=0; i<args.length; i++ ) { + if ( "-c".equals(args[i]) ) + CHANNEL_COUNT = Integer.parseInt(args[++i]); + else if ( "-t".equals(args[i]) ) + MULTI_THREAD = Boolean.parseBoolean(args[++i]); + else if ( "-s".equals(args[i]) ) + SLEEP_TIME = Long.parseLong(args[++i]); + else if ( "-sc".equals(args[i]) ) + CLEAR_SCREEN = Integer.parseInt(args[++i]); + else if ( "-p".equals(args[i]) ) + setEvents(args[++i]); + else if ( "-h".equals(args[i]) ) System.exit(0); + } CoordinationDemo demo = new CoordinationDemo(); demo.waitForInput(); } @@ -265,6 +286,9 @@ status = "Start failed:"+x.getMessage(); error = x; startstatus = "failed"; + try { channel.stop(GroupChannel.DEFAULT);}catch(Exception ignore){} + channel = null; + interceptor = null; } } @@ -298,11 +322,10 @@ interceptor = new NonBlockingCoordinator() { public void fireInterceptorEvent(InterceptorEvent event) { status = event.getEventTypeDesc(); -// if ( event instanceof NonBlockingCoordinator.CoordinationEvent && -// ((NonBlockingCoordinator.CoordinationEvent)event).getEventType() == NonBlockingCoordinator.CoordinationEvent.EVT_CONF_RX) - parent.printScreen(); + int type = event.getEventType(); + boolean display = VIEW_EVENTS[type]; + if ( display ) parent.printScreen(); try { Thread.sleep(SLEEP_TIME); }catch ( Exception x){} - } }; channel.addInterceptor(interceptor); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]