Author: fhanik
Date: Mon Feb 27 13:07:39 2006
New Revision: 381446

URL: http://svn.apache.org/viewcvs?rev=381446&view=rev
Log:
Optimized all serialization of all messaging. ClusterData is now transferred as 
a byte array directly.


Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 Mon Feb 27 13:07:39 2006
@@ -50,6 +50,13 @@
     
     static int messageSize = 0;
     
+    public static int messagesSent = 0;
+    public static long messageSendTime = 0;
+    
+    public static synchronized void addSendStats(int count, long time) {
+        messagesSent+=count;
+        messageSendTime+=time;
+    }    
     
     
     public LoadTest(ManagedChannel channel, 
@@ -108,7 +115,10 @@
                     }
                 }
                 if ( (counter % statsInterval) == 0 && (counter > 0)) {
-                    printSendStats(counter, messageSize, sendTime);
+                    //add to the global counter
+                    addSendStats(counter,sendTime);
+                    //print from the global counter
+                    printSendStats(LoadTest.messagesSent, 
LoadTest.messageSize, LoadTest.messageSendTime);
                 }
 
             }
@@ -122,7 +132,7 @@
         float cnt = (float)counter;
         float size = (float)messageSize;
         float time = (float)sendTime / 1000;
-        log.info("****SEND STATS*****"+
+        log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+
                  "\n\tMessage count:"+counter+
                  "\n\tTotal bytes  :"+(long)(size*cnt)+
                  "\n\tTotal seconds:"+(time)+
@@ -179,9 +189,9 @@
         if ( (messagesReceived%statsInterval)==0 || 
(messagesReceived==msgCount)) {
             float bytes = 
(float)(((LoadMessage)msg).getMessage().length*messagesReceived);
             float seconds = ((float)(System.currentTimeMillis()-receiveStart)) 
/ 1000f;
-            log.info("****RECEIVE STATS*****"+
+            log.info("****RECEIVE 
STATS-"+Thread.currentThread().getName()+"*****"+
                      "\n\tMessage count :"+(long)messagesReceived+
-                     "\n\tTotal bytes   :"+bytes+
+                     "\n\tTotal bytes   :"+(long)bytes+
                      "\n\tTime since 1st:"+seconds+" seconds"+
                      "\n\tBytes/second  :"+(bytes/seconds)+
                      "\n\tMBytes/second :"+(bytes/seconds/1024f/1024f));
@@ -237,7 +247,7 @@
         
         public byte[] getMessage() {
             byte[] data = new byte[size+4];
-            System.arraycopy(XByteBuffer.toBytes(msgNr),0,data,0,4);
+            XByteBuffer.toBytes(msgNr,data,0);
             if ( message != null ) {
                 System.arraycopy(message, 0, data, 4, message.length);
             }else {
@@ -271,6 +281,7 @@
                            "[-gzip]  \n\t\t"+
                            "[-pause nrofsecondstopausebetweensends]  \n\t\t"+
                            "[-sender pooled|fastasyncqueue]  \n\t\t"+
+                           "[-threads numberofsenderthreads]  \n\t\t"+
                            "[-break (halts execution on exception)]\n"+
                            "Example:\n\t"+
                            "java LoadTest -port 4004\n\t"+
@@ -291,6 +302,7 @@
         int count = 1000000;
         int stats = 10000;
         boolean breakOnEx = false;
+        int threads = 1;
         String sender = "pooled";
         if ( args.length == 0 ) {
             args = new String[] {"-help"};
@@ -302,6 +314,8 @@
                 sender = args[++i];
             } else if ("-port".equals(args[i])) {
                 port = Integer.parseInt(args[++i]);
+            } else if ("-threads".equals(args[i])) {
+                threads = Integer.parseInt(args[++i]);
             } else if ("-count".equals(args[i])) {
                 count = Integer.parseInt(args[++i]);
             } else if ("-pause".equals(args[i])) {
@@ -366,7 +380,15 @@
         channel.setChannelListener(test);
         channel.setMembershipListener(test);
         channel.start(channel.DEFAULT);
+        while ( threads > 1 ) {
+            Thread t = new Thread(test);
+            t.setDaemon(true);
+            t.start();
+            threads--;
+            test = new 
LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+        }
         test.run();
+        
         System.out.println("System test complete, sleeping to let threads 
finish.");
         Thread.sleep(60*1000*60);
     }    

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 Mon Feb 27 13:07:39 2006
@@ -16,22 +16,22 @@
 package org.apache.catalina.tribes.group;
 
 
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.catalina.tribes.ByteMessage;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelListener;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelReceiver;
 import org.apache.catalina.tribes.ChannelSender;
+import org.apache.catalina.tribes.ManagedChannel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
 import org.apache.catalina.tribes.MembershipService;
 import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
-import java.io.Serializable;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.ManagedChannel;
-import java.util.Iterator;
-import java.util.UUID;
-import org.apache.catalina.tribes.ByteMessage;
 
 /**
  * The GroupChannel manages the replication channel. It coordinates
@@ -81,15 +81,6 @@
     }
     
     
-    public byte[] getUUID() {
-        UUID id = UUID.randomUUID();
-        long msb = id.getMostSignificantBits();
-        long lsb = id.getLeastSignificantBits();
-        byte[] data = new byte[16];
-        System.arraycopy(XByteBuffer.toBytes(msb),0,data,0,8);
-        System.arraycopy(XByteBuffer.toBytes(lsb),0,data,8,8);
-        return data;
-    }
     /**
      * Send a message to one or more members in the cluster
      * @param destination Member[] - the destinations, null or zero length 
means all
@@ -101,9 +92,8 @@
         if ( msg == null ) return;
         try {
             int options = 0;
-            ClusterData data = new ClusterData();
+            ClusterData data = new ClusterData();//generates a unique Id
             data.setAddress(getLocalMember());
-            data.setUniqueId(getUUID());
             data.setTimestamp(System.currentTimeMillis());
             byte[] b = null;
             if ( msg instanceof ByteMessage ){

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
 Mon Feb 27 13:07:39 2006
@@ -24,6 +24,7 @@
 import org.apache.catalina.tribes.mcast.McastMember;
 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
+import java.util.UUID;
 
 /**
  * The cluster data class is used to transport around the byte array from
@@ -42,7 +43,15 @@
     private byte[] uniqueId ;
     private Member address;
 
-    public ClusterData() {}
+    public ClusterData() {
+        this(true);
+    }
+    
+    public ClusterData(boolean generateUUID) {
+        if ( generateUUID ) generateUUID();
+    }
+    
+    
     
     /**
      * @param type message type (class)
@@ -118,6 +127,17 @@
         this.address = address;
     }
     
+    public void generateUUID() {
+        UUID id = UUID.randomUUID();
+        long msb = id.getMostSignificantBits();
+        long lsb = id.getLeastSignificantBits();
+        byte[] data = new byte[16];
+        System.arraycopy(XByteBuffer.toBytes(msb),0,data,0,8);
+        System.arraycopy(XByteBuffer.toBytes(lsb),0,data,8,8);
+        setUniqueId(data);
+    }
+
+    
     
     /**
      * 
@@ -130,36 +150,57 @@
      * @return byte[]
      */
     public byte[] getDataPackage() throws IOException {
-        ByteArrayOutputStream bout = new 
ByteArrayOutputStream(getMessage().length*2);
-        ObjectOutputStream out = new ObjectOutputStream(bout);
-        out.writeInt(options);
-        out.writeLong(timestamp);
-        out.writeInt(uniqueId.length);
-        out.write(uniqueId);
         byte[] addr = ((McastMember)address).getData();
-        out.writeInt(addr.length);
-        out.write(addr);
-        out.writeInt(message.length);
-        out.write(message);
-        out.flush();
-        return bout.toByteArray();
-    }
-    
-    public static ClusterData getDataFromPackage(byte[] dataPackage) throws 
IOException {
-        ByteArrayInputStream bin = new ByteArrayInputStream(dataPackage);
-        ObjectInputStream in = new ObjectInputStream(bin);
-        ClusterData data = new ClusterData();
-        data.setOptions(in.readInt());
-        data.setTimestamp(in.readLong());
-        byte[] uniqueId = new byte[in.readInt()];
-        in.read(uniqueId);
-        data.setUniqueId(uniqueId);
-        byte[] addr = new byte[in.readInt()];
-        in.read(addr);
+        int length = 
+            4 + //options
+            8 + //timestamp  off=4
+            4 + //unique id length off=12
+            uniqueId.length+ //id data off=12+uniqueId.length
+            4 + //addr length off=12+uniqueId.length+4
+            addr.length+ //member data off=12+uniqueId.length+4+add.length
+            4 + //message length off=12+uniqueId.length+4+add.length+4
+            message.length;
+        byte[] data = new byte[length];
+        int offset = 0;
+        XByteBuffer.toBytes(options,data,offset);
+        offset = 4; //options
+        XByteBuffer.toBytes(timestamp,data,offset);
+        offset += 8; //timestamp
+        XByteBuffer.toBytes(uniqueId.length,data,offset);
+        offset += 4; //uniqueId.length
+        System.arraycopy(uniqueId,0,data,offset,uniqueId.length);
+        offset += uniqueId.length; //uniqueId data
+        XByteBuffer.toBytes(addr.length,data,offset);
+        offset += 4; //addr.length
+        System.arraycopy(addr,0,data,offset,addr.length);
+        offset += addr.length; //addr data
+        XByteBuffer.toBytes(message.length,data,offset);
+        offset += 4; //message.length
+        System.arraycopy(message,0,data,offset,message.length);
+        offset += message.length; //message data
+        return data;
+    }
+    
+    public static ClusterData getDataFromPackage(byte[] b) throws IOException {
+        ClusterData data = new ClusterData(false);
+        int offset = 0;
+        data.setOptions(XByteBuffer.toInt(b,offset));
+        offset += 4; //options
+        data.setTimestamp(XByteBuffer.toLong(b,offset));
+        offset += 8; //timestamp
+        data.uniqueId = new byte[XByteBuffer.toInt(b,offset)];
+        offset += 4; //uniqueId length
+        System.arraycopy(b,offset,data.uniqueId,0,data.uniqueId.length);
+        offset += data.uniqueId.length; //uniqueId data
+        byte[] addr = new byte[XByteBuffer.toInt(b,offset)];
+        offset += 4; //addr length
+        System.arraycopy(b,offset,addr,0,addr.length);
         data.setAddress(McastMember.getMember(addr));
-        byte[] message = new byte[in.readInt()];
-        in.read(message);
-        data.setMessage(message);
+        offset += addr.length; //addr data
+        data.message = new byte[XByteBuffer.toInt(b,offset)];
+        offset += 4; //message length
+        System.arraycopy(b,offset,data.message,0,data.message.length);
+        offset += data.message.length; //message data
         return data;
     }
     

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 Mon Feb 27 13:07:39 2006
@@ -276,14 +276,17 @@
      * @return - four bytes in an array
      */
     public static byte[] toBytes(int n) {
-        byte[] b = new byte[4];
-        b[3] = (byte) (n);
+        return toBytes(n,new byte[4],0);
+    }
+
+    public static byte[] toBytes(int n,byte[] b, int offset) {
+        b[offset+3] = (byte) (n);
         n >>>= 8;
-        b[2] = (byte) (n);
+        b[offset+2] = (byte) (n);
         n >>>= 8;
-        b[1] = (byte) (n);
+        b[offset+1] = (byte) (n);
         n >>>= 8;
-        b[0] = (byte) (n);
+        b[offset+0] = (byte) (n);
         return b;
     }
 
@@ -293,22 +296,24 @@
      * @return - eight bytes in an array
      */
     public static byte[] toBytes(long n) {
-        byte[] b = new byte[8];
-        b[7] = (byte) (n);
+        return toBytes(n,new byte[8],0);
+    }
+    public static byte[] toBytes(long n, byte[] b, int offset) {
+        b[offset+7] = (byte) (n);
         n >>>= 8;
-        b[6] = (byte) (n);
+        b[offset+6] = (byte) (n);
         n >>>= 8;
-        b[5] = (byte) (n);
+        b[offset+5] = (byte) (n);
         n >>>= 8;
-        b[4] = (byte) (n);
+        b[offset+4] = (byte) (n);
         n >>>= 8;
-        b[3] = (byte) (n);
+        b[offset+3] = (byte) (n);
         n >>>= 8;
-        b[2] = (byte) (n);
+        b[offset+2] = (byte) (n);
         n >>>= 8;
-        b[1] = (byte) (n);
+        b[offset+1] = (byte) (n);
         n >>>= 8;
-        b[0] = (byte) (n);
+        b[offset+0] = (byte) (n);
         return b;
     }
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
 Mon Feb 27 13:07:39 2006
@@ -64,7 +64,8 @@
     /**
      * The name of the cluster domain from this node
      */
-    private String domain;
+    protected byte[] domain;
+    protected transient String domainname;
     
     /**
      * Counter for how many messages have been sent from this member
@@ -101,7 +102,7 @@
                        long aliveTime) throws IOException {
         setHostname(host);
         this.port = port;
-        this.domain = domain;
+        this.domain = domain.getBytes();
         this.memberAliveTime=aliveTime;
     }
 
@@ -141,7 +142,7 @@
         //host - 4 bytes
         //dlen - 4 bytes
         //domain - dlen bytes
-        byte[] domaind = getDomain().getBytes();
+        byte[] domaind = this.domain;
         byte[] addr = host;
         byte[] data = new byte[8+4+addr.length+4+domaind.length];
         long alive=System.currentTimeMillis()-getServiceStartTime();
@@ -201,7 +202,8 @@
      * @return a cluster domain to the cluster
      */
     public String getDomain() {
-        return domain;
+        if ( this.domainname == null ) this.domainname = new String(domain);
+        return this.domainname;
     }
     
     /**
@@ -256,7 +258,7 @@
      * String representation of this object
      */
     public String toString()  {
-        return 
"org.apache.catalina.tribes.mcast.McastMember["+getName()+","+domain+","+getHostname()+","+port+",
 alive="+memberAliveTime+"]";
+        return 
"org.apache.catalina.tribes.mcast.McastMember["+getName()+","+getDomain()+","+getHostname()+","+port+",
 alive="+memberAliveTime+"]";
     }
 
     /**
@@ -355,7 +357,7 @@
     }
 
     public void setDomain(String domain) {
-        this.domain = domain;
+        this.domain = domain.getBytes();
     }
     public void setPort(int port) {
         this.port = port;

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 Mon Feb 27 13:07:39 2006
@@ -72,6 +72,7 @@
      * current sender socket
      */
     private Socket socket = null;
+    private OutputStream socketout = null;
 
     /**
      * is Socket really connected
@@ -698,6 +699,7 @@
      */
     protected void createSocket() throws IOException, SocketException {
         socket = new Socket(getAddress(), getPort());
+        this.socketout = socket.getOutputStream();
     }
 
     /**
@@ -846,9 +848,8 @@
             isMessageTransferStarted = true ;
         }
         try {
-            OutputStream out = socket.getOutputStream();
-            out.write(XByteBuffer.createDataPackage((ClusterData)data));
-            out.flush();
+            socketout.write(XByteBuffer.createDataPackage((ClusterData)data));
+            socketout.flush();
             if (isWaitForAck()) waitForAck(ackTimeout);
         } finally {
             synchronized(this) {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 Mon Feb 27 13:07:39 2006
@@ -376,7 +376,7 @@
             time = System.currentTimeMillis();
         }
         try {
-            String key = getKey(member);
+            Object key = getKey(member);
             IDataSender sender = (IDataSender) map.get(key);
             sendMessageData(message, sender);
         } finally {
@@ -506,7 +506,7 @@
      */
     public synchronized void add(Member member) {
         try {
-            String key = getKey(member);
+            Object key = getKey(member);
             if (!map.containsKey(key)) {
                 IDataSender sender = IDataSenderFactory.getIDataSender(
                         replicationMode, member);
@@ -524,7 +524,7 @@
      * @see 
org.apache.catalina.tribes.ClusterSender#remove(org.apache.catalina.tribes.Member)
      */
     public synchronized void remove(Member member) {
-        String key = getKey(member);
+        Object key = getKey(member);
         IDataSender toberemoved = (IDataSender) map.get(key);
         if (toberemoved == null)
             return;
@@ -570,8 +570,8 @@
      * @param member
      * @return concat member.host:member.port
      */
-    protected String getKey(Member member) {
-        return member.getHost() + ":" + member.getPort();
+    protected Object getKey(Member member) {
+        return member;
     }
 
     /**



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to