Author: fhanik
Date: Sat Jul 1 16:16:04 2006
New Revision: 418517
URL: http://svn.apache.org/viewvc?rev=418517&view=rev
Log: Fixed threading hand-off; it now works correctly.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java?rev=418517&r1=418516&r2=418517&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java Sat Jul 1 16:16:04 2006 @@ -62,11 +62,14 @@ } protected void setupThread(WorkerThread thread) { - thread.setPool(this); - thread.setName (thread.getClass().getName()+"[" + inc()+"]"); - thread.setDaemon(true); - thread.setPriority(Thread.MAX_PRIORITY); - thread.start(); + synchronized (thread) { + thread.setPool(this); + thread.setName(thread.getClass().getName() + "[" + inc() + "]"); + thread.setDaemon(true); + thread.setPriority(Thread.MAX_PRIORITY); + thread.start(); + try {thread.wait(500); }catch ( InterruptedException x ) {} + } } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java?rev=418517&r1=418516&r2=418517&view=diff ============================================================================== --- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReplicationThread.java Sat Jul 1 16:16:04 2006 @@ -58,6 +58,7 @@ // loop forever waiting for work to do public synchronized void run() { + this.notify(); while (isDoRun()) { try { // sleep and release object lock Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418517&r1=418516&r2=418517&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Sat Jul 1 16:16:04 2006 @@ -59,6 +59,7 @@ // loop forever waiting for work to do public synchronized void run() { + this.notify(); if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) { buffer = ByteBuffer.allocateDirect(getRxBufSize()); }else { @@ -77,8 +78,11 @@ if (key == null) { continue; // just in case } + ObjectReader reader = (ObjectReader)key.attachment(); try { - drainChannel (key); + reader.setLastAccess(System.currentTimeMillis()); + reader.access(); + drainChannel (key,reader); } catch (Exception e) { //this is common, since the sockets on the other //end expire after a certain time. @@ -86,13 +90,15 @@ //do nothing } else if ( e instanceof IOException ) { //dont spew out stack traces for IO exceptions unless debug is enabled. - if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. 
Probable cause: Keep alive socket closed.", e); - else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed."); + if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e); + else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"]."); } else if ( log.isErrorEnabled() ) { //this is a real error, log it. log.error("Exception caught in TcpReplicationThread.drainChannel.",e); } cancelKey(key); + } finally { + reader.finish(); } key = null; // done, ready for more, return to pool @@ -126,59 +132,51 @@ * re-enables OP_READ and calls wakeup() on the selector * so the selector will resume watching this channel. */ - protected void drainChannel (final SelectionKey key) throws Exception { + protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); int count; buffer.clear(); // make buffer empty - ObjectReader reader = (ObjectReader)key.attachment(); - reader.setLastAccess(System.currentTimeMillis()); - try { - reader.access(); - // loop while data available, channel is non-blocking - while ((count = channel.read (buffer)) > 0) { - buffer.flip(); // make buffer readable - if ( buffer.hasArray() ) - reader.append(buffer.array(),0,count,false); - else - reader.append(buffer,count,false); - buffer.clear(); // make buffer empty - } + // loop while data available, channel is non-blocking + while ((count = channel.read (buffer)) > 0) { + buffer.flip(); // make buffer readable + if ( buffer.hasArray() ) + reader.append(buffer.array(),0,count,false); + else + reader.append(buffer,count,false); + buffer.clear(); // make buffer empty + } - int pkgcnt = reader.count(); + int pkgcnt = reader.count(); - if ( pkgcnt > 0 ) { - ChannelMessage[] msgs = 
reader.execute(); - for ( int i=0; i<msgs.length; i++ ) { + if ( pkgcnt > 0 ) { + ChannelMessage[] msgs = reader.execute(); + for ( int i=0; i<msgs.length; i++ ) { + /** + * Use send ack here if you want to ack the request to the remote + * server before completing the request + * This is considered an asynchronized request + */ + if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); + try { + //process the message + getCallback().messageDataReceived(msgs[i]); /** - * Use send ack here if you want to ack the request to the remote - * server before completing the request - * This is considered an asynchronized request + * Use send ack here if you want the request to complete on this + * server before sending the ack to the remote server + * This is considered a synchronized request */ - if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); - try { - //process the message - getCallback().messageDataReceived(msgs[i]); - /** - * Use send ack here if you want the request to complete on this - * server before sending the ack to the remote server - * This is considered a synchronized request - */ - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); - }catch ( Exception e ) { - log.error("Processing of cluster message failed.",e); - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); - } - if ( getUseBufferPool() ) { - BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); - msgs[i].setMessage(null); - } - } - } - } finally { - reader.finish(); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); + }catch ( Exception e ) { + log.error("Processing of cluster message failed.",e); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); + } + if ( getUseBufferPool() ) { + 
BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); + msgs[i].setMessage(null); + } + } } - if (count < 0) { // close channel on EOF, invalidates the key --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]