Author: fhanik Date: Fri May 19 18:47:50 2006 New Revision: 407936 URL: http://svn.apache.org/viewvc?rev=407936&view=rev Log: Started working on a non blocking coordinator, will work according to Hans Svensson's algorithm
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java?rev=407936&r1=407935&r2=407936&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java Fri May 19 18:47:50 2006 @@ -36,6 +36,7 @@ public static void absoluteOrder(Member[] members) { + if ( members == null || members.length == 0 ) return; Arrays.sort(members,comp); } Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=407936&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Fri May 19 18:47:50 2006 @@ -0,0 +1,184 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + */ +package org.apache.catalina.tribes.group.interceptors; + +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.group.InterceptorPayload; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.membership.Membership; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.UniqueId; +import org.apache.catalina.tribes.util.UUIDGenerator; +import org.apache.catalina.tribes.group.AbsoluteOrder; +import org.apache.catalina.tribes.util.Arrays; + +/** + * <p>Title: NonBlockingCoordinator</p> + * + * <p>Description: Implementation of a simple coordinator algorithm.</p> + * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on + * </p> + * <p>Implementation based on ideas fetched from <a href="http://www.cs.chalmers.se/~hanssv/">Hans Svensson</a></p> + * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships</p> + * + * @author Filip Hanik + * @version 1.0 + * @todo + * when sending a HALT message, btw, only the highest in the membership group will do that + * allow for some time to pass, incase there is a higher member around + * preferrably, place a mcast interceptor below, so that we can mcast this sucker + */ +public class NonBlockingCoordinator extends ChannelInterceptorBase { + + protected Membership membership = null; + + protected static byte[] NBC_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63}; + protected static byte[] NBC_REQUEST = new byte[] {-55, -37, 18, -52, -105, 107, 72, 40, -122, 29, 70, -19, -74, 123, 61, 110}; + protected static byte[] NBC_REPLY = new byte[] {6, -15, 14, 23, -96, 106, 78, 124, -94, -122, -85, 31, 88, 21, 126, 20}; + + protected static byte[] NBC_HALT = new byte[] {12, -28, 85, -97, -102, -35, 74, 9, -65, -78, -83, -84, -29, -70, -23, -15}; + protected static byte[] NBC_ACK = new byte[] {12, -49, 117, -70, 77, 52, 65, -91, -93, -110, 37, 34, -28, -127, 26, 18}; + protected static byte[] NBC_NORM = new byte[] {34, -110, 83, 118, -109, -55, 67, -27, -97, -94, -84, -72, -82, -114, 65, 81}; + protected static byte[] NBC_NOTNORM = new byte[] {125, -70, -102, -125, -78, -39, 73, -80, -89, 84, 120, 83, 25, 42, 88, -76}; + protected static byte[] NBC_LDR = new byte[] {97, 31, -23, 30, -42, -72, 72, 116, -97, 7, 112, 25, 82, -96, -87, -48}; + protected static byte[] NBC_HASLDR = new byte[] {93, -80, -88, -58, -127, 21, 76, -90, -89, 77, 58, 25, -55, 65, -1, -83}; + protected static byte[] NBC_ISLDR = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30}; + + protected Member coordinator = null; + + public NonBlockingCoordinator() { + super(); + } + + public Member getCoordinator() { + return coordinator; + } + + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + super.sendMessage(destination, msg, payload); + } + + public void messageReceived(ChannelMessage msg) { + if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,NBC_HEADER,0,NBC_HEADER.length) ) { + receiveNBC(msg.getMessage()); + } else { + super.messageReceived(msg); + } + } + + public boolean accept(ChannelMessage msg) { + return super.accept(msg); + } + + public void memberAdded(Member member) { + if ( membership == null ) setupMembership(); + membership.addMember((MemberImpl)member); + super.memberAdded(member); + } + + public void memberDisappeared(Member member) { + if ( membership == null ) setupMembership(); + membership.removeMember((MemberImpl)member); + super.memberDisappeared(member); + } + + public void heartbeat() { + super.heartbeat(); + } + + /** + * has members + */ + public boolean hasMembers() { + if ( membership == null ) setupMembership(); + return membership.hasMembers(); + } + + /** + * Get all current cluster members + * @return all members or empty array + */ + public Member[] getMembers() { + if ( membership == null ) setupMembership(); + Member[] members = membership.getMembers(); + AbsoluteOrder.absoluteOrder(members); + return members; + } + + /** + * + * @param mbr Member + * @return Member + */ + public Member getMember(Member mbr) { + if ( membership == null ) setupMembership(); + return membership.getMember(mbr); + } + + /** + * Return the member that represents this node. + * + * @return Member + */ + public Member getLocalMember(boolean incAlive) { + Member local = super.getLocalMember(incAlive); + if ( membership == null && (local != null)) setupMembership(); + return local; + } + + protected synchronized void setupMembership() { + if ( membership == null ) { + membership = new Membership((MemberImpl)super.getLocalMember(true)); + } + + } + + /** + * A message is:<br> + * HEADER, REQUEST|REPLY, ID, MSG, SOURCE_LEN, SOURCE + * @param type byte[] - either NBC_REQUEST or NBC_REPLY + * @param msg byte[] - NBC_HALT, NBC_ACK, NBC_NORM, NBC_NOTNORM, NBC_LDR + */ + protected UniqueId createNBCMessage(XByteBuffer buf, byte[] type, byte[] msg) { + UniqueId id = new UniqueId(UUIDGenerator.randomUUID(true)); + Member local = getLocalMember(false); + byte[] ldata = ((MemberImpl)local).getData(false,false); + buf.reset(); + buf.append(NBC_HEADER,0,NBC_HEADER.length); + buf.append(type,0,type.length); + buf.append(id.getBytes(),0,id.getBytes().length); + buf.append(msg,0,msg.length); + buf.append(ldata.length); + buf.append(ldata,0,ldata.length); + return id; + } + + protected void receiveNBC(XByteBuffer buf) { + + } + + public void start(int svc) throws ChannelException { + super.start(svc); + //coordination can happen before this line of code executes + Member local = getLocalMember(false); + if (local != null && coordinator == null) coordinator = local; + } + + + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]