Author: fhanik
Date: Tue Jul 18 14:04:00 2006
New Revision: 423244

URL: http://svn.apache.org/viewvc?rev=423244&view=rev
Log:
Message dispatch interceptor uses a thread pool

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=423244&r1=423243&r2=423244&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
 Tue Jul 18 14:04:00 2006
@@ -15,17 +15,20 @@
 package org.apache.catalina.tribes.group.interceptors;
 
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.transport.bio.util.LinkObject;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 
  * Same implementation as the MessageDispatchInterceptor
  * except is ues an atomic long for the currentSize calculation
+ * and uses a thread pool for message sending.
  * 
  * @author Filip Hanik
  * @version 1.0
@@ -34,7 +37,11 @@
 public class MessageDispatch15Interceptor extends MessageDispatchInterceptor {
 
     protected AtomicLong currentSize = new AtomicLong(0);
-    protected LinkedBlockingQueue queue = new LinkedBlockingQueue();
+    protected ThreadPoolExecutor executor = null;
+    protected int maxThreads = 10;
+    protected int maxSpareThreads = 2;
+    protected long keepAliveTime = 5000;
+    protected LinkedBlockingQueue<Runnable> runnablequeue = new 
LinkedBlockingQueue<Runnable>();
 
     public long getCurrentSize() {
         return currentSize.get();
@@ -50,32 +57,55 @@
     }
     
     public boolean addToQueue(ChannelMessage msg, Member[] destination, 
InterceptorPayload payload) {
-        LinkObject obj = new LinkObject(msg,destination,payload);
-        return queue.offer(obj);
+        final LinkObject obj = new LinkObject(msg,destination,payload);
+        Runnable r = new Runnable() {
+            public void run() {
+                sendAsyncData(obj);
+            }
+        };
+        executor.execute(r);
+        return true;
     }
 
     public LinkObject removeFromQueue() {
-        LinkObject head = null;
-        try {
-            head = (LinkObject)queue.take();
-        }catch ( InterruptedException x ) {}
-        return head;
+        return null; //not used, thread pool contains its own queue.
     }
 
     public void startQueue() {
-        msgDispatchThread = new Thread(this);
-        
msgDispatchThread.setName("MessageDispatch15Interceptor.MessageDispatchThread");
-        msgDispatchThread.setDaemon(true);
-        msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
+        if ( run ) return;
+        executor = new 
ThreadPoolExecutor(maxSpareThreads,maxThreads,keepAliveTime,TimeUnit.MILLISECONDS,runnablequeue);
         run = true;
-        msgDispatchThread.start();
     }
 
     public void stopQueue() {
         run = false;
-        msgDispatchThread.interrupt();
+        executor.shutdownNow();
         setAndGetCurrentSize(0);
+        runnablequeue.clear();
     }
 
+    public long getKeepAliveTime() {
+        return keepAliveTime;
+    }
+
+    public int getMaxSpareThreads() {
+        return maxSpareThreads;
+    }
+
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    public void setKeepAliveTime(long keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public void setMaxSpareThreads(int maxSpareThreads) {
+        this.maxSpareThreads = maxSpareThreads;
+    }
+
+    public void setMaxThreads(int maxThreads) {
+        this.maxThreads = maxThreads;
+    }
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=423244&r1=423243&r2=423244&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
 Tue Jul 18 14:04:00 2006
@@ -165,32 +165,37 @@
             LinkObject link = removeFromQueue();
             if ( link == null ) continue; //should not happen unless we exceed 
wait time
             while ( link != null && run ) {
-                ChannelMessage msg = link.data();
-                Member[] destination = link.getDestination();
-                try {
-                    super.sendMessage(destination,msg,null);
-                    try {
-                        if ( link.getHandler() != null ) 
link.getHandler().handleCompletion(new UniqueId(msg.getUniqueId())); 
-                    } catch ( Exception ex ) {
-                        log.error("Unable to report back completed 
message.",ex);
-                    }
-                } catch ( Exception x ) {
-                    ChannelException cx = null;
-                    if ( x instanceof ChannelException ) cx = 
(ChannelException)x;
-                    else cx = new ChannelException(x);
-                    if ( log.isDebugEnabled() ) log.debug("Error while 
processing async message.",x);
-                    try {
-                        if (link.getHandler() != null) 
link.getHandler().handleError(cx, new UniqueId(msg.getUniqueId()));
-                    } catch ( Exception ex ) {
-                        log.error("Unable to report back error message.",ex);
-                    }
-                } finally {
-                    addAndGetCurrentSize(-msg.getMessage().getLength());
-                    link = link.next();
-                }//try
+                link = sendAsyncData(link);
             }//while
         }//while
     }//run
+
+    protected LinkObject sendAsyncData(LinkObject link) {
+        ChannelMessage msg = link.data();
+        Member[] destination = link.getDestination();
+        try {
+            super.sendMessage(destination,msg,null);
+            try {
+                if ( link.getHandler() != null ) 
link.getHandler().handleCompletion(new UniqueId(msg.getUniqueId())); 
+            } catch ( Exception ex ) {
+                log.error("Unable to report back completed message.",ex);
+            }
+        } catch ( Exception x ) {
+            ChannelException cx = null;
+            if ( x instanceof ChannelException ) cx = (ChannelException)x;
+            else cx = new ChannelException(x);
+            if ( log.isDebugEnabled() ) log.debug("Error while processing 
async message.",x);
+            try {
+                if (link.getHandler() != null) 
link.getHandler().handleError(cx, new UniqueId(msg.getUniqueId()));
+            } catch ( Exception ex ) {
+                log.error("Unable to report back error message.",ex);
+            }
+        } finally {
+            addAndGetCurrentSize(-msg.getMessage().getLength());
+            link = link.next();
+        }//try
+        return link;
+    }
 
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java?rev=423244&r1=423243&r2=423244&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
 Tue Jul 18 14:04:00 2006
@@ -9,6 +9,7 @@
 import org.apache.catalina.tribes.group.GroupChannel;
 import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
 import 
org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+import 
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
 
 /**
  * <p>Title: </p> 
@@ -24,7 +25,7 @@
  */
 public class TestDataIntegrity extends TestCase {
     int msgCount = 1000;
-    int threadCount = 8;
+    int threadCount = 20;
     GroupChannel channel1;
     GroupChannel channel2;
     Listener listener1;
@@ -32,9 +33,9 @@
     protected void setUp() throws Exception {
         super.setUp();
         channel1 = new GroupChannel();
-        channel1.addInterceptor(new MessageDispatchInterceptor());
+        channel1.addInterceptor(new MessageDispatch15Interceptor());
         channel2 = new GroupChannel();
-        channel2.addInterceptor(new MessageDispatchInterceptor());
+        channel2.addInterceptor(new MessageDispatch15Interceptor());
         listener1 = new Listener();
         channel2.addChannelListener(listener1);
         channel1.start(GroupChannel.DEFAULT);
@@ -54,7 +55,7 @@
             threads[x] = new Thread() {
                 public void run() {
                     try {
-                        for (int i = 0; i < msgCount; i++) 
channel1.send(channel1.getMembers(), Data.createRandomData(),0);
+                        for (int i = 0; i < msgCount; i++) channel1.send(new 
Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),0);
                     }catch ( Exception x ) {
                         x.printStackTrace();
                         return;
@@ -68,11 +69,46 @@
         for (int x=0; x<threads.length; x++ ) { threads[x].join();}
         //sleep for 50 sec, let the other messages in
         long start = System.currentTimeMillis();
-        while ( (System.currentTimeMillis()-start)<50000 && 
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+        while ( (System.currentTimeMillis()-start)<15000 && 
msgCount*threadCount!=listener1.count) Thread.sleep(500);
         System.err.println("Finished NO_ACK");
         assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count);
     }
     
+    public void testDataSendASYNCM() throws Exception {
+            System.err.println("Starting ASYNC MULTI THREAD");
+            Thread[] threads = new Thread[threadCount];
+            for (int x=0; x<threads.length; x++ ) {
+                threads[x] = new Thread() {
+                    public void run() {
+                        try {
+                            for (int i = 0; i < msgCount; i++) 
channel1.send(new Member[] {channel2.getLocalMember(false)}, 
Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
+                        }catch ( Exception x ) {
+                            x.printStackTrace();
+                            return;
+                        } finally {
+                            threadCounter++;
+                        }
+                    }
+                };
+            }
+            for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+            for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+            //sleep for 50 sec, let the other messages in
+            long start = System.currentTimeMillis();
+            while ( (System.currentTimeMillis()-start)<15000 && 
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+            System.err.println("Finished ASYNC MULTI THREAD");
+            assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count);
+    }
+    public void testDataSendASYNC() throws Exception {
+        System.err.println("Starting ASYNC");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
+        //sleep for 50 sec, let the other messages in
+        long start = System.currentTimeMillis();
+        while ( (System.currentTimeMillis()-start)<5000 && 
msgCount!=listener1.count) Thread.sleep(500);
+        System.err.println("Finished ASYNC");
+        assertEquals("Checking success messages.",msgCount,listener1.count);
+    }
+
     public void testDataSendACK() throws Exception {
         System.err.println("Starting ACK");
         for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_USE_ACK);
@@ -83,7 +119,7 @@
 
     public void testDataSendSYNCACK() throws Exception {
         System.err.println("Starting SYNC_ACK");
-        for (int i=0; i<msgCount; i++) 
channel1.send(channel1.getMembers(),Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
         Thread.sleep(250);
         System.err.println("Finished SYNC_ACK");
         assertEquals("Checking success messages.",msgCount,listener1.count);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to