Author: fhanik Date: Sat Jul 1 12:53:16 2006 New Revision: 418503 URL: http://svn.apache.org/viewvc?rev=418503&view=rev Log: Improved NioReceiver by almost 50% in performance, handles concurrency much better now
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=418503&r1=418502&r2=418503&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Sat Jul 1 12:53:16 2006 @@ -215,6 +215,30 @@ * @param b byte[] * @return ChannelData */ + public static ChannelData getDataFromPackage(XByteBuffer xbuf) { + ChannelData data = new ChannelData(false); + int offset = 0; + data.setOptions(XByteBuffer.toInt(xbuf.getBytesDirect(),offset)); + offset += 4; //options + data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(),offset)); + offset += 8; //timestamp + data.uniqueId = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)]; + offset += 4; //uniqueId length + System.arraycopy(xbuf.getBytesDirect(),offset,data.uniqueId,0,data.uniqueId.length); + offset += data.uniqueId.length; //uniqueId data + byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)]; + offset += 4; //addr length + System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length); + data.setAddress(MemberImpl.getMember(addr)); + offset += addr.length; //addr data + int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset); + System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize); + xbuf.setLength(xsize); + data.message = xbuf; + return data; + + } + public static ChannelData getDataFromPackage(byte[] b) { ChannelData data = new ChannelData(false); int offset = 0; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=418503&r1=418502&r2=418503&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Sat Jul 1 12:53:16 2006 @@ -113,6 +113,11 @@ public int getLength() { return bufSize; } + + public void setLength(int size) { + if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer."); + bufSize = size; + } public void trim(int length) { if ( (bufSize - length) < 0 ) @@ -307,24 +312,24 @@ * @param clearFromBuffer - if true, the package will be removed from the byte buffer * @return - returns the actual message bytes (header, compress,size and footer not included). */ - public byte[] extractDataPackage(boolean clearFromBuffer) { + public XByteBuffer extractDataPackage(boolean clearFromBuffer) { int psize = countPackages(true); if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer"); int size = toInt(buf, START_DATA.length); - byte[] data = new byte[size]; - System.arraycopy(buf, START_DATA.length + 4, data, 0, size); + XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false); + System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size); if (clearFromBuffer) { int totalsize = START_DATA.length + 4 + size + END_DATA.length; bufSize = bufSize - totalsize; System.arraycopy(buf, totalsize, buf, 0, bufSize); } - return data; + return xbuf; } public ChannelData extractPackage(boolean clearFromBuffer) throws java.io.IOException { - byte[] data = extractDataPackage(clearFromBuffer); - ChannelData cdata = ChannelData.getDataFromPackage(data); + XByteBuffer xbuf = extractDataPackage(clearFromBuffer); + ChannelData cdata = ChannelData.getDataFromPackage(xbuf); return cdata; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=418503&r1=418502&r2=418503&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java Sat Jul 1 12:53:16 2006 @@ -54,7 +54,7 @@ private boolean listen = false; private ThreadPool pool; private boolean direct = true; - private long tcpSelectorTimeout = 100; + private long tcpSelectorTimeout = 5000; //how many times to search for an available socket private int autoBind = 10; private int maxThreads = 25; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java?rev=418503&r1=418502&r2=418503&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java Sat Jul 1 12:53:16 2006 @@ -54,7 +54,7 @@ */ public void start() throws IOException { try { - setPool(new ThreadPool(new Object(),getMaxThreads(),getMinThreads(),this)); + setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this)); } catch (Exception x) { log.fatal("ThreadPool can initilzed. Listener not started", x); if ( x instanceof IOException ) throw (IOException)x; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java?rev=418503&r1=418502&r2=418503&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java Sat Jul 1 12:53:16 2006 @@ -275,7 +275,7 @@ byte d = (byte)i; ackbuf.append(d); if (ackbuf.doesPackageExist() ) { - byte[] ackcmd = ackbuf.extractDataPackage(true); + byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes(); ackReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA); failAckReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA); ackReceived = ackReceived || failAckReceived; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418503&r1=418502&r2=418503&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Sat Jul 1 12:53:16 2006 @@ -33,6 +33,7 @@ import org.apache.catalina.tribes.transport.ThreadPool; import org.apache.catalina.tribes.transport.WorkerThread; import org.apache.catalina.tribes.util.StringManager; +import java.util.LinkedList; /** * @author Filip Hanik @@ -55,8 +56,8 @@ private Selector selector = null; private ServerSocketChannel serverChannel = null; - - private Object interestOpsMutex = new Object(); + protected LinkedList events = new LinkedList(); +// private Object interestOpsMutex = new Object(); public NioReceiver() { } @@ -70,9 +71,9 @@ return (info); } - public Object getInterestOpsMutex() { - return interestOpsMutex; - } +// public Object getInterestOpsMutex() { +// return interestOpsMutex; +// } public void stop() { this.stopListening(); @@ -85,7 +86,8 @@ */ public void start() throws IOException { try { - setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); +// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); + setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this)); } catch (Exception x) { log.fatal("ThreadPool can initilzed. Listener not started", x); if ( x instanceof IOException ) throw (IOException)x; @@ -105,7 +107,7 @@ } public WorkerThread getWorkerThread() { - NioReplicationThread thread = new NioReplicationThread(this); + NioReplicationThread thread = new NioReplicationThread(this,this); thread.setUseBufferPool(this.getUseBufferPool()); thread.setRxBufSize(getRxBufSize()); thread.setOptions(getWorkerThreadOptions()); @@ -130,6 +132,31 @@ serverChannel.register(selector, SelectionKey.OP_ACCEPT); } + + public void addEvent(Runnable event) { + if ( selector != null ) { + synchronized (events) { + events.add(event); + } + selector.wakeup(); + } + } + + public void events() { + if ( events.size() == 0 ) return; + synchronized (events) { + Runnable r = null; + while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) { + try { + r.run(); + } catch ( Exception x ) { + log.error("",x); + } + } + events.clear(); + } + } + /** * get data from channel and store in byte array * send it to cluster @@ -148,7 +175,7 @@ // this may block for a long time, upon return the // selected set contains keys of the ready channels try { - + events(); int n = selector.select(getTcpSelectorTimeout()); if (n == 0) { //there is a good chance that we got here @@ -156,11 +183,11 @@ //selector wakeup(). //if that happens, we must ensure that that //thread has enough time to call interestOps - synchronized (interestOpsMutex) { +// synchronized (interestOpsMutex) { //if we got the lock, means there are no //keys trying to register for the //interestOps method - } +// } continue; // nothing to do } // get an iterator over the set of selected keys Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418503&r1=418502&r2=418503&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Sat Jul 1 12:53:16 2006 @@ -49,9 +49,11 @@ private ByteBuffer buffer = null; private SelectionKey key; private int rxBufSize; - public NioReplicationThread (ListenCallback callback) + private NioReceiver receiver; + public NioReplicationThread (ListenCallback callback, NioReceiver receiver) { super(callback); + this.receiver = receiver; } // loop forever waiting for work to do @@ -131,7 +133,7 @@ * re-enables OP_READ and calls wakeup() on the selector * so the selector will resume watching this channel. */ - protected void drainChannel (SelectionKey key) throws Exception { + protected void drainChannel (final SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); int count; buffer.clear(); // make buffer empty @@ -188,21 +190,25 @@ } //acquire the interestOps mutex - Object mutex = this.getPool().getInterestOpsMutex(); - synchronized (mutex) { - try { - if ( key.isValid() ) { - // cycle the selector so this key is active again - key.selector().wakeup(); - // resume interest in OP_READ, OP_WRITE - int resumeOps = key.interestOps() | SelectionKey.OP_READ; - key.interestOps(resumeOps); + Runnable r = new Runnable() { + public void run() { + try { + if (key.isValid()) { + // cycle the selector so this key is active again + key.selector().wakeup(); + // resume interest in OP_READ, OP_WRITE + int resumeOps = key.interestOps() | SelectionKey.OP_READ; + key.interestOps(resumeOps); + } + } catch (Exception x) { + try { + key.selector().close(); + } catch (Exception ignore) {} + log.error("Unable to cycle the selector, connection disconnected?", x); } - }catch ( Exception x ) { - try {key.selector().close();}catch ( Exception ignore){} - log.error("Unable to cycle the selector, connection disconnected?",x); } - } + }; + receiver.addEvent(r); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=418503&r1=418502&r2=418503&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Sat Jul 1 12:53:16 2006 @@ -156,7 +156,7 @@ ackbuf.append(readbuf,read); readbuf.clear(); if (ackbuf.doesPackageExist() ) { - byte[] ackcmd = ackbuf.extractDataPackage(true); + byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes(); boolean ack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA); boolean fack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA); if ( fack && getThrowOnFailedAck() ) throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA"); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]