Author: fhanik
Date: Tue Jun 13 21:05:24 2006
New Revision: 414056

URL: http://svn.apache.org/viewvc?rev=414056&view=rev
Log:
Created a text-only coordination demo to show how the election works

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=414056&r1=414055&r2=414056&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
 Tue Jun 13 21:05:24 2006
@@ -171,6 +171,7 @@
 
     interface InterceptorEvent {
         int getEventType();
+        String getEventTypeDesc();
         ChannelInterceptor getInterceptor();
     }
     

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=414056&r1=414055&r2=414056&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
 Tue Jun 13 21:05:24 2006
@@ -14,10 +14,12 @@
  */
 package org.apache.catalina.tribes.group.interceptors;
 
-import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.UniqueId;
@@ -30,12 +32,6 @@
 import org.apache.catalina.tribes.membership.Membership;
 import org.apache.catalina.tribes.util.Arrays;
 import org.apache.catalina.tribes.util.UUIDGenerator;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.catalina.tribes.membership.*;
-import org.apache.catalina.tribes.test.interceptors.TestNonBlockingCoordinator;
-import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
-import org.apache.catalina.tribes.ChannelInterceptor;
 
 /**
  * <p>Title: Auto merging leader election algorithm</p>
@@ -217,9 +213,11 @@
                     //no message arrived, send the coord msg
                     fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting 
timed out."));
                     startElection(true);
+                } else {
+                    fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election 
abandoned"));
                 }
             }//end if
-            fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election in 
progress"));
+            
         }
     }
 
@@ -372,7 +370,7 @@
         }
         
         viewChange(viewId,view.getMembers());
-        fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View 
id("+this.viewId+")"));
+        fireInterceptorEvent(new 
CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View"));
         
         if ( suggestedviewId == null && 
hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
             startElection(false);
@@ -401,6 +399,14 @@
         return (view != null && view.hasMembers()) ? view.getMembers()[0] : 
null;
     }
     
+    public Member[] getView() {
+        return (view != null && view.hasMembers()) ? view.getMembers() : new 
Member[0];
+    }
+    
+    public UniqueId getViewId() {
+        return viewId;
+    }
+    
     /**
     * Block in/out messages while a election is going on
     */
@@ -723,6 +729,7 @@
         static final int EVT_SEND_MSG = 10;
         static final int EVT_STOP = 11;
         static final int EVT_CONF_RX = 12;
+        static final int EVT_ELECT_ABANDONED = 13;
         
         int type;
         ChannelInterceptor interceptor;
@@ -743,6 +750,25 @@
         
         public int getEventType() {
             return type;
+        }
+        
+        public String getEventTypeDesc() {
+            switch (type) {
+                case  EVT_START: return "EVT_START:"+info;
+                case  EVT_MBR_ADD: return "EVT_MBR_ADD:"+info;
+                case  EVT_MBR_DEL: return "EVT_MBR_DEL:"+info;
+                case  EVT_START_ELECT: return "EVT_START_ELECT:"+info;
+                case  EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info;
+                case  EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info;
+                case  EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info;
+                case  EVT_POST_MERGE: return "EVT_POST_MERGE:"+info;
+                case  EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info;
+                case  EVT_SEND_MSG: return "EVT_SEND_MSG:"+info;
+                case  EVT_STOP: return "EVT_STOP:"+info;
+                case  EVT_CONF_RX: return "EVT_CONF_RX:"+info;
+                case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info;
+                default: return "Unknown";
+            }
         }
         
         public ChannelInterceptor getInterceptor() {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=414056&r1=414055&r2=414056&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
 Tue Jun 13 21:05:24 2006
@@ -46,7 +46,7 @@
     }
     
     public static String toString(byte[] data) {
-        return toString(data,0,data.length);
+        return toString(data,0,data!=null?data.length:0);
     }
 
     public static String toString(byte[] data, int offset, int length) {

Added: 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java?rev=414056&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
 Tue Jun 13 21:05:24 2006
@@ -0,0 +1,245 @@
+package org.apache.catalina.tribes.demos;
+
+import org.apache.catalina.tribes.group.GroupChannel;
+import 
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.util.Arrays;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+
+
+public class CoordinationDemo {
+    static int CHANNEL_COUNT = 5;
+    static int SCREEN_WIDTH = 120;
+    StringBuffer statusLine = new StringBuffer();
+    Status[] status = new Status[CHANNEL_COUNT];
+    BufferedReader reader = null;
+    /**
+     * Construct and show the application.
+     */
+    public CoordinationDemo() {
+        reader = new BufferedReader(new InputStreamReader(System.in));
+    }
+    
+    
+    public void clearScreen() {
+        StringBuffer buf = new StringBuffer(700);
+        for (int i=0; i<30; i++ ) buf.append("\n");
+        System.out.println(buf);
+    }
+
+    public void printMenuOptions() {
+        System.out.println("Commands:");
+        System.out.println("start [member id]");
+        System.out.println("stop  [member id]");
+        System.out.println("quit");
+        System.out.print("Enter command:");
+    }
+    
+    public synchronized void printScreen() {
+        clearScreen();
+        System.out.println("XXX. "+getHeader());
+        for ( int i=0; i<status.length; i++ ) {
+            System.out.print(fill(String.valueOf(i+1)+".",5," "));
+            if ( status[i] != null ) 
System.out.print(status[i].getStatusLine());
+        }
+        System.out.println("\n\n");
+        System.out.println("Overall status:"+statusLine);
+        printMenuOptions();
+        
+    }
+    
+    public String getHeader() {
+        //member - 30
+        //running- 8
+        //coord - 30
+        //view-id - 24
+        //view count - 8
+
+        StringBuffer buf = new StringBuffer();
+        buf.append(fill("Member",30," "));
+        buf.append(fill("Running",8," "));
+        buf.append(fill("Coord",30," "));
+        buf.append(fill("View-id(short)",24," "));
+        buf.append(fill("Count",8," "));
+        buf.append("\n");
+        buf.append(fill("",SCREEN_WIDTH,"="));
+        buf.append("\n");
+        return buf.toString();
+    }
+    
+    public String[] tokenize(String line) {
+        StringTokenizer tz = new StringTokenizer(line," ");
+        String[] result = new String[tz.countTokens()];
+        for (int i=0; i<result.length; i++ ) result[i] = tz.nextToken();
+        return result;
+    }
+    
+    public void waitForInput() throws IOException {
+        for ( int i=0; i<status.length; i++ ) status[i] = new Status(this);
+        printScreen();
+        String l = reader.readLine();
+        String[] args = tokenize(l);
+        while ( args.length >= 1 && (!"quit".equalsIgnoreCase(args[0]))) {
+            if ("start".equalsIgnoreCase(args[0])) {
+                if ( args.length == 1 ) {
+                    setSystemStatus("System starting up...");
+                    for (int i = 0; i < status.length; i++) status[i].start();
+                    setSystemStatus("System started.");
+                } else { 
+                    int index = -1;
+                    try { index = Integer.parseInt(args[1])-1;}catch ( 
Exception x ) {setSystemStatus("Invalid index:"+args[1]);}
+                    if ( index >= 0 ) {
+                        setSystemStatus("Starting member:"+(index+1));
+                        status[index].start();
+                        setSystemStatus("Member started:"+(index+1));
+                    }
+                }
+            } else if ("stop".equalsIgnoreCase(args[0])) {
+                if ( args.length == 1 ) {
+                    setSystemStatus("System shutting down...");
+                    for (int i = 0; i < status.length; i++) status[i].stop();
+                    setSystemStatus("System stopped.");
+                } else { 
+                    int index = -1;
+                    try { index = Integer.parseInt(args[1])-1;}catch ( 
Exception x ) {setSystemStatus("Invalid index:"+args[1]);}
+                    if ( index >= 0 ) {
+                        setSystemStatus("Stopping member:"+(index+1));
+                        status[index].stop();
+                        setSystemStatus("Member stopped:"+(index+1));
+                    }
+                }
+
+            }
+            printScreen();
+            l = reader.readLine();
+            args = tokenize(l);
+        }
+        for ( int i=0; i<status.length; i++ ) status[i].stop();
+    }
+
+    public void setSystemStatus(String status) {
+        statusLine.delete(0,statusLine.length());
+        statusLine.append(status);
+    }
+    
+    
+    
+
+
+
+    public static void main(String[] args) throws Exception {
+        CoordinationDemo demo = new CoordinationDemo();
+        demo.waitForInput();
+    }
+    
+    public static String fill(String value, int length, String ch) {
+        StringBuffer buf = new StringBuffer();
+        for (int i=value.trim().length(); i<length; i++ ) buf.append(ch);
+        buf.append(value.trim());
+        return buf.toString();
+    }
+    
+    
+    public static class Status {
+        public CoordinationDemo parent;
+        public GroupChannel channel;
+        NonBlockingCoordinator interceptor = null;
+        public String status;
+        public Exception error;
+        public boolean started = false;
+        
+        public Status(CoordinationDemo parent) {
+            this.parent = parent;
+        }
+        
+        public String getStatusLine() {
+            //member - 30
+            //running- 8
+            //coord - 30
+            //view-id - 24
+            //view count - 8
+            StringBuffer buf = new StringBuffer();
+            String local = "";
+            String coord = "";
+            String viewId = "";
+            String count = "0";
+            if ( channel != null ) {
+                Member lm = channel.getLocalMember(false);
+                local = lm!=null?lm.getName():"";
+                coord = 
interceptor.getCoordinator()!=null?interceptor.getCoordinator().getName():"";
+                viewId = 
getByteString(interceptor.getViewId()!=null?interceptor.getViewId().getBytes():new
 byte[0]);
+                count = String.valueOf(interceptor.getView().length);
+            }
+            buf.append(fill(local,30," "));
+            buf.append(fill(String.valueOf(started), 8, " "));
+            buf.append(fill(coord, 30, " "));
+            buf.append(fill(viewId, 24, " "));
+            buf.append(fill(count, 8, " "));
+            buf.append("\n");
+            buf.append("Status:"+status);
+            buf.append("\n");
+            return buf.toString();
+        }
+        
+        public String getByteString(byte[] b) {
+            if ( b == null ) return "{}";
+            return Arrays.toString(b,0,Math.min(b.length,4));
+        }
+        
+        public void start() {
+            try {
+                if ( channel == null ) {
+                    channel = createChannel();
+                    channel.start(channel.DEFAULT);
+                    started = true;
+                } else {
+                    status = "Channel already started.";
+                }
+            } catch ( Exception x ) {
+                status = "Start failed:"+x.getMessage();
+                error = x;
+                started = false;
+            }
+        }
+        
+        public void stop() {
+            try {
+                if ( channel != null ) {
+                    channel.stop(channel.DEFAULT);
+                    status = "Channel Stopped";
+                }
+            }catch ( Exception x )  {
+                status = "Stop failed:"+x.getMessage();
+                error = x;
+            }finally {
+                started = false;
+                channel = null;
+                interceptor = null;
+            }
+        }
+        
+        public GroupChannel createChannel() {
+            channel = new GroupChannel();
+            interceptor = new NonBlockingCoordinator() {
+                public void fireInterceptorEvent(InterceptorEvent event) {
+                    status = event.getEventTypeDesc();
+                    parent.printScreen();
+                    try { Thread.sleep(100); }catch ( Exception x){}
+                    
+                }
+            };
+            channel.addInterceptor(interceptor);
+            channel.addInterceptor(new TcpFailureDetector());
+            channel.addInterceptor(new MessageDispatch15Interceptor());
+            return channel;
+        }
+    }
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to