Author: fhanik
Date: Sat Jul  1 16:16:04 2006
New Revision: 418517

URL: http://svn.apache.org/viewvc?rev=418517&view=rev
Log:
Fixed threading hand off, now works correctly

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java?rev=418517&r1=418516&r2=418517&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
 Sat Jul  1 16:16:04 2006
@@ -62,11 +62,14 @@
     }
     
     protected void setupThread(WorkerThread thread) {
-        thread.setPool(this);
-        thread.setName (thread.getClass().getName()+"[" + inc()+"]");
-        thread.setDaemon(true);
-        thread.setPriority(Thread.MAX_PRIORITY);
-        thread.start();
+        synchronized (thread) {
+            thread.setPool(this);
+            thread.setName(thread.getClass().getName() + "[" + inc() + "]");
+            thread.setDaemon(true);
+            thread.setPriority(Thread.MAX_PRIORITY);
+            thread.start();
+            try {thread.wait(500); }catch ( InterruptedException x ) {}
+        }
     }
 
     /**

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java?rev=418517&r1=418516&r2=418517&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java
 Sat Jul  1 16:16:04 2006
@@ -58,6 +58,7 @@
     // loop forever waiting for work to do
     public synchronized void run()
     {
+        this.notify();
         while (isDoRun()) {
             try {
                 // sleep and release object lock

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418517&r1=418516&r2=418517&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
 Sat Jul  1 16:16:04 2006
@@ -59,6 +59,7 @@
     // loop forever waiting for work to do
     public synchronized void run()
     {
+        this.notify();
         if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
             buffer = ByteBuffer.allocateDirect(getRxBufSize());
         }else {
@@ -77,8 +78,11 @@
             if (key == null) {
                 continue;      // just in case
             }
+            ObjectReader reader = (ObjectReader)key.attachment();
             try {
-                drainChannel (key);
+                reader.setLastAccess(System.currentTimeMillis());
+                reader.access();
+                drainChannel (key,reader);
             } catch (Exception e) {
                 //this is common, since the sockets on the other
                 //end expire after a certain time.
@@ -86,13 +90,15 @@
                     //do nothing
                 } else if ( e instanceof IOException ) {
                     //dont spew out stack traces for IO exceptions unless 
debug is enabled.
-                    if (log.isDebugEnabled()) log.debug ("IOException in 
replication worker, unable to drain channel. Probable cause: Keep alive socket 
closed.", e);
-                    else log.warn ("IOException in replication worker, unable 
to drain channel. Probable cause: Keep alive socket closed.");
+                    if (log.isDebugEnabled()) log.debug ("IOException in 
replication worker, unable to drain channel. Probable cause: Keep alive socket 
closed["+e.getMessage()+"].", e);
+                    else log.warn ("IOException in replication worker, unable 
to drain channel. Probable cause: Keep alive socket 
closed["+e.getMessage()+"].");
                 } else if ( log.isErrorEnabled() ) {
                     //this is a real error, log it.
                     log.error("Exception caught in 
TcpReplicationThread.drainChannel.",e);
                 } 
                 cancelKey(key);
+            } finally {
+                reader.finish();
             }
             key = null;
             // done, ready for more, return to pool
@@ -126,59 +132,51 @@
      * re-enables OP_READ and calls wakeup() on the selector
      * so the selector will resume watching this channel.
      */
-    protected void drainChannel (final SelectionKey key) throws Exception {
+    protected void drainChannel (final SelectionKey key, ObjectReader reader) 
throws Exception {
         SocketChannel channel = (SocketChannel) key.channel();
         int count;
         buffer.clear();                        // make buffer empty
-        ObjectReader reader = (ObjectReader)key.attachment();
-        reader.setLastAccess(System.currentTimeMillis());
-        try {
-            reader.access();
 
-            // loop while data available, channel is non-blocking
-            while ((count = channel.read (buffer)) > 0) {
-                buffer.flip();         // make buffer readable
-                if ( buffer.hasArray() ) 
-                    reader.append(buffer.array(),0,count,false);
-                else 
-                    reader.append(buffer,count,false);
-                buffer.clear();                // make buffer empty
-            }
+        // loop while data available, channel is non-blocking
+        while ((count = channel.read (buffer)) > 0) {
+            buffer.flip();             // make buffer readable
+            if ( buffer.hasArray() ) 
+                reader.append(buffer.array(),0,count,false);
+            else 
+                reader.append(buffer,count,false);
+            buffer.clear();            // make buffer empty
+        }
 
-            int pkgcnt = reader.count();
+        int pkgcnt = reader.count();
 
-            if ( pkgcnt > 0 ) {
-                ChannelMessage[] msgs = reader.execute();
-                for ( int i=0; i<msgs.length; i++ ) {
+        if ( pkgcnt > 0 ) {
+            ChannelMessage[] msgs = reader.execute();
+            for ( int i=0; i<msgs.length; i++ ) {
+                /**
+                 * Use send ack here if you want to ack the request to the 
remote 
+                 * server before completing the request
+                 * This is considered an asynchronized request
+                 */
+                if (ChannelData.sendAckAsync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.ACK_COMMAND);
+                try {
+                    //process the message
+                    getCallback().messageDataReceived(msgs[i]);
                     /**
-                     * Use send ack here if you want to ack the request to the 
remote 
-                     * server before completing the request
-                     * This is considered an asynchronized request
+                     * Use send ack here if you want the request to complete 
on this 
+                     * server before sending the ack to the remote server
+                     * This is considered a synchronized request
                      */
-                    if (ChannelData.sendAckAsync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.ACK_COMMAND);
-                    try {
-                        //process the message
-                        getCallback().messageDataReceived(msgs[i]);
-                        /**
-                         * Use send ack here if you want the request to 
complete on this 
-                         * server before sending the ack to the remote server
-                         * This is considered a synchronized request
-                         */
-                        if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.ACK_COMMAND);
-                    }catch ( Exception e ) {
-                        log.error("Processing of cluster message failed.",e);
-                        if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
-                    }
-                    if ( getUseBufferPool() ) {
-                        
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
-                        msgs[i].setMessage(null);
-                    }
-                }                        
-            }
-        } finally {
-            reader.finish();
+                    if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.ACK_COMMAND);
+                }catch ( Exception e ) {
+                    log.error("Processing of cluster message failed.",e);
+                    if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+                }
+                if ( getUseBufferPool() ) {
+                    
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+                    msgs[i].setMessage(null);
+                }
+            }                        
         }
-
         
         if (count < 0) {
             // close channel on EOF, invalidates the key



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to