Author: fhanik Date: Tue Jun 13 21:05:24 2006 New Revision: 414056 URL: http://svn.apache.org/viewvc?rev=414056&view=rev Log: Created a coordination demo, text only, to show how the election works
Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=414056&r1=414055&r2=414056&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java Tue Jun 13 21:05:24 2006 @@ -171,6 +171,7 @@ interface InterceptorEvent { int getEventType(); + String getEventTypeDesc(); ChannelInterceptor getInterceptor(); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=414056&r1=414055&r2=414056&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Tue Jun 13 21:05:24 2006 @@ -14,10 +14,12 @@ */ package org.apache.catalina.tribes.group.interceptors; 
-import java.util.LinkedHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelInterceptor; +import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.UniqueId; @@ -30,12 +32,6 @@ import org.apache.catalina.tribes.membership.Membership; import org.apache.catalina.tribes.util.Arrays; import org.apache.catalina.tribes.util.UUIDGenerator; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.catalina.tribes.membership.*; -import org.apache.catalina.tribes.test.interceptors.TestNonBlockingCoordinator; -import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent; -import org.apache.catalina.tribes.ChannelInterceptor; /** * <p>Title: Auto merging leader election algorithm</p> @@ -217,9 +213,11 @@ //no message arrived, send the coord msg fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out.")); startElection(true); + } else { + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned")); } }//end if - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election in progress")); + } } @@ -372,7 +370,7 @@ } viewChange(viewId,view.getMembers()); - fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View id("+this.viewId+")")); + fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View")); if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) { startElection(false); @@ -401,6 +399,14 @@ return (view != null && view.hasMembers()) ? 
view.getMembers()[0] : null; } + public Member[] getView() { + return (view != null && view.hasMembers()) ? view.getMembers() : new Member[0]; + } + + public UniqueId getViewId() { + return viewId; + } + /** * Block in/out messages while a election is going on */ @@ -723,6 +729,7 @@ static final int EVT_SEND_MSG = 10; static final int EVT_STOP = 11; static final int EVT_CONF_RX = 12; + static final int EVT_ELECT_ABANDONED = 13; int type; ChannelInterceptor interceptor; @@ -743,6 +750,25 @@ public int getEventType() { return type; + } + + public String getEventTypeDesc() { + switch (type) { + case EVT_START: return "EVT_START:"+info; + case EVT_MBR_ADD: return "EVT_MBR_ADD:"+info; + case EVT_MBR_DEL: return "EVT_MBR_DEL:"+info; + case EVT_START_ELECT: return "EVT_START_ELECT:"+info; + case EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info; + case EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info; + case EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info; + case EVT_POST_MERGE: return "EVT_POST_MERGE:"+info; + case EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info; + case EVT_SEND_MSG: return "EVT_SEND_MSG:"+info; + case EVT_STOP: return "EVT_STOP:"+info; + case EVT_CONF_RX: return "EVT_CONF_RX:"+info; + case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info; + default: return "Unknown"; + } } public ChannelInterceptor getInterceptor() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=414056&r1=414055&r2=414056&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Tue Jun 13 21:05:24 2006 @@ -46,7 +46,7 @@ } public static String 
toString(byte[] data) { - return toString(data,0,data.length); + return toString(data,0,data!=null?data.length:0); } public static String toString(byte[] data, int offset, int length) { Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java?rev=414056&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java Tue Jun 13 21:05:24 2006 @@ -0,0 +1,245 @@ +package org.apache.catalina.tribes.demos; + +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; +import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator; +import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent; +import org.apache.catalina.tribes.ChannelInterceptor; +import org.apache.catalina.tribes.util.Arrays; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; +import java.util.StringTokenizer; + + + +public class CoordinationDemo { + static int CHANNEL_COUNT = 5; + static int SCREEN_WIDTH = 120; + StringBuffer statusLine = new StringBuffer(); + Status[] status = new Status[CHANNEL_COUNT]; + BufferedReader reader = null; + /** + * Construct and show the application. 
+ */ + public CoordinationDemo() { + reader = new BufferedReader(new InputStreamReader(System.in)); + } + + + public void clearScreen() { + StringBuffer buf = new StringBuffer(700); + for (int i=0; i<30; i++ ) buf.append("\n"); + System.out.println(buf); + } + + public void printMenuOptions() { + System.out.println("Commands:"); + System.out.println("start [member id]"); + System.out.println("stop [member id]"); + System.out.println("quit"); + System.out.print("Enter command:"); + } + + public synchronized void printScreen() { + clearScreen(); + System.out.println("XXX. "+getHeader()); + for ( int i=0; i<status.length; i++ ) { + System.out.print(fill(String.valueOf(i+1)+".",5," ")); + if ( status[i] != null ) System.out.print(status[i].getStatusLine()); + } + System.out.println("\n\n"); + System.out.println("Overall status:"+statusLine); + printMenuOptions(); + + } + + public String getHeader() { + //member - 30 + //running- 8 + //coord - 30 + //view-id - 24 + //view count - 8 + + StringBuffer buf = new StringBuffer(); + buf.append(fill("Member",30," ")); + buf.append(fill("Running",8," ")); + buf.append(fill("Coord",30," ")); + buf.append(fill("View-id(short)",24," ")); + buf.append(fill("Count",8," ")); + buf.append("\n"); + buf.append(fill("",SCREEN_WIDTH,"=")); + buf.append("\n"); + return buf.toString(); + } + + public String[] tokenize(String line) { + StringTokenizer tz = new StringTokenizer(line," "); + String[] result = new String[tz.countTokens()]; + for (int i=0; i<result.length; i++ ) result[i] = tz.nextToken(); + return result; + } + + public void waitForInput() throws IOException { + for ( int i=0; i<status.length; i++ ) status[i] = new Status(this); + printScreen(); + String l = reader.readLine(); + String[] args = tokenize(l); + while ( args.length >= 1 && (!"quit".equalsIgnoreCase(args[0]))) { + if ("start".equalsIgnoreCase(args[0])) { + if ( args.length == 1 ) { + setSystemStatus("System starting up..."); + for (int i = 0; i < status.length; i++) 
status[i].start(); + setSystemStatus("System started."); + } else { + int index = -1; + try { index = Integer.parseInt(args[1])-1;}catch ( Exception x ) {setSystemStatus("Invalid index:"+args[1]);} + if ( index >= 0 ) { + setSystemStatus("Starting member:"+(index+1)); + status[index].start(); + setSystemStatus("Member started:"+(index+1)); + } + } + } else if ("stop".equalsIgnoreCase(args[0])) { + if ( args.length == 1 ) { + setSystemStatus("System shutting down..."); + for (int i = 0; i < status.length; i++) status[i].stop(); + setSystemStatus("System stopped."); + } else { + int index = -1; + try { index = Integer.parseInt(args[1])-1;}catch ( Exception x ) {setSystemStatus("Invalid index:"+args[1]);} + if ( index >= 0 ) { + setSystemStatus("Stopping member:"+(index+1)); + status[index].stop(); + setSystemStatus("Member stopped:"+(index+1)); + } + } + + } + printScreen(); + l = reader.readLine(); + args = tokenize(l); + } + for ( int i=0; i<status.length; i++ ) status[i].stop(); + } + + public void setSystemStatus(String status) { + statusLine.delete(0,statusLine.length()); + statusLine.append(status); + } + + + + + + + public static void main(String[] args) throws Exception { + CoordinationDemo demo = new CoordinationDemo(); + demo.waitForInput(); + } + + public static String fill(String value, int length, String ch) { + StringBuffer buf = new StringBuffer(); + for (int i=value.trim().length(); i<length; i++ ) buf.append(ch); + buf.append(value.trim()); + return buf.toString(); + } + + + public static class Status { + public CoordinationDemo parent; + public GroupChannel channel; + NonBlockingCoordinator interceptor = null; + public String status; + public Exception error; + public boolean started = false; + + public Status(CoordinationDemo parent) { + this.parent = parent; + } + + public String getStatusLine() { + //member - 30 + //running- 8 + //coord - 30 + //view-id - 24 + //view count - 8 + StringBuffer buf = new StringBuffer(); + String local = ""; + String 
coord = ""; + String viewId = ""; + String count = "0"; + if ( channel != null ) { + Member lm = channel.getLocalMember(false); + local = lm!=null?lm.getName():""; + coord = interceptor.getCoordinator()!=null?interceptor.getCoordinator().getName():""; + viewId = getByteString(interceptor.getViewId()!=null?interceptor.getViewId().getBytes():new byte[0]); + count = String.valueOf(interceptor.getView().length); + } + buf.append(fill(local,30," ")); + buf.append(fill(String.valueOf(started), 8, " ")); + buf.append(fill(coord, 30, " ")); + buf.append(fill(viewId, 24, " ")); + buf.append(fill(count, 8, " ")); + buf.append("\n"); + buf.append("Status:"+status); + buf.append("\n"); + return buf.toString(); + } + + public String getByteString(byte[] b) { + if ( b == null ) return "{}"; + return Arrays.toString(b,0,Math.min(b.length,4)); + } + + public void start() { + try { + if ( channel == null ) { + channel = createChannel(); + channel.start(channel.DEFAULT); + started = true; + } else { + status = "Channel already started."; + } + } catch ( Exception x ) { + status = "Start failed:"+x.getMessage(); + error = x; + started = false; + } + } + + public void stop() { + try { + if ( channel != null ) { + channel.stop(channel.DEFAULT); + status = "Channel Stopped"; + } + }catch ( Exception x ) { + status = "Stop failed:"+x.getMessage(); + error = x; + }finally { + started = false; + channel = null; + interceptor = null; + } + } + + public GroupChannel createChannel() { + channel = new GroupChannel(); + interceptor = new NonBlockingCoordinator() { + public void fireInterceptorEvent(InterceptorEvent event) { + status = event.getEventTypeDesc(); + parent.printScreen(); + try { Thread.sleep(100); }catch ( Exception x){} + + } + }; + channel.addInterceptor(interceptor); + channel.addInterceptor(new TcpFailureDetector()); + channel.addInterceptor(new MessageDispatch15Interceptor()); + return channel; + } + } +} \ No newline at end of file 
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]