Author: fhanik Date: Wed Jul 5 16:10:36 2006 New Revision: 419379 URL: http://svn.apache.org/viewvc?rev=419379&view=rev Log: Truly non blocking, don't send up the stack until we have processed the message
Modified: tomcat/container/tc5.5.x/modules/groupcom/VERSION tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Modified: tomcat/container/tc5.5.x/modules/groupcom/VERSION URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/VERSION?rev=419379&r1=419378&r2=419379&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/VERSION (original) +++ tomcat/container/tc5.5.x/modules/groupcom/VERSION Wed Jul 5 16:10:36 2006 @@ -1,3 +1,6 @@ +0.9.4.7 + - release the socket key to the poller before sending the data requests up through the channel + this makes receiving data non blocking, even if the application is 0.9.4.6 - fix package processing, the old release was hogging a thread for a single connection, making concurrency not so efficient, this fix uses the thread for one package, then moves on. faster concurrency, and much less memory usage in high stress environments Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java?rev=419379&r1=419378&r2=419379&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java Wed Jul 5 16:10:36 2006 @@ -15,7 +15,7 @@ */ package org.apache.catalina.tribes; -import java.util.Arrays; +import org.apache.catalina.tribes.util.Arrays; /** * <p>Title: Represents a globabally unique Id</p> Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=419379&r1=419378&r2=419379&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Wed Jul 5 16:10:36 2006 @@ -35,6 +35,8 @@ * */ public class ChannelData implements ChannelMessage { + public static ChannelData[] EMPTY_DATA_ARRAY = new ChannelData[0]; + public static boolean USE_SECURE_RANDOM_FOR_UUID = false; /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=419379&r1=419378&r2=419379&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Wed Jul 5 16:10:36 2006 @@ -28,6 +28,7 @@ import org.apache.catalina.tribes.io.ChannelData; import org.apache.catalina.tribes.io.BufferPool; import java.nio.channels.CancelledKeyException; +import org.apache.catalina.tribes.UniqueId; /** * A worker thread class which can drain channels and echo-back the input. Each @@ -148,43 +149,61 @@ } int pkgcnt = reader.count(); - - if ( pkgcnt > 0 ) { - ChannelMessage[] msgs = reader.execute(); - for ( int i=0; i<msgs.length; i++ ) { + + if (count < 0 && pkgcnt == 0 ) { + //end of stream, and no more packages to process + remoteEof(key); + return; + } + + ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute(); + + registerForRead(key);//register to read new data, before we send it off to avoid dead locks + + for ( int i=0; i<msgs.length; i++ ) { + /** + * Use send ack here if you want to ack the request to the remote + * server before completing the request + * This is considered an asynchronized request + */ + if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); + try { + if ( log.isTraceEnabled() ) { + try { + log.trace("Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis())); + }catch ( Throwable t ) {} + } + //process the message + getCallback().messageDataReceived(msgs[i]); /** - * Use send ack here if you want to ack the request to the remote - * server before completing the request - * This is considered an asynchronized request + * Use send ack here if you want the request to complete on this + * server before sending the ack to the remote server + * This is considered a synchronized request */ - if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); - try { - //process the message - getCallback().messageDataReceived(msgs[i]); - /** - * Use send ack here if you want the request to complete on this - * server before sending the ack to the remote server - * This is considered a synchronized request - */ - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); - }catch ( Exception e ) { - log.error("Processing of cluster message failed.",e); - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); - } - if ( getUseBufferPool() ) { - BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); - msgs[i].setMessage(null); - } - } - } + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); + }catch ( Exception e ) { + log.error("Processing of cluster message failed.",e); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); + } + if ( getUseBufferPool() ) { + BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); + msgs[i].setMessage(null); + } + } if (count < 0) { - // close channel on EOF, invalidates the key - if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting"); - cancelKey(key); + remoteEof(key); return; } - + } + + private void remoteEof(SelectionKey key) { + // close channel on EOF, invalidates the key + if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting"); + cancelKey(key); + } + + protected void registerForRead(final SelectionKey key) { //register our OP_READ interest Runnable r = new Runnable() { public void run() { @@ -207,7 +226,6 @@ } }; receiver.addEvent(r); - } private void cancelKey(final SelectionKey key) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=419379&r1=419378&r2=419379&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Wed Jul 5 16:10:36 2006 @@ -107,6 +107,10 @@ return new UniqueId(data); } + public static boolean equals(byte[] o1, byte[] o2) { + return java.util.Arrays.equals(o1,o2); + } + public static boolean equals(Object[] o1, Object[] o2) { boolean result = o1.length == o2.length; if ( result ) for (int i=0; i<o1.length && result; i++ ) result = o1[i].equals(o2[i]); @@ -171,6 +175,19 @@ //System.out.println("Members:"+toNameString(members)); return idx; } + + public static int hashCode(byte a[]) { + if (a == null) + return 0; + + int result = 1; + for (int i=0; i<a.length; i++) { + byte element = a[i]; + result = 31 * result + element; + } + return result; + } + --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]