Author: fhanik Date: Thu Mar 2 09:38:40 2006 New Revision: 382456 URL: http://svn.apache.org/viewcvs?rev=382456&view=rev Log: Working on the parallel sender, but had to write down some ideas for a lazy replicated map, which could be one solution to primary/secondary replication
Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/AckProtocol.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/Server.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/ServerThread.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=382456&r1=382455&r2=382456&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Thu Mar 2 09:38:40 2006 @@ -329,7 +329,7 @@ * @return - a full package (header,compress,size,data,footer) * */ - public static byte[] createDataPackage(ClusterData cdata) throws java.io.IOException { + public static byte[] createDataPackage(ClusterData cdata) { return createDataPackage(cdata.getDataPackage()); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java?rev=382456&r1=382455&r2=382456&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java Thu Mar 2 09:38:40 2006 @@ -241,10 +241,10 @@ * @throws IOException * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method */ - public synchronized void setMessage(ChannelMessage data) throws IOException { + public synchronized void setMessage(byte[] data) throws IOException { reset(); if ( data != null ) { - current = XByteBuffer.createDataPackage((ClusterData)data); + current = data; remaining = current.length; curPos = 0; if (connected) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java?rev=382456&r1=382455&r2=382456&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java Thu Mar 2 09:38:40 2006 @@ -13,17 +13,44 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.catalina.tribes.tcp; + +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.io.ClusterData; + /** - * A class that uses NIO to send data in parallel to several remote nodes. - * + * <p>Title: </p> + * + * <p>Description: </p> * - * @author Filip Hanik + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable * @version 1.0 */ public class ParallelNioSender { - public ParallelNioSender() { + protected long timeout; + protected boolean waitForAck; + + public ParallelNioSender(long timeout, boolean waitForAck) { + this.timeout = timeout; + this.waitForAck = waitForAck; } + + + public synchronized void sendMessage(Member mbr, ChannelMessage msg) throws ChannelException { + long start = System.currentTimeMillis(); + byte[] data = XByteBuffer.createDataPackage((ClusterData)msg); + + + } + + protected synchronized + } Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/AckProtocol.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/AckProtocol.java?rev=382456&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/AckProtocol.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/AckProtocol.java Thu Mar 2 09:38:40 2006 @@ -0,0 +1,12 @@ +package org.apache.catalina.tribes.test; + +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.io.ClusterData; +public class AckProtocol { + public byte[] processInput(XByteBuffer buf, int counter) throws Exception { + ClusterData data = buf.extractPackage(true); + System.out.println("Received:\n\tThread:"+Thread.currentThread().getName()+"\n\tCount:"+counter+"\n\tData:"+new String(data.getMessage().getBytes())); + return org.apache.catalina.tribes.tcp.Constants.ACK_COMMAND; + } + +} Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/Server.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/Server.java?rev=382456&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/Server.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/Server.java Thu Mar 2 09:38:40 2006 @@ -0,0 +1,27 @@ +package org.apache.catalina.tribes.test; +import java.net.*; +import java.io.*; + +public class Server { + public static void main(String[] args) throws IOException { + System.out.println("Usage: Server [port]"); + int port = 4444; + if ( args.length > 0 ) port = Integer.parseInt(args[0]); + + ServerSocket serverSocket = null; + boolean listening = true; + + try { + serverSocket = new ServerSocket(port); + System.out.println("Echo server is listening on port "+port); + } catch (IOException e) { + System.err.println("Could not listen on port: "+port+"."); + System.exit(-1); + } + + while (listening) + new ServerThread(serverSocket.accept()).start(); + + serverSocket.close(); + } +} Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/ServerThread.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/ServerThread.java?rev=382456&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/ServerThread.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/ServerThread.java Thu Mar 2 09:38:40 2006 @@ -0,0 +1,58 @@ +package org.apache.catalina.tribes.test; + +import java.net.*; +import java.io.*; +import org.apache.catalina.tribes.io.XByteBuffer; + +public class ServerThread + extends Thread { + private Socket socket = null; + private static int incounter = 0; + public ServerThread(Socket socket) { + super("ServerThread"); + this.socket = socket; + } + + public synchronized int incounter() { + return ++incounter; + } + + public void run() { + + try { + this.socket.setSoLinger(false,0); + System.out.println("Accepted:\n\tThread:"+Thread.currentThread().getName()); + OutputStream out = (socket.getOutputStream()); + InputStream in = socket.getInputStream(); + byte[] input = new byte[43800]; + byte[] outputLine; + XByteBuffer buf = new XByteBuffer(input.length, true); + AckProtocol ack = new AckProtocol(); + int length = 0; + + while ( (length = in.read(input)) >= 0) { + buf.append(input, 0, length); + if (buf.countPackages() > 0) { + outputLine = ack.processInput(buf,incounter()); + out.write(outputLine); + } + } + System.out.println("Finished:\n\tThread:"+Thread.currentThread().getName()); + out.close(); + in.close(); + socket.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + + + } + + public void printBytes(byte[] d, int offset, int length) { + for (int i=offset; i<length; i++ ) { + System.out.println("["+(i-offset)+"]="+d[i]); + } + } + +} Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=382456&r1=382455&r2=382456&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original) +++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Thu Mar 2 09:38:40 2006 @@ -16,9 +16,30 @@ don't return until all have been processed on the remote end.) 9. CoordinatorInterceptor - manages the selection of a cluster coordinator + just had a brilliant idea, if GroupChannel keeps its own view of members, + the coordinator interceptor can hold on to the member added/disappared event + It can also intercept down going messages if the coordinator disappeared + while a new coordinator is chosen + It can also intercept down going messages for members disappeared that the + calling app not yet knows about, to avoid a ChannelException + + 10. Xa2PhaseCommitInterceptor - make sure the message doesn't reach the receiver unless all members got it 11. Code a ReplicatedFileSystem example, package org.apache.catalina.tipis + +12. LazyReplicatedHashMap - memory efficient clustered map. + This map can be used for PRIMARY/SECONDARY session replication + Ahh, the beauty of storing data in remote locations + The lazy hash map will only replicate its attribute names to all members in the group + with that name, it will also replicate the source (where to get the object) + and the backup member where it can find a backup if the source is gone. + If the source disappears, the backup node will replicate attributes that + are stored to a new primary backups can be chosen on round robin. + When a new member arrives and requests state, that member will get all the attribute + names and the locations. + It can replicate every X seconds, or on dirty flags by the objects stored, + or a request to scan for dirty flags, or a request with the objects. Tasks Completed =========================================== --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]