Author: fhanik Date: Tue May 23 15:11:05 2006 New Revision: 409013 URL: http://svn.apache.org/viewvc?rev=409013&view=rev Log: piece by piece
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=409013&r1=409012&r2=409013&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue May 23 15:11:05 2006 @@ -14,28 +14,29 @@ */ package org.apache.catalina.tribes.group.interceptors; -import org.apache.catalina.tribes.group.ChannelInterceptorBase; -import org.apache.catalina.tribes.group.InterceptorPayload; -import org.apache.catalina.tribes.ChannelMessage; +import java.util.LinkedHashMap; + +import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.membership.Membership; -import org.apache.catalina.tribes.membership.MemberImpl; -import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.UniqueId; -import org.apache.catalina.tribes.util.UUIDGenerator; import org.apache.catalina.tribes.group.AbsoluteOrder; -import org.apache.catalina.tribes.util.Arrays; +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.io.ChannelData; -import org.apache.catalina.tribes.Channel; -import java.util.HashMap; -import java.util.LinkedHashMap; -import org.apache.catalina.tribes.membership.*; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.membership.Membership; +import org.apache.catalina.tribes.util.Arrays; +import org.apache.catalina.tribes.util.UUIDGenerator; /** - * <p>Title: NonBlockingCoordinator</p> + * <p>Title: Auto merging leader election algorithm</p> * - * <p>Description: Implementation of a simple coordinator algorithm.</p> + * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator, + * it also merges groups automatically when members are discovered that werent part of the + * </p> * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on * </p> * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership @@ -246,7 +247,6 @@ } protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) { - MemberImpl local = (MemberImpl)getLocalMember(false); Membership merged = new Membership(local,AbsoluteOrder.comp); Arrays.fill(merged,msg.getMembers()); @@ -254,21 +254,26 @@ Member[] diff = Arrays.diff(merged,membership,local); for ( int i=0; i<diff.length; i++ ) { if (!alive(diff[i])) merged.removeMember((MemberImpl)diff[i]); + else memberAdded(diff[i],false); } return merged; } - protected void processCoordMessage(CoordinationMessage msg, Member sender) { + protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException { synchronized (electionMutex) { msg.timestamp = System.currentTimeMillis(); rotatingViews.put(msg.getId(),msg); Membership merged = mergeOnArrive(msg,sender); if ( isViewConf(msg) ) handleViewConf(msg, sender,merged); + else handleToken(msg,sender,merged); } } - protected void handleViewConf(CoordinationMessage msg, Member sender, - Membership merged) { + protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { + + } + + protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException { this.view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp); this.viewId = msg.getId(); if ( viewId.equals(this.suggestedviewId) ) { @@ -276,12 +281,24 @@ this.suggestedviewId = null; } this.viewChange(viewId,view.getMembers()); + if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { + startElection(false); + } } protected boolean isViewConf(CoordinationMessage msg) { return Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length); } + + protected boolean hasHigherPriority(Member[] complete, Member[] local) { + if ( local == null || local.length == 0 ) return false; + if ( complete == null || complete.length == 0 ) return true; + AbsoluteOrder.absoluteOrder(complete); + AbsoluteOrder.absoluteOrder(local); + return (AbsoluteOrder.comp.compare(complete[0],local[0]) > 0); + + } /** @@ -346,7 +363,11 @@ if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) { //ignore message, its an alive message } else if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) { - processCoordMessage(new CoordinationMessage(msg.getMessage()),msg.getAddress()); + try { + processCoordMessage(new CoordinationMessage(msg.getMessage()), msg.getAddress()); + }catch ( ChannelException x ) { + log.error("Error processing coordination message. Could be fatal.",x); + } } else { super.messageReceived(msg); } @@ -357,11 +378,15 @@ } public void memberAdded(Member member) { + memberAdded(member,true); + } + + public void memberAdded(Member member,boolean elect) { try { if ( membership.memberAlive((MemberImpl)member) ) super.memberAdded(member); try { - if (started) startElection(false); + if (started && elect) startElection(false); }catch ( ChannelException x ) { log.error("Unable to start election when member was added.",x); } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]