Author: fhanik Date: Thu May 18 14:12:23 2006 New Revision: 407634 URL: http://svn.apache.org/viewvc?rev=407634&view=rev Log: The throughput interceptor now measures throughput correctly and taking both multithreading and multiple destinations into account. Implemented a pool of buffers used by the group to send data down the stack
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=407634&r1=407633&r2=407634&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Thu May 18 14:12:23 2006 @@ -38,6 +38,7 @@ import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.Heartbeat; +import org.apache.catalina.tribes.io.BufferPool; /** * The default implementation of a Channel.<br> @@ -184,6 +185,7 @@ */ public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException { if ( msg == null ) throw new ChannelException("Cant send a NULL message"); + XByteBuffer buffer = null; try { if ( destination == null || destination.length == 0) throw new ChannelException("No destination given"); ChannelData data = new ChannelData(true);//generates a unique Id @@ -198,7 +200,8 @@ options = options & (~SEND_OPTIONS_BYTE_MESSAGE); } data.setOptions(options); - XByteBuffer buffer = new XByteBuffer(b.length+128,false); + //XByteBuffer buffer = new XByteBuffer(b.length+128,false); + buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false); buffer.append(b,0,b.length); data.setMessage(buffer); InterceptorPayload payload = null; @@ -211,6 +214,8 @@ }catch ( Exception x ) { if ( x instanceof ChannelException ) throw (ChannelException)x; throw new ChannelException(x); + } finally { + if ( buffer != null ) BufferPool.getBufferPool().returnBuffer(buffer); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=407634&r1=407633&r2=407634&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Thu May 18 14:12:23 2006 @@ -44,7 +44,7 @@ private boolean run = false; private Thread msgDispatchThread = null; protected long currentSize = 0; - private boolean useDeepClone = false; + private boolean useDeepClone = true; public MessageDispatchInterceptor() { setOptionFlag(Channel.SEND_OPTIONS_ASYNCHRONOUS); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java?rev=407634&r1=407633&r2=407634&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java Thu May 18 14:12:23 2006 @@ -24,6 +24,8 @@ import org.apache.catalina.tribes.io.XByteBuffer; import java.text.DecimalFormat; import org.apache.catalina.tribes.membership.MemberImpl; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; @@ -34,35 +36,56 @@ * @version 1.0 */ public class ThroughputInterceptor extends ChannelInterceptorBase { - + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ThroughputInterceptor.class); + double mbTx = 0; double mbRx = 0; double timeTx = 0; - long msgTxCnt = 1; - long msgRxCnt = 1; + AtomicLong msgTxCnt = new AtomicLong(1); + AtomicLong msgRxCnt = new AtomicLong(1); + AtomicLong msgTxErr = new AtomicLong(1); int interval = 10000; - DecimalFormat df = new DecimalFormat("##.00"); - int addrlength = 0; + AtomicInteger access = new AtomicInteger(0); + long start = 0; + DecimalFormat df = new DecimalFormat("#0.00"); + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + if ( access.addAndGet(1) == 1 ) start = System.currentTimeMillis(); long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackageLength()); - long start = System.currentTimeMillis(); - super.sendMessage(destination,msg,payload); - long stop = System.currentTimeMillis(); - timeTx+= ((double)(stop-start))/1000d; - mbTx += ((double)bytes)/(1024d*1024d); - if ( msgTxCnt % interval == 0 ) report(); - msgTxCnt++; + try { + super.sendMessage(destination, msg, payload); + }catch ( ChannelException x ) { + msgTxErr.addAndGet(1); + access.addAndGet(-1); + throw x; + } + mbTx += ((double)(bytes*destination.length))/(1024d*1024d); + if ( access.addAndGet(-1) == 0 ) { + long stop = System.currentTimeMillis(); + timeTx += ( (double) (stop - start)) / 1000d; + } + + if (msgTxCnt.get() % interval == 0) { + double time = timeTx; + + if ( access.get() != 0 ) { + long now = System.currentTimeMillis(); + time = (double)(now - start + timeTx)/1000d; + } + report(time); + } + msgTxCnt.addAndGet(1); } public void messageReceived(ChannelMessage msg) { long bytes = XByteBuffer.getDataPackageLength(((ChannelData)msg).getDataPackage().length); mbRx += ((double)bytes)/(1024d*1024d); - if ( msgRxCnt % interval == 0 ) report(); - msgRxCnt++; + if ( msgRxCnt.get() % interval == 0 ) report(timeTx); + msgRxCnt.addAndGet(1); } - public void report() { + public void report(double timeTx) { StringBuffer buf = new StringBuffer("ThroughputInterceptor Report[\n\tTx Msg:"); buf.append(msgTxCnt).append(" messages\n\tSent:"); buf.append(df.format(mbTx)); @@ -70,18 +93,14 @@ buf.append(df.format(timeTx)); buf.append(" seconds\n\tSpeed:"); buf.append(df.format(mbTx/timeTx)); - buf.append(" MB/sec\n\tRx Msg:"); + buf.append(" MB/sec\n\tError Msg:"); + buf.append(msgTxErr).append("\n\tRx Msg:"); buf.append(msgRxCnt); buf.append(" messages\n\tReceived:"); buf.append(df.format(mbRx)).append(" MB]\n"); - System.out.println(buf); + if ( log.isInfoEnabled() ) log.info(buf); } - public void start(int svc) throws ChannelException{ - super.start(svc); - addrlength = ((MemberImpl)getLocalMember(true)).getData().length; - } - public void setInterval(int interval) { this.interval = interval; } Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java?rev=407634&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool.java Thu May 18 14:12:23 2006 @@ -0,0 +1,87 @@ +/* + * Copyright 1999,2004-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.io; + +import org.apache.commons.logging.LogFactory; + +import org.apache.commons.logging.Log; + +/** + * + * @author Filip Hanik + * + * @version 1.0 + */ +public class BufferPool { + protected static Log log = LogFactory.getLog(BufferPool.class); + + public static int DEFAULT_POOL_SIZE = 100*1024*1024; //100MB + + + + protected static BufferPool instance = null; + protected BufferPoolAPI pool = null; + + private BufferPool(BufferPoolAPI pool) { + this.pool = pool; + } + + public XByteBuffer getBuffer(int minSize, boolean discard) { + if ( pool != null ) return pool.getBuffer(minSize, discard); + else return new XByteBuffer(minSize,discard); + } + + public void returnBuffer(XByteBuffer buffer) { + if ( pool != null ) pool.returnBuffer(buffer); + } + + public void clear() { + if ( pool != null ) pool.clear(); + } + + + public static BufferPool getBufferPool() { + if ( (instance == null) ) { + synchronized (BufferPool.class) { + if ( instance == null ) { + BufferPoolAPI pool = null; + try { + Class clazz = Class.forName("org.apache.catalina.tribes.io.BufferPool15Impl"); + pool = (BufferPoolAPI)clazz.newInstance(); + pool.setMaxSize(DEFAULT_POOL_SIZE); + log.info("Created a buffer pool with max size:"+DEFAULT_POOL_SIZE+" bytes."); + } catch ( Exception x ) { + log.warn("Unable to initilize BufferPool, not pooling XByteBuffer objects.",x); + } + instance = new BufferPool(pool); + + }//end if + }//sync + }//end if + return instance; + } + + + public static interface BufferPoolAPI { + public void setMaxSize(int bytes); + + public XByteBuffer getBuffer(int minSize, boolean discard); + + public void returnBuffer(XByteBuffer buffer); + + public void clear(); + } +} Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java?rev=407634&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/BufferPool15Impl.java Thu May 18 14:12:23 2006 @@ -0,0 +1,62 @@ +/* + * Copyright 1999,2004-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.io; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * + * @author Filip Hanik + * @version 1.0 + */ +class BufferPool15Impl implements BufferPool.BufferPoolAPI { + protected int maxSize; + protected AtomicInteger size = new AtomicInteger(0); + protected ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); + + public void setMaxSize(int bytes) { + this.maxSize = bytes; + } + + + public XByteBuffer getBuffer(int minSize, boolean discard) { + XByteBuffer buffer = (XByteBuffer)queue.poll(); + if ( buffer != null ) size.addAndGet(-buffer.getCapacity()); + if ( buffer == null ) buffer = new XByteBuffer(minSize,discard); + else if ( buffer.getCapacity() <= minSize ) buffer.expand(minSize); + buffer.setDiscard(discard); + buffer.reset(); + return buffer; + } + + public void returnBuffer(XByteBuffer buffer) { + if ( (size.get() + buffer.getCapacity()) <= maxSize ) { + size.addAndGet(buffer.getCapacity()); + queue.offer(buffer); + } + } + + public void clear() { + queue.clear(); + size.set(0); + } + + public int getMaxSize() { + return maxSize; + } + +} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=407634&r1=407633&r2=407634&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Thu May 18 14:12:23 2006 @@ -20,10 +20,10 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.ByteBuffer; -import java.io.ObjectInputStream; /** * The XByteBuffer provides a dual functionality. @@ -119,6 +119,10 @@ throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length); bufSize -= length; } + + public void reset() { + bufSize = 0; + } public byte[] getBytesDirect() { return this.buf; @@ -571,5 +575,12 @@ return data; } - + public void setDiscard(boolean discard) { + this.discard = discard; + } + + public boolean getDiscard() { + return discard; + } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=407634&r1=407633&r2=407634&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Thu May 18 14:12:23 2006 @@ -80,7 +80,6 @@ public boolean process(SelectionKey key, boolean waitForAck) throws IOException { int ops = key.readyOps(); key.interestOps(key.interestOps() & ~ops); - //in case disconnect has been called if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key."); if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled."); Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=407634&r1=407633&r2=407634&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Thu May 18 14:12:23 2006 @@ -31,6 +31,7 @@ import org.apache.catalina.tribes.transport.ReplicationTransmitter; import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor; import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; +import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; /** * <p>Title: </p> @@ -68,6 +69,7 @@ .append("\n\t\t[-frag]") .append("\n\t\t[-fragsize maxmsgsize]") .append("\n\t\t[-throughput]") + .append("\n\t\t[-failuredetect]") .append("\n\t\t[-async]") .append("\n\t\t[-asyncsize maxqueuesizeinbytes]"); return buf; @@ -97,6 +99,7 @@ boolean async = false; int asyncsize = 1024*1024*50; //50MB boolean throughput = false; + boolean failuredetect = false; for (int i = 0; i < args.length; i++) { if ("-bind".equals(args[i])) { @@ -113,6 +116,8 @@ gzip = true; } else if ("-async".equals(args[i])) { async = true; + } else if ("-failuredetect".equals(args[i])) { + failuredetect = true; } else if ("-asyncsize".equals(args[i])) { asyncsize = Integer.parseInt(args[++i]); System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize); @@ -209,6 +214,11 @@ mi.setMaxQueueSize(asyncsize); channel.addInterceptor(mi); System.out.println("Added MessageDispatchInterceptor"); + } + + if ( failuredetect ) { + TcpFailureDetector tcpfi = new TcpFailureDetector(); + channel.addInterceptor(tcpfi); } return channel; Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java?rev=407634&r1=407633&r2=407634&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java Thu May 18 14:12:23 2006 @@ -390,6 +390,7 @@ t.start(); threads--; test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx); + test.channelOptions = channelOptions; } test.run(); if ( shutdown && send ) channel.stop(channel.DEFAULT); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]