Author: fhanik Date: Thu May 18 17:21:11 2006 New Revision: 407670 URL: http://svn.apache.org/viewvc?rev=407670&view=rev Log: Major performance improvement: use the buffer pool for incoming messages. Interceptors will have to be careful, since they need to deepclone the message if they wish to store it.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java?rev=407670&r1=407669&r2=407670&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java Thu May 18 17:21:11 2006 @@ -45,6 +45,7 @@ protected HashMap fragpieces = new HashMap(); private int maxSize = 1024*100; private long expire = 1000 * 60; //one minute expiration + protected boolean deepclone = true; public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { @@ -90,7 +91,7 @@ public void 
defrag(ChannelMessage msg ) { FragKey key = new FragKey(msg.getUniqueId()); FragCollection coll = getFragCollection(key,msg); - coll.addMessage(msg); + coll.addMessage((ChannelMessage)msg.deepclone()); if ( coll.complete() ) { removeFragCollection(key); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=407670&r1=407669&r2=407670&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Thu May 18 17:21:11 2006 @@ -74,7 +74,7 @@ public void messageReceived(ChannelMessage msg) { int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4); msg.getMessage().trim(4); - MessageOrder order = new MessageOrder(msgnr,msg); + MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone()); if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=407670&r1=407669&r2=407670&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java Thu May 18 17:21:11 2006 @@ -67,6 +67,7 @@ 
private int soLingerTime = 3; private int soTrafficClass = 0x04 | 0x08 | 0x010; private int timeout = 15000; //15 seconds + private boolean useBufferPool = true; public ReceiverBase() { @@ -289,6 +290,10 @@ return timeout; } + public boolean getUseBufferPool() { + return useBufferPool; + } + public void setTcpSelectorTimeout(long selTimeout) { tcpSelectorTimeout = selTimeout; } @@ -362,7 +367,11 @@ public void setTimeout(int timeout) { this.timeout = timeout; } - + + public void setUseBufferPool(boolean useBufferPool) { + this.useBufferPool = useBufferPool; + } + public void heartbeat() { //empty operation } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java?rev=407670&r1=407669&r2=407670&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/WorkerThread.java Thu May 18 17:21:11 2006 @@ -34,6 +34,7 @@ private ThreadPool pool; private boolean doRun = true; private int options; + protected boolean useBufferPool = true; public WorkerThread(ListenCallback callback) { this.callback = callback; @@ -77,4 +78,11 @@ notify(); } + public void setUseBufferPool(boolean usebufpool) { + useBufferPool = usebufpool; + } + + public boolean getUseBufferPool() { + return useBufferPool; + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java?rev=407670&r1=407669&r2=407670&view=diff 
============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java Thu May 18 17:21:11 2006 @@ -77,6 +77,7 @@ protected BioReplicationThread getReplicationThread() { BioReplicationThread result = new BioReplicationThread(this); result.setOptions(getWorkerThreadOptions()); + result.setUseBufferPool(this.getUseBufferPool()); return result; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java?rev=407670&r1=407669&r2=407670&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java Thu May 18 17:21:11 2006 @@ -27,6 +27,7 @@ import org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.BufferPool; /** * A worker thread class which can drain channels and echo-back the input. 
Each @@ -116,7 +117,11 @@ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND); log.error("Error thrown from messageDataReceived.",x); } - } + if ( getUseBufferPool() ) { + BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); + msgs[i].setMessage(null); + } + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=407670&r1=407669&r2=407670&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Thu May 18 17:21:11 2006 @@ -103,6 +103,7 @@ public WorkerThread getWorkerThread() { NioReplicationThread thread = new NioReplicationThread(this); + thread.setUseBufferPool(this.getUseBufferPool()); thread.setRxBufSize(getRxBufSize()); thread.setOptions(getWorkerThreadOptions()); return thread; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=407670&r1=407669&r2=407670&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Thu May 18 17:21:11 2006 @@ -26,6 +26,7 @@ import org.apache.catalina.tribes.ChannelMessage; import 
org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.BufferPool; /** * A worker thread class which can drain channels and echo-back the input. Each @@ -165,7 +166,11 @@ }catch ( Exception e ) { log.error("Processing of cluster message failed.",e); if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); - } + } + if ( getUseBufferPool() ) { + BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); + msgs[i].setMessage(null); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]