Author: fhanik Date: Mon Feb 27 13:07:39 2006 New Revision: 381446 URL: http://svn.apache.org/viewcvs?rev=381446&view=rev Log: Optimized all serialization of all messaging. ClusterData is now transferred as a byte array directly.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381446&r1=381445&r2=381446&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java Mon Feb 27 13:07:39 2006 @@ -50,6 +50,13 @@ static int messageSize = 0; + public static int messagesSent = 0; + public static long messageSendTime = 0; + + public static synchronized void addSendStats(int count, long time) { + messagesSent+=count; + messageSendTime+=time; + } public LoadTest(ManagedChannel channel, @@ -108,7 +115,10 @@ } } if ( (counter % statsInterval) == 0 && (counter > 0)) { - printSendStats(counter, messageSize, sendTime); + //add to the global counter + addSendStats(counter,sendTime); + //print from the global counter + printSendStats(LoadTest.messagesSent, LoadTest.messageSize, LoadTest.messageSendTime); } } @@ -122,7 +132,7 @@ float cnt = (float)counter; float size = (float)messageSize; float time = (float)sendTime / 1000; - log.info("****SEND STATS*****"+ + log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+ "\n\tMessage count:"+counter+ "\n\tTotal bytes :"+(long)(size*cnt)+ "\n\tTotal seconds:"+(time)+ @@ -179,9 +189,9 @@ if ( (messagesReceived%statsInterval)==0 || (messagesReceived==msgCount)) { float bytes = (float)(((LoadMessage)msg).getMessage().length*messagesReceived); float seconds = ((float)(System.currentTimeMillis()-receiveStart)) / 1000f; - log.info("****RECEIVE STATS*****"+ + log.info("****RECEIVE STATS-"+Thread.currentThread().getName()+"*****"+ "\n\tMessage count :"+(long)messagesReceived+ - "\n\tTotal bytes :"+bytes+ + "\n\tTotal bytes :"+(long)bytes+ "\n\tTime since 1st:"+seconds+" seconds"+ "\n\tBytes/second :"+(bytes/seconds)+ "\n\tMBytes/second :"+(bytes/seconds/1024f/1024f)); @@ -237,7 +247,7 @@ public byte[] getMessage() { byte[] data = new byte[size+4]; - System.arraycopy(XByteBuffer.toBytes(msgNr),0,data,0,4); + XByteBuffer.toBytes(msgNr,data,0); if ( message != null ) { System.arraycopy(message, 0, data, 4, message.length); }else { @@ -271,6 +281,7 @@ "[-gzip] \n\t\t"+ "[-pause nrofsecondstopausebetweensends] \n\t\t"+ "[-sender pooled|fastasyncqueue] \n\t\t"+ + "[-threads numberofsenderthreads] \n\t\t"+ "[-break (halts execution on exception)]\n"+ "Example:\n\t"+ "java LoadTest -port 4004\n\t"+ @@ -291,6 +302,7 @@ int count = 1000000; int stats = 10000; boolean breakOnEx = false; + int threads = 1; String sender = "pooled"; if ( args.length == 0 ) { args = new String[] {"-help"}; @@ -302,6 +314,8 @@ sender = args[++i]; } else if ("-port".equals(args[i])) { port = Integer.parseInt(args[++i]); + } else if ("-threads".equals(args[i])) { + threads = Integer.parseInt(args[++i]); } else if ("-count".equals(args[i])) { count = Integer.parseInt(args[++i]); } else if ("-pause".equals(args[i])) { @@ -366,7 +380,15 @@ channel.setChannelListener(test); channel.setMembershipListener(test); channel.start(channel.DEFAULT); + while ( threads > 1 ) { + Thread t = new Thread(test); + t.setDaemon(true); + t.start(); + threads--; + test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx); + } test.run(); + System.out.println("System test complete, sleeping to let threads finish."); Thread.sleep(60*1000*60); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=381446&r1=381445&r2=381446&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Mon Feb 27 13:07:39 2006 @@ -16,22 +16,22 @@ package org.apache.catalina.tribes.group; +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.catalina.tribes.ByteMessage; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelInterceptor; +import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.ChannelSender; +import org.apache.catalina.tribes.ManagedChannel; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.MembershipService; import org.apache.catalina.tribes.io.ClusterData; import org.apache.catalina.tribes.io.XByteBuffer; -import java.io.Serializable; -import org.apache.catalina.tribes.ChannelListener; -import org.apache.catalina.tribes.ManagedChannel; -import java.util.Iterator; -import java.util.UUID; -import org.apache.catalina.tribes.ByteMessage; /** * The GroupChannel manages the replication channel. It coordinates @@ -81,15 +81,6 @@ } - public byte[] getUUID() { - UUID id = UUID.randomUUID(); - long msb = id.getMostSignificantBits(); - long lsb = id.getLeastSignificantBits(); - byte[] data = new byte[16]; - System.arraycopy(XByteBuffer.toBytes(msb),0,data,0,8); - System.arraycopy(XByteBuffer.toBytes(lsb),0,data,8,8); - return data; - } /** * Send a message to one or more members in the cluster * @param destination Member[] - the destinations, null or zero length means all @@ -101,9 +92,8 @@ if ( msg == null ) return; try { int options = 0; - ClusterData data = new ClusterData(); + ClusterData data = new ClusterData();//generates a unique Id data.setAddress(getLocalMember()); - data.setUniqueId(getUUID()); data.setTimestamp(System.currentTimeMillis()); byte[] b = null; if ( msg instanceof ByteMessage ){ Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=381446&r1=381445&r2=381446&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java Mon Feb 27 13:07:39 2006 @@ -24,6 +24,7 @@ import org.apache.catalina.tribes.mcast.McastMember; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; +import java.util.UUID; /** * The cluster data class is used to transport around the byte array from @@ -42,7 +43,15 @@ private byte[] uniqueId ; private Member address; - public ClusterData() {} + public ClusterData() { + this(true); + } + + public ClusterData(boolean generateUUID) { + if ( generateUUID ) generateUUID(); + } + + /** * @param type message type (class) @@ -118,6 +127,17 @@ this.address = address; } + public void generateUUID() { + UUID id = UUID.randomUUID(); + long msb = id.getMostSignificantBits(); + long lsb = id.getLeastSignificantBits(); + byte[] data = new byte[16]; + System.arraycopy(XByteBuffer.toBytes(msb),0,data,0,8); + System.arraycopy(XByteBuffer.toBytes(lsb),0,data,8,8); + setUniqueId(data); + } + + /** * @@ -130,36 +150,57 @@ * @return byte[] */ public byte[] getDataPackage() throws IOException { - ByteArrayOutputStream bout = new ByteArrayOutputStream(getMessage().length*2); - ObjectOutputStream out = new ObjectOutputStream(bout); - out.writeInt(options); - out.writeLong(timestamp); - out.writeInt(uniqueId.length); - out.write(uniqueId); byte[] addr = ((McastMember)address).getData(); - out.writeInt(addr.length); - out.write(addr); - out.writeInt(message.length); - out.write(message); - out.flush(); - return bout.toByteArray(); - } - - public static ClusterData getDataFromPackage(byte[] dataPackage) throws IOException { - ByteArrayInputStream bin = new ByteArrayInputStream(dataPackage); - ObjectInputStream in = new ObjectInputStream(bin); - ClusterData data = new ClusterData(); - data.setOptions(in.readInt()); - data.setTimestamp(in.readLong()); - byte[] uniqueId = new byte[in.readInt()]; - in.read(uniqueId); - data.setUniqueId(uniqueId); - byte[] addr = new byte[in.readInt()]; - in.read(addr); + int length = + 4 + //options + 8 + //timestamp off=4 + 4 + //unique id length off=12 + uniqueId.length+ //id data off=12+uniqueId.length + 4 + //addr length off=12+uniqueId.length+4 + addr.length+ //member data off=12+uniqueId.length+4+add.length + 4 + //message length off=12+uniqueId.length+4+add.length+4 + message.length; + byte[] data = new byte[length]; + int offset = 0; + XByteBuffer.toBytes(options,data,offset); + offset = 4; //options + XByteBuffer.toBytes(timestamp,data,offset); + offset += 8; //timestamp + XByteBuffer.toBytes(uniqueId.length,data,offset); + offset += 4; //uniqueId.length + System.arraycopy(uniqueId,0,data,offset,uniqueId.length); + offset += uniqueId.length; //uniqueId data + XByteBuffer.toBytes(addr.length,data,offset); + offset += 4; //addr.length + System.arraycopy(addr,0,data,offset,addr.length); + offset += addr.length; //addr data + XByteBuffer.toBytes(message.length,data,offset); + offset += 4; //message.length + System.arraycopy(message,0,data,offset,message.length); + offset += message.length; //message data + return data; + } + + public static ClusterData getDataFromPackage(byte[] b) throws IOException { + ClusterData data = new ClusterData(false); + int offset = 0; + data.setOptions(XByteBuffer.toInt(b,offset)); + offset += 4; //options + data.setTimestamp(XByteBuffer.toLong(b,offset)); + offset += 8; //timestamp + data.uniqueId = new byte[XByteBuffer.toInt(b,offset)]; + offset += 4; //uniqueId length + System.arraycopy(b,offset,data.uniqueId,0,data.uniqueId.length); + offset += data.uniqueId.length; //uniqueId data + byte[] addr = new byte[XByteBuffer.toInt(b,offset)]; + offset += 4; //addr length + System.arraycopy(b,offset,addr,0,addr.length); data.setAddress(McastMember.getMember(addr)); - byte[] message = new byte[in.readInt()]; - in.read(message); - data.setMessage(message); + offset += addr.length; //addr data + data.message = new byte[XByteBuffer.toInt(b,offset)]; + offset += 4; //message length + System.arraycopy(b,offset,data.message,0,data.message.length); + offset += data.message.length; //message data return data; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381446&r1=381445&r2=381446&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Mon Feb 27 13:07:39 2006 @@ -276,14 +276,17 @@ * @return - four bytes in an array */ public static byte[] toBytes(int n) { - byte[] b = new byte[4]; - b[3] = (byte) (n); + return toBytes(n,new byte[4],0); + } + + public static byte[] toBytes(int n,byte[] b, int offset) { + b[offset+3] = (byte) (n); n >>>= 8; - b[2] = (byte) (n); + b[offset+2] = (byte) (n); n >>>= 8; - b[1] = (byte) (n); + b[offset+1] = (byte) (n); n >>>= 8; - b[0] = (byte) (n); + b[offset+0] = (byte) (n); return b; } @@ -293,22 +296,24 @@ * @return - eight bytes in an array */ public static byte[] toBytes(long n) { - byte[] b = new byte[8]; - b[7] = (byte) (n); + return toBytes(n,new byte[8],0); + } + public static byte[] toBytes(long n, byte[] b, int offset) { + b[offset+7] = (byte) (n); n >>>= 8; - b[6] = (byte) (n); + b[offset+6] = (byte) (n); n >>>= 8; - b[5] = (byte) (n); + b[offset+5] = (byte) (n); n >>>= 8; - b[4] = (byte) (n); + b[offset+4] = (byte) (n); n >>>= 8; - b[3] = (byte) (n); + b[offset+3] = (byte) (n); n >>>= 8; - b[2] = (byte) (n); + b[offset+2] = (byte) (n); n >>>= 8; - b[1] = (byte) (n); + b[offset+1] = (byte) (n); n >>>= 8; - b[0] = (byte) (n); + b[offset+0] = (byte) (n); return b; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=381446&r1=381445&r2=381446&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Mon Feb 27 13:07:39 2006 @@ -64,7 +64,8 @@ /** * The name of the cluster domain from this node */ - private String domain; + protected byte[] domain; + protected transient String domainname; /** * Counter for how many messages have been sent from this member @@ -101,7 +102,7 @@ long aliveTime) throws IOException { setHostname(host); this.port = port; - this.domain = domain; + this.domain = domain.getBytes(); this.memberAliveTime=aliveTime; } @@ -141,7 +142,7 @@ //host - 4 bytes //dlen - 4 bytes //domain - dlen bytes - byte[] domaind = getDomain().getBytes(); + byte[] domaind = this.domain; byte[] addr = host; byte[] data = new byte[8+4+addr.length+4+domaind.length]; long alive=System.currentTimeMillis()-getServiceStartTime(); @@ -201,7 +202,8 @@ * @return a cluster domain to the cluster */ public String getDomain() { - return domain; + if ( this.domainname == null ) this.domainname = new String(domain); + return this.domainname; } /** @@ -256,7 +258,7 @@ * String representation of this object */ public String toString() { - return "org.apache.catalina.tribes.mcast.McastMember["+getName()+","+domain+","+getHostname()+","+port+", alive="+memberAliveTime+"]"; + return "org.apache.catalina.tribes.mcast.McastMember["+getName()+","+getDomain()+","+getHostname()+","+port+", alive="+memberAliveTime+"]"; } /** @@ -355,7 +357,7 @@ } public void setDomain(String domain) { - this.domain = domain; + this.domain = domain.getBytes(); } public void setPort(int port) { this.port = port; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381446&r1=381445&r2=381446&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Mon Feb 27 13:07:39 2006 @@ -72,6 +72,7 @@ * current sender socket */ private Socket socket = null; + private OutputStream socketout = null; /** * is Socket really connected @@ -698,6 +699,7 @@ */ protected void createSocket() throws IOException, SocketException { socket = new Socket(getAddress(), getPort()); + this.socketout = socket.getOutputStream(); } /** @@ -846,9 +848,8 @@ isMessageTransferStarted = true ; } try { - OutputStream out = socket.getOutputStream(); - out.write(XByteBuffer.createDataPackage((ClusterData)data)); - out.flush(); + socketout.write(XByteBuffer.createDataPackage((ClusterData)data)); + socketout.flush(); if (isWaitForAck()) waitForAck(ackTimeout); } finally { synchronized(this) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=381446&r1=381445&r2=381446&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Mon Feb 27 13:07:39 2006 @@ -376,7 +376,7 @@ time = System.currentTimeMillis(); } try { - String key = getKey(member); + Object key = getKey(member); IDataSender sender = (IDataSender) map.get(key); sendMessageData(message, sender); } finally { @@ -506,7 +506,7 @@ */ public synchronized void add(Member member) { try { - String key = getKey(member); + Object key = getKey(member); if (!map.containsKey(key)) { IDataSender sender = IDataSenderFactory.getIDataSender( replicationMode, member); @@ -524,7 +524,7 @@ * @see org.apache.catalina.tribes.ClusterSender#remove(org.apache.catalina.tribes.Member) */ public synchronized void remove(Member member) { - String key = getKey(member); + Object key = getKey(member); IDataSender toberemoved = (IDataSender) map.get(key); if (toberemoved == null) return; @@ -570,8 +570,8 @@ * @param member * @return concat member.host:member.port */ - protected String getKey(Member member) { - return member.getHost() + ":" + member.getPort(); + protected Object getKey(Member member) { + return member; } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]