Author: fhanik
Date: Fri May 19 18:47:50 2006
New Revision: 407936

URL: http://svn.apache.org/viewvc?rev=407936&view=rev
Log:
Started working on a non blocking coordinator, will work according to Hans 
Svensson's algorithm

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java?rev=407936&r1=407935&r2=407936&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java
 Fri May 19 18:47:50 2006
@@ -36,6 +36,7 @@
     
     
     public static void absoluteOrder(Member[] members) {
+        if ( members == null || members.length == 0 ) return;
         Arrays.sort(members,comp);
     }
     

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=407936&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
 Fri May 19 18:47:50 2006
@@ -0,0 +1,184 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.group.AbsoluteOrder;
+import org.apache.catalina.tribes.util.Arrays;
+
+/**
+ * <p>Title: NonBlockingCoordinator</p>
+ *
+ * <p>Description: Implementation of a simple coordinator algorithm.</p>
+ * <p>This algorithm is non blocking meaning it allows for transactions while 
the coordination phase is going on
+ * </p>
+ * <p>Implementation based on ideas fetched from <a 
href="http://www.cs.chalmers.se/~hanssv/";>Hans Svensson</a></p>
+ * <p>Ideally, the interceptor below this one would be the TcpFailureDetector 
to ensure correct memberships</p>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ * @todo 
+ *    when sending a HALT message, btw, only the highest in the membership 
group will do that
+ *    allow for some time to pass, incase there is a higher member around
+ *    preferrably, place a mcast interceptor below, so that we can mcast this 
sucker
+ */
+public class NonBlockingCoordinator extends ChannelInterceptorBase {
+    
+    protected Membership membership = null;
+    
+    protected static byte[] NBC_HEADER = new byte[] {-86, 38, -34, -29, -98, 
90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63};
+    protected static byte[] NBC_REQUEST = new byte[] {-55, -37, 18, -52, -105, 
107, 72, 40, -122, 29, 70, -19, -74, 123, 61, 110};
+    protected static byte[] NBC_REPLY = new byte[] {6, -15, 14, 23, -96, 106, 
78, 124, -94, -122, -85, 31, 88, 21, 126, 20};
+    
+    protected static byte[] NBC_HALT = new byte[] {12, -28, 85, -97, -102, 
-35, 74, 9, -65, -78, -83, -84, -29, -70, -23, -15};
+    protected static byte[] NBC_ACK = new byte[] {12, -49, 117, -70, 77, 52, 
65, -91, -93, -110, 37, 34, -28, -127, 26, 18};
+    protected static byte[] NBC_NORM = new byte[] {34, -110, 83, 118, -109, 
-55, 67, -27, -97, -94, -84, -72, -82, -114, 65, 81};
+    protected static byte[] NBC_NOTNORM = new byte[] {125, -70, -102, -125, 
-78, -39, 73, -80, -89, 84, 120, 83, 25, 42, 88, -76};
+    protected static byte[] NBC_LDR = new byte[] {97, 31, -23, 30, -42, -72, 
72, 116, -97, 7, 112, 25, 82, -96, -87, -48};
+    protected static byte[] NBC_HASLDR = new byte[] {93, -80, -88, -58, -127, 
21, 76, -90, -89, 77, 58, 25, -55, 65, -1, -83};
+    protected static byte[] NBC_ISLDR = new byte[] {104, -95, -92, -42, 114, 
-36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30};
+    
+    protected Member coordinator = null;
+    
+    public NonBlockingCoordinator() {
+        super();
+    }
+    
+    public Member getCoordinator() {
+        return coordinator;
+    }
+    
+    public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
+        super.sendMessage(destination, msg, payload);
+    }
+
+    public void messageReceived(ChannelMessage msg) {
+        if ( 
Arrays.contains(msg.getMessage().getBytesDirect(),0,NBC_HEADER,0,NBC_HEADER.length)
 ) {
+            receiveNBC(msg.getMessage());
+        } else {
+            super.messageReceived(msg);
+        }
+    }
+
+    public boolean accept(ChannelMessage msg) {
+        return super.accept(msg);
+    }
+
+    public void memberAdded(Member member) {
+        if ( membership == null ) setupMembership();
+        membership.addMember((MemberImpl)member);
+        super.memberAdded(member);
+    }
+
+    public void memberDisappeared(Member member) {
+        if ( membership == null ) setupMembership();
+        membership.removeMember((MemberImpl)member);
+        super.memberDisappeared(member);
+    }
+
+    public void heartbeat() {
+        super.heartbeat();
+    }
+
+    /**
+     * has members
+     */
+    public boolean hasMembers() {
+        if ( membership == null ) setupMembership();
+        return membership.hasMembers();
+    }
+
+    /**
+     * Get all current cluster members
+     * @return all members or empty array
+     */
+    public Member[] getMembers() {
+        if ( membership == null ) setupMembership();
+        Member[] members = membership.getMembers(); 
+        AbsoluteOrder.absoluteOrder(members);
+        return members;
+    }
+
+    /**
+     *
+     * @param mbr Member
+     * @return Member
+     */
+    public Member getMember(Member mbr) {
+        if ( membership == null ) setupMembership();
+        return membership.getMember(mbr);
+    }
+
+    /**
+     * Return the member that represents this node.
+     *
+     * @return Member
+     */
+    public Member getLocalMember(boolean incAlive) {
+        Member local = super.getLocalMember(incAlive);
+        if ( membership == null && (local != null)) setupMembership();
+        return local;
+    }
+    
+    protected synchronized void setupMembership() {
+        if ( membership == null ) {
+            membership = new 
Membership((MemberImpl)super.getLocalMember(true));
+        }
+
+    }
+    
+    /**
+     * A message is:<br>
+     * HEADER, REQUEST|REPLY, ID, MSG, SOURCE_LEN, SOURCE
+     * @param type byte[] - either NBC_REQUEST or NBC_REPLY
+     * @param msg byte[] - NBC_HALT, NBC_ACK, NBC_NORM, NBC_NOTNORM, NBC_LDR
+     */
+    protected UniqueId createNBCMessage(XByteBuffer buf, byte[] type, byte[] 
msg) {
+        UniqueId id = new UniqueId(UUIDGenerator.randomUUID(true));
+        Member local = getLocalMember(false);
+        byte[] ldata = ((MemberImpl)local).getData(false,false);
+        buf.reset();
+        buf.append(NBC_HEADER,0,NBC_HEADER.length);
+        buf.append(type,0,type.length);
+        buf.append(id.getBytes(),0,id.getBytes().length);
+        buf.append(msg,0,msg.length);
+        buf.append(ldata.length);
+        buf.append(ldata,0,ldata.length);
+        return id;
+    }
+    
+    protected void receiveNBC(XByteBuffer buf) {
+        
+    }
+    
+    public void start(int svc) throws ChannelException {
+        super.start(svc);
+        //coordination can happen before this line of code executes
+        Member local = getLocalMember(false);
+        if (local != null && coordinator == null) coordinator = local;
+    }
+
+
+
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to