Author: fhanik Date: Tue Jul 18 14:04:00 2006 New Revision: 423244 URL: http://svn.apache.org/viewvc?rev=423244&view=rev Log: Message dispatch interceptor uses a thread pool
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=423244&r1=423243&r2=423244&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java Tue Jul 18 14:04:00 2006 @@ -15,17 +15,20 @@ package org.apache.catalina.tribes.group.interceptors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.transport.bio.util.LinkObject; +import java.util.concurrent.TimeUnit; /** * * Same implementation as the MessageDispatchInterceptor * except it uses an atomic long for the currentSize calculation + * and uses a thread pool for message sending. 
* * @author Filip Hanik * @version 1.0 @@ -34,7 +37,11 @@ public class MessageDispatch15Interceptor extends MessageDispatchInterceptor { protected AtomicLong currentSize = new AtomicLong(0); - protected LinkedBlockingQueue queue = new LinkedBlockingQueue(); + protected ThreadPoolExecutor executor = null; + protected int maxThreads = 10; + protected int maxSpareThreads = 2; + protected long keepAliveTime = 5000; + protected LinkedBlockingQueue<Runnable> runnablequeue = new LinkedBlockingQueue<Runnable>(); public long getCurrentSize() { return currentSize.get(); @@ -50,32 +57,55 @@ } public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) { - LinkObject obj = new LinkObject(msg,destination,payload); - return queue.offer(obj); + final LinkObject obj = new LinkObject(msg,destination,payload); + Runnable r = new Runnable() { + public void run() { + sendAsyncData(obj); + } + }; + executor.execute(r); + return true; } public LinkObject removeFromQueue() { - LinkObject head = null; - try { - head = (LinkObject)queue.take(); - }catch ( InterruptedException x ) {} - return head; + return null; //not used, thread pool contains its own queue. 
} public void startQueue() { - msgDispatchThread = new Thread(this); - msgDispatchThread.setName("MessageDispatch15Interceptor.MessageDispatchThread"); - msgDispatchThread.setDaemon(true); - msgDispatchThread.setPriority(Thread.MAX_PRIORITY); + if ( run ) return; + executor = new ThreadPoolExecutor(maxSpareThreads,maxThreads,keepAliveTime,TimeUnit.MILLISECONDS,runnablequeue); run = true; - msgDispatchThread.start(); } public void stopQueue() { run = false; - msgDispatchThread.interrupt(); + executor.shutdownNow(); setAndGetCurrentSize(0); + runnablequeue.clear(); } + public long getKeepAliveTime() { + return keepAliveTime; + } + + public int getMaxSpareThreads() { + return maxSpareThreads; + } + + public int getMaxThreads() { + return maxThreads; + } + + public void setKeepAliveTime(long keepAliveTime) { + this.keepAliveTime = keepAliveTime; + } + + public void setMaxSpareThreads(int maxSpareThreads) { + this.maxSpareThreads = maxSpareThreads; + } + + public void setMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=423244&r1=423243&r2=423244&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Tue Jul 18 14:04:00 2006 @@ -165,32 +165,37 @@ LinkObject link = removeFromQueue(); if ( link == null ) continue; //should not happen unless we exceed wait time while ( link != null && run ) { - ChannelMessage msg = link.data(); - Member[] 
destination = link.getDestination(); - try { - super.sendMessage(destination,msg,null); - try { - if ( link.getHandler() != null ) link.getHandler().handleCompletion(new UniqueId(msg.getUniqueId())); - } catch ( Exception ex ) { - log.error("Unable to report back completed message.",ex); - } - } catch ( Exception x ) { - ChannelException cx = null; - if ( x instanceof ChannelException ) cx = (ChannelException)x; - else cx = new ChannelException(x); - if ( log.isDebugEnabled() ) log.debug("Error while processing async message.",x); - try { - if (link.getHandler() != null) link.getHandler().handleError(cx, new UniqueId(msg.getUniqueId())); - } catch ( Exception ex ) { - log.error("Unable to report back error message.",ex); - } - } finally { - addAndGetCurrentSize(-msg.getMessage().getLength()); - link = link.next(); - }//try + link = sendAsyncData(link); }//while }//while }//run + + protected LinkObject sendAsyncData(LinkObject link) { + ChannelMessage msg = link.data(); + Member[] destination = link.getDestination(); + try { + super.sendMessage(destination,msg,null); + try { + if ( link.getHandler() != null ) link.getHandler().handleCompletion(new UniqueId(msg.getUniqueId())); + } catch ( Exception ex ) { + log.error("Unable to report back completed message.",ex); + } + } catch ( Exception x ) { + ChannelException cx = null; + if ( x instanceof ChannelException ) cx = (ChannelException)x; + else cx = new ChannelException(x); + if ( log.isDebugEnabled() ) log.debug("Error while processing async message.",x); + try { + if (link.getHandler() != null) link.getHandler().handleError(cx, new UniqueId(msg.getUniqueId())); + } catch ( Exception ex ) { + log.error("Unable to report back error message.",ex); + } + } finally { + addAndGetCurrentSize(-msg.getMessage().getLength()); + link = link.next(); + }//try + return link; + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java?rev=423244&r1=423243&r2=423244&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java Tue Jul 18 14:04:00 2006 @@ -9,6 +9,7 @@ import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener; import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; +import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; /** * <p>Title: </p> @@ -24,7 +25,7 @@ */ public class TestDataIntegrity extends TestCase { int msgCount = 1000; - int threadCount = 8; + int threadCount = 20; GroupChannel channel1; GroupChannel channel2; Listener listener1; @@ -32,9 +33,9 @@ protected void setUp() throws Exception { super.setUp(); channel1 = new GroupChannel(); - channel1.addInterceptor(new MessageDispatchInterceptor()); + channel1.addInterceptor(new MessageDispatch15Interceptor()); channel2 = new GroupChannel(); - channel2.addInterceptor(new MessageDispatchInterceptor()); + channel2.addInterceptor(new MessageDispatch15Interceptor()); listener1 = new Listener(); channel2.addChannelListener(listener1); channel1.start(GroupChannel.DEFAULT); @@ -54,7 +55,7 @@ threads[x] = new Thread() { public void run() { try { - for (int i = 0; i < msgCount; i++) channel1.send(channel1.getMembers(), Data.createRandomData(),0); + for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),0); }catch ( Exception x ) { x.printStackTrace(); return; @@ -68,11 +69,46 @@ for (int x=0; x<threads.length; x++ ) { threads[x].join();} 
//sleep for 50 sec, let the other messages in long start = System.currentTimeMillis(); - while ( (System.currentTimeMillis()-start)<50000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); + while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); System.err.println("Finished NO_ACK"); assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); } + public void testDataSendASYNCM() throws Exception { + System.err.println("Starting ASYNC MULTI THREAD"); + Thread[] threads = new Thread[threadCount]; + for (int x=0; x<threads.length; x++ ) { + threads[x] = new Thread() { + public void run() { + try { + for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS); + }catch ( Exception x ) { + x.printStackTrace(); + return; + } finally { + threadCounter++; + } + } + }; + } + for (int x=0; x<threads.length; x++ ) { threads[x].start();} + for (int x=0; x<threads.length; x++ ) { threads[x].join();} + //sleep for 50 sec, let the other messages in + long start = System.currentTimeMillis(); + while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); + System.err.println("Finished ASYNC MULTI THREAD"); + assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); + } + public void testDataSendASYNC() throws Exception { + System.err.println("Starting ASYNC"); + for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS); + //sleep for 50 sec, let the other messages in + long start = System.currentTimeMillis(); + while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500); + System.err.println("Finished ASYNC"); + assertEquals("Checking success messages.",msgCount,listener1.count); + } + public void 
testDataSendACK() throws Exception { System.err.println("Starting ACK"); for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_USE_ACK); @@ -83,7 +119,7 @@ public void testDataSendSYNCACK() throws Exception { System.err.println("Starting SYNC_ACK"); - for (int i=0; i<msgCount; i++) channel1.send(channel1.getMembers(),Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK); + for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK); Thread.sleep(250); System.err.println("Finished SYNC_ACK"); assertEquals("Checking success messages.",msgCount,listener1.count); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]