Author: fhanik Date: Sat Jul 1 15:52:11 2006 New Revision: 418516 URL: http://svn.apache.org/viewvc?rev=418516&view=rev Log: Major improvements, there seems to be an error with the thread handling on the NIOReceiver and the hand off for the worker thread
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Sat Jul 1 15:52:11 2006 @@ -232,6 +232,7 @@ data.setAddress(MemberImpl.getMember(addr)); offset += addr.length; //addr data int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset); + offset += 4; //xsize length System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize); xbuf.setLength(xsize); data.message = xbuf; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Sat Jul 1 15:52:11 2006 @@ -41,6 +41,8 @@ private XByteBuffer buffer; protected long lastAccess = System.currentTimeMillis(); + + protected boolean accessed = false; /** * Creates an <code>ObjectReader</code> for a TCP NIO socket channel @@ -62,6 +64,18 @@ log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes."); this.buffer = new XByteBuffer(43800,true); } + } + + public synchronized void access() { + this.accessed = true; + } + + public synchronized void finish() { + this.accessed = false; + } + + public boolean isAccessed() { + return this.accessed; } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Sat Jul 1 15:52:11 2006 @@ -317,6 +317,7 @@ if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer"); int size = toInt(buf, START_DATA.length); XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false); + xbuf.setLength(size); System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size); if (clearFromBuffer) { int totalsize = START_DATA.length + 4 + size + END_DATA.length; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Sat Jul 1 15:52:11 2006 @@ -1,321 +1,377 @@ -/* - * Copyright 1999,2004-2005 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.catalina.tribes.transport.nio; - -import java.io.IOException; -import java.net.ServerSocket; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.Iterator; - -import org.apache.catalina.tribes.ChannelReceiver; -import org.apache.catalina.tribes.io.ListenCallback; -import org.apache.catalina.tribes.io.ObjectReader; -import org.apache.catalina.tribes.transport.Constants; -import org.apache.catalina.tribes.transport.ReceiverBase; -import org.apache.catalina.tribes.transport.ThreadPool; -import org.apache.catalina.tribes.transport.WorkerThread; -import org.apache.catalina.tribes.util.StringManager; -import java.util.LinkedList; - -/** - * @author Filip Hanik - * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $ - */ -public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback { - - protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class); - - /** - * The string manager for this package. - */ - protected StringManager sm = StringManager.getManager(Constants.Package); - - /** - * The descriptive information about this implementation. - */ - private static final String info = "NioReceiver/1.0"; - - private Selector selector = null; - private ServerSocketChannel serverChannel = null; - - protected LinkedList events = new LinkedList(); -// private Object interestOpsMutex = new Object(); - - public NioReceiver() { - } - - /** - * Return descriptive information about this implementation and the - * corresponding version number, in the format - * <code><description>/<version></code>. - */ - public String getInfo() { - return (info); - } - -// public Object getInterestOpsMutex() { -// return interestOpsMutex; -// } - - public void stop() { - this.stopListening(); - } - - /** - * start cluster receiver - * @throws Exception - * @see org.apache.catalina.tribes.ClusterReceiver#start() - */ - public void start() throws IOException { - try { -// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); - setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this)); - } catch (Exception x) { - log.fatal("ThreadPool can initilzed. Listener not started", x); - if ( x instanceof IOException ) throw (IOException)x; - else throw new IOException(x.getMessage()); - } - try { - getBind(); - bind(); - Thread t = new Thread(this, "NioReceiver"); - t.setDaemon(true); - t.start(); - } catch (Exception x) { - log.fatal("Unable to start cluster receiver", x); - if ( x instanceof IOException ) throw (IOException)x; - else throw new IOException(x.getMessage()); - } - } - - public WorkerThread getWorkerThread() { - NioReplicationThread thread = new NioReplicationThread(this,this); - thread.setUseBufferPool(this.getUseBufferPool()); - thread.setRxBufSize(getRxBufSize()); - thread.setOptions(getWorkerThreadOptions()); - return thread; - } - - - - protected void bind() throws IOException { - // allocate an unbound server socket channel - serverChannel = ServerSocketChannel.open(); - // Get the associated ServerSocket to bind it with - ServerSocket serverSocket = serverChannel.socket(); - // create a new Selector for use below - selector = Selector.open(); - // set the port the server channel will listen to - //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); - bind(serverSocket,getTcpListenPort(),getAutoBind()); - // set non-blocking mode for the listening socket - serverChannel.configureBlocking(false); - // register the ServerSocketChannel with the Selector - serverChannel.register(selector, SelectionKey.OP_ACCEPT); - - } - - public void addEvent(Runnable event) { - if ( selector != null ) { - synchronized (events) { - events.add(event); - } - selector.wakeup(); - } - } - - public void events() { - if ( events.size() == 0 ) return; - synchronized (events) { - Runnable r = null; - while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) { - try { - r.run(); - } catch ( Exception x ) { - log.error("",x); - } - } - events.clear(); - } - } - - /** - * get data from channel and store in byte array - * send it to cluster - * @throws IOException - * @throws java.nio.channels.ClosedChannelException - */ - protected void listen() throws Exception { - if (doListen()) { - log.warn("ServerSocketChannel already started"); - return; - } - - setListen(true); - - while (doListen() && selector != null) { - // this may block for a long time, upon return the - // selected set contains keys of the ready channels - try { - events(); - int n = selector.select(getTcpSelectorTimeout()); - if (n == 0) { - //there is a good chance that we got here - //because the TcpReplicationThread called - //selector wakeup(). - //if that happens, we must ensure that that - //thread has enough time to call interestOps -// synchronized (interestOpsMutex) { - //if we got the lock, means there are no - //keys trying to register for the - //interestOps method -// } - continue; // nothing to do - } - // get an iterator over the set of selected keys - Iterator it = selector.selectedKeys().iterator(); - // look at each key in the selected set - while (it.hasNext()) { - SelectionKey key = (SelectionKey) it.next(); - // Is a new connection coming in? - if (key.isAcceptable()) { - ServerSocketChannel server = (ServerSocketChannel) key.channel(); - SocketChannel channel = server.accept(); - channel.socket().setReceiveBufferSize(getRxBufSize()); - channel.socket().setSendBufferSize(getTxBufSize()); - channel.socket().setTcpNoDelay(getTcpNoDelay()); - channel.socket().setKeepAlive(getSoKeepAlive()); - channel.socket().setOOBInline(getOoBInline()); - channel.socket().setReuseAddress(getSoReuseAddress()); - channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); - channel.socket().setTrafficClass(getSoTrafficClass()); - channel.socket().setSoTimeout(getTimeout()); - Object attach = new ObjectReader(channel); - registerChannel(selector, - channel, - SelectionKey.OP_READ, - attach); - } - // is there data to read on this channel? - if (key.isReadable()) { - readDataFromSocket(key); - } else { - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - } - - // remove key from selected set, it's been handled - it.remove(); - } - } catch (java.nio.channels.ClosedSelectorException cse) { - // ignore is normal at shutdown or stop listen socket - } catch (java.nio.channels.CancelledKeyException nx) { - log.warn("Replication client disconnected, error when polling key. Ignoring client."); - } catch (Throwable x) { - try { - log.error("Unable to process request in NioReceiver", x); - }catch ( Throwable tx ) { - tx.printStackTrace(); - } - } - - } - serverChannel.close(); - if (selector != null) - selector.close(); - } - - /** - * Close Selector. - * - * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening() - */ - protected void stopListening() { - // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529 - setListen(false); - if (selector != null) { - try { - for (int i = 0; i < getMaxThreads(); i++) { - selector.wakeup(); - } - selector.close(); - } catch (Exception x) { - log.error("Unable to close cluster receiver selector.", x); - } finally { - selector = null; - } - } - } - - // ---------------------------------------------------------- - - /** - * Register the given channel with the given selector for - * the given operations of interest - */ - protected void registerChannel(Selector selector, - SelectableChannel channel, - int ops, - Object attach) throws Exception { - if (channel == null)return; // could happen - // set the new channel non-blocking - channel.configureBlocking(false); - // register it with the selector - channel.register(selector, ops, attach); - } - - /** - * Start thread and listen - */ - public void run() { - try { - listen(); - } catch (Exception x) { - log.error("Unable to run replication listener.", x); - } - } - - // ---------------------------------------------------------- - - /** - * Sample data handler method for a channel with data ready to read. - * @param key A SelectionKey object associated with a channel - * determined by the selector to be ready for reading. If the - * channel returns an EOF condition, it is closed here, which - * automatically invalidates the associated key. The selector - * will then de-register the channel on the next select call. - */ - protected void readDataFromSocket(SelectionKey key) throws Exception { - NioReplicationThread worker = (NioReplicationThread) getPool().getWorker(); - if (worker == null) { - // No threads available, do nothing, the selection - // loop will keep calling this method until a - // thread becomes available. - // FIXME: This design could be improved. - if (log.isDebugEnabled()) - log.debug("No TcpReplicationThread available"); - } else { - // invoking this wakes up the worker thread then returns - worker.serviceChannel(key); - } - } - - -} +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.transport.nio; + +import java.io.IOException; +import java.net.ServerSocket; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; + +import org.apache.catalina.tribes.ChannelReceiver; +import org.apache.catalina.tribes.io.ListenCallback; +import org.apache.catalina.tribes.io.ObjectReader; +import org.apache.catalina.tribes.transport.Constants; +import org.apache.catalina.tribes.transport.ReceiverBase; +import org.apache.catalina.tribes.transport.ThreadPool; +import org.apache.catalina.tribes.transport.WorkerThread; +import org.apache.catalina.tribes.util.StringManager; +import java.util.LinkedList; +import java.util.Set; +import java.nio.channels.CancelledKeyException; + +/** + * @author Filip Hanik + * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $ + */ +public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback { + + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class); + + /** + * The string manager for this package. + */ + protected StringManager sm = StringManager.getManager(Constants.Package); + + /** + * The descriptive information about this implementation. + */ + private static final String info = "NioReceiver/1.0"; + + private Selector selector = null; + private ServerSocketChannel serverChannel = null; + + protected LinkedList events = new LinkedList(); +// private Object interestOpsMutex = new Object(); + + public NioReceiver() { + } + + /** + * Return descriptive information about this implementation and the + * corresponding version number, in the format + * <code><description>/<version></code>. + */ + public String getInfo() { + return (info); + } + +// public Object getInterestOpsMutex() { +// return interestOpsMutex; +// } + + public void stop() { + this.stopListening(); + } + + /** + * start cluster receiver + * @throws Exception + * @see org.apache.catalina.tribes.ClusterReceiver#start() + */ + public void start() throws IOException { + try { +// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); + setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this)); + } catch (Exception x) { + log.fatal("ThreadPool can initilzed. Listener not started", x); + if ( x instanceof IOException ) throw (IOException)x; + else throw new IOException(x.getMessage()); + } + try { + getBind(); + bind(); + Thread t = new Thread(this, "NioReceiver"); + t.setDaemon(true); + t.start(); + } catch (Exception x) { + log.fatal("Unable to start cluster receiver", x); + if ( x instanceof IOException ) throw (IOException)x; + else throw new IOException(x.getMessage()); + } + } + + public WorkerThread getWorkerThread() { + NioReplicationThread thread = new NioReplicationThread(this,this); + thread.setUseBufferPool(this.getUseBufferPool()); + thread.setRxBufSize(getRxBufSize()); + thread.setOptions(getWorkerThreadOptions()); + return thread; + } + + + + protected void bind() throws IOException { + // allocate an unbound server socket channel + serverChannel = ServerSocketChannel.open(); + // Get the associated ServerSocket to bind it with + ServerSocket serverSocket = serverChannel.socket(); + // create a new Selector for use below + selector = Selector.open(); + // set the port the server channel will listen to + //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); + bind(serverSocket,getTcpListenPort(),getAutoBind()); + // set non-blocking mode for the listening socket + serverChannel.configureBlocking(false); + // register the ServerSocketChannel with the Selector + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + + } + + public void addEvent(Runnable event) { + if ( selector != null ) { + synchronized (events) { + events.add(event); + } + selector.wakeup(); + } + } + + public void events() { + if ( events.size() == 0 ) return; + synchronized (events) { + Runnable r = null; + while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) { + try { + r.run(); + } catch ( Exception x ) { + log.error("",x); + } + } + events.clear(); + } + } + + public static void cancelledKey(SelectionKey key) { + try { + ObjectReader ka = (ObjectReader)key.attachment(); + key.cancel(); + key.channel().close(); + key.attach(null); + if ( ka != null ) ka.finish(); + } catch (IOException e) { + if (log.isDebugEnabled()) log.debug("", e); + // Ignore + } + } + + protected void socketTimeouts() { + //timeout + Set keys = selector.keys(); + long now = System.currentTimeMillis(); + for (Iterator iter = keys.iterator(); iter.hasNext(); ) { + SelectionKey key = (SelectionKey) iter.next(); + try { +// if (key.interestOps() == SelectionKey.OP_READ) { +// //only timeout sockets that we are waiting for a read from +// ObjectReader ka = (ObjectReader) key.attachment(); +// long delta = now - ka.getLastAccess(); +// if (delta > (long) getTimeout()) { +// cancelledKey(key); +// } +// } +// else + if ( key.interestOps() == 0 ) { + //check for keys that didn't make it in. + ObjectReader ka = (ObjectReader) key.attachment(); + if ( ka != null ) { + long delta = now - ka.getLastAccess(); + if (delta > (long) getTimeout() && (!ka.isAccessed())) { + log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms."); + ka.setLastAccess(now); + key.interestOps(SelectionKey.OP_READ); + }//end if + } else { + cancelledKey(key); + }//end if + }//end if + }catch ( CancelledKeyException ckx ) { + cancelledKey(key); + } + } + } + + + /** + * get data from channel and store in byte array + * send it to cluster + * @throws IOException + * @throws java.nio.channels.ClosedChannelException + */ + protected void listen() throws Exception { + if (doListen()) { + log.warn("ServerSocketChannel already started"); + return; + } + + setListen(true); + + while (doListen() && selector != null) { + // this may block for a long time, upon return the + // selected set contains keys of the ready channels + try { + events(); + socketTimeouts(); + int n = selector.select(getTcpSelectorTimeout()); + if (n == 0) { + //there is a good chance that we got here + //because the TcpReplicationThread called + //selector wakeup(). + //if that happens, we must ensure that that + //thread has enough time to call interestOps +// synchronized (interestOpsMutex) { + //if we got the lock, means there are no + //keys trying to register for the + //interestOps method +// } + continue; // nothing to do + } + // get an iterator over the set of selected keys + Iterator it = selector.selectedKeys().iterator(); + // look at each key in the selected set + while (it.hasNext()) { + SelectionKey key = (SelectionKey) it.next(); + // Is a new connection coming in? + if (key.isAcceptable()) { + ServerSocketChannel server = (ServerSocketChannel) key.channel(); + SocketChannel channel = server.accept(); + channel.socket().setReceiveBufferSize(getRxBufSize()); + channel.socket().setSendBufferSize(getTxBufSize()); + channel.socket().setTcpNoDelay(getTcpNoDelay()); + channel.socket().setKeepAlive(getSoKeepAlive()); + channel.socket().setOOBInline(getOoBInline()); + channel.socket().setReuseAddress(getSoReuseAddress()); + channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); + channel.socket().setTrafficClass(getSoTrafficClass()); + channel.socket().setSoTimeout(getTimeout()); + Object attach = new ObjectReader(channel); + registerChannel(selector, + channel, + SelectionKey.OP_READ, + attach); + } + // is there data to read on this channel? + if (key.isReadable()) { + readDataFromSocket(key); + } else { + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + } + + // remove key from selected set, it's been handled + it.remove(); + } + } catch (java.nio.channels.ClosedSelectorException cse) { + // ignore is normal at shutdown or stop listen socket + } catch (java.nio.channels.CancelledKeyException nx) { + log.warn("Replication client disconnected, error when polling key. Ignoring client."); + } catch (Throwable x) { + try { + log.error("Unable to process request in NioReceiver", x); + }catch ( Throwable tx ) { + //in case an out of memory error, will affect the logging framework as well + tx.printStackTrace(); + } + } + + } + serverChannel.close(); + if (selector != null) + selector.close(); + } + + + + /** + * Close Selector. + * + * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening() + */ + protected void stopListening() { + // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529 + setListen(false); + if (selector != null) { + try { + for (int i = 0; i < getMaxThreads(); i++) { + selector.wakeup(); + } + selector.close(); + } catch (Exception x) { + log.error("Unable to close cluster receiver selector.", x); + } finally { + selector = null; + } + } + } + + // ---------------------------------------------------------- + + /** + * Register the given channel with the given selector for + * the given operations of interest + */ + protected void registerChannel(Selector selector, + SelectableChannel channel, + int ops, + Object attach) throws Exception { + if (channel == null)return; // could happen + // set the new channel non-blocking + channel.configureBlocking(false); + // register it with the selector + channel.register(selector, ops, attach); + } + + /** + * Start thread and listen + */ + public void run() { + try { + listen(); + } catch (Exception x) { + log.error("Unable to run replication listener.", x); + } + } + + // ---------------------------------------------------------- + + /** + * Sample data handler method for a channel with data ready to read. + * @param key A SelectionKey object associated with a channel + * determined by the selector to be ready for reading. If the + * channel returns an EOF condition, it is closed here, which + * automatically invalidates the associated key. The selector + * will then de-register the channel on the next select call. + */ + protected void readDataFromSocket(SelectionKey key) throws Exception { + NioReplicationThread worker = (NioReplicationThread) getPool().getWorker(); + if (worker == null) { + // No threads available, do nothing, the selection + // loop will keep calling this method until a + // thread becomes available. + // FIXME: This design could be improved. + if (log.isDebugEnabled()) + log.debug("No TcpReplicationThread available"); + } else { + // invoking this wakes up the worker thread then returns + worker.serviceChannel(key); + } + } + + +} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418516&r1=418515&r2=418516&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Sat Jul 1 15:52:11 2006 @@ -1,244 +1,256 @@ -/* - * Copyright 1999,2004 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.catalina.tribes.transport.nio; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; - -import org.apache.catalina.tribes.io.ObjectReader; -import org.apache.catalina.tribes.transport.Constants; -import org.apache.catalina.tribes.transport.WorkerThread; -import org.apache.catalina.tribes.ChannelMessage; -import org.apache.catalina.tribes.io.ListenCallback; -import org.apache.catalina.tribes.io.ChannelData; -import org.apache.catalina.tribes.io.BufferPool; -import java.nio.channels.CancelledKeyException; - -/** - * A worker thread class which can drain channels and echo-back the input. Each - * instance is constructed with a reference to the owning thread pool object. - * When started, the thread loops forever waiting to be awakened to service the - * channel associated with a SelectionKey object. The worker is tasked by - * calling its serviceChannel() method with a SelectionKey object. The - * serviceChannel() method stores the key reference in the thread object then - * calls notify() to wake it up. When the channel has been drained, the worker - * thread returns itself to its parent pool. - * - * @author Filip Hanik - * - * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $ - */ -public class NioReplicationThread extends WorkerThread { - - private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( NioReplicationThread.class ); - private ByteBuffer buffer = null; - private SelectionKey key; - private int rxBufSize; - private NioReceiver receiver; - public NioReplicationThread (ListenCallback callback, NioReceiver receiver) - { - super(callback); - this.receiver = receiver; - } - - // loop forever waiting for work to do - public synchronized void run() - { - if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) { - buffer = ByteBuffer.allocateDirect(getRxBufSize()); - }else { - buffer = ByteBuffer.allocate (getRxBufSize()); - } - while (isDoRun()) { - try { - // sleep and release object lock - this.wait(); - } catch (InterruptedException e) { - if(log.isInfoEnabled()) - log.info("TCP worker thread interrupted in cluster",e); - // clear interrupt status - Thread.interrupted(); - } - if (key == null) { - continue; // just in case - } - try { - drainChannel (key); - } catch (Exception e) { - //this is common, since the sockets on the other - //end expire after a certain time. - if ( e instanceof CancelledKeyException ) { - //do nothing - } else if ( e instanceof IOException ) { - //dont spew out stack traces for IO exceptions unless debug is enabled. - if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e); - else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed."); - } else if ( log.isErrorEnabled() ) { - //this is a real error, log it. - log.error("Exception caught in TcpReplicationThread.drainChannel.",e); - } - - // close channel and nudge selector - try { - key.channel().close(); - } catch (IOException ex) { - log.error("Unable to close channel.",ex); - } - key.selector().wakeup(); - } - key = null; - // done, ready for more, return to pool - getPool().returnWorker (this); - } - } - - /** - * Called to initiate a unit of work by this worker thread - * on the provided SelectionKey object. This method is - * synchronized, as is the run() method, so only one key - * can be serviced at a given time. - * Before waking the worker thread, and before returning - * to the main selection loop, this key's interest set is - * updated to remove OP_READ. This will cause the selector - * to ignore read-readiness for this channel while the - * worker thread is servicing it. - */ - public synchronized void serviceChannel (SelectionKey key) { - this.key = key; - key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); - key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); - this.notify(); // awaken the thread - } - - /** - * The actual code which drains the channel associated with - * the given key. This method assumes the key has been - * modified prior to invocation to turn off selection - * interest in OP_READ. When this method completes it - * re-enables OP_READ and calls wakeup() on the selector - * so the selector will resume watching this channel. - */ - protected void drainChannel (final SelectionKey key) throws Exception { - SocketChannel channel = (SocketChannel) key.channel(); - int count; - buffer.clear(); // make buffer empty - ObjectReader reader = (ObjectReader)key.attachment(); - reader.setLastAccess(System.currentTimeMillis()); - // loop while data available, channel is non-blocking - while ((count = channel.read (buffer)) > 0) { - buffer.flip(); // make buffer readable - if ( buffer.hasArray() ) - reader.append(buffer.array(),0,count,false); - else - reader.append(buffer,count,false); - buffer.clear(); // make buffer empty - } - - int pkgcnt = reader.count(); - - if ( pkgcnt > 0 ) { - ChannelMessage[] msgs = reader.execute(); - for ( int i=0; i<msgs.length; i++ ) { - /** - * Use send ack here if you want to ack the request to the remote - * server before completing the request - * This is considered an asynchronized request - */ - if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); - try { - //process the message - getCallback().messageDataReceived(msgs[i]); - /** - * Use send ack here if you want the request to complete on this - * server before sending the ack to the remote server - * This is considered a synchronized request - */ - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); - }catch ( Exception e ) { - log.error("Processing of cluster message failed.",e); - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); - } - if ( getUseBufferPool() ) { - BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); - msgs[i].setMessage(null); - } - } - } - - - - - if (count < 0) { - // close channel on EOF, invalidates the key - if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting"); - channel.close(); - return; - } - - //acquire the interestOps mutex - Runnable r = new Runnable() { - public void run() { - try { - if (key.isValid()) { - // cycle the selector so this key is active again - key.selector().wakeup(); - // resume interest in OP_READ, OP_WRITE - int resumeOps = key.interestOps() | SelectionKey.OP_READ; - key.interestOps(resumeOps); - } - } catch (Exception x) { - try { - key.selector().close(); - } catch (Exception ignore) {} - log.error("Unable to cycle the selector, connection disconnected?", x); - } - } - }; - receiver.addEvent(r); - - } - - - - - - /** - * send a reply-acknowledgement (6,2,3) - * @param key - * @param channel - */ - protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) { - - try { - channel.write(ByteBuffer.wrap(command)); - if (log.isTraceEnabled()) { - log.trace("ACK sent to " + channel.socket().getPort()); - } - } catch ( java.io.IOException x ) { - log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage()); - } - } - - public void setRxBufSize(int rxBufSize) { - this.rxBufSize = rxBufSize; - } - - public int getRxBufSize() { - return rxBufSize; - } -} +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.transport.nio; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import org.apache.catalina.tribes.io.ObjectReader; +import org.apache.catalina.tribes.transport.Constants; +import org.apache.catalina.tribes.transport.WorkerThread; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.io.ListenCallback; +import org.apache.catalina.tribes.io.ChannelData; +import org.apache.catalina.tribes.io.BufferPool; +import java.nio.channels.CancelledKeyException; + +/** + * A worker thread class which can drain channels and echo-back the input. Each + * instance is constructed with a reference to the owning thread pool object. + * When started, the thread loops forever waiting to be awakened to service the + * channel associated with a SelectionKey object. The worker is tasked by + * calling its serviceChannel() method with a SelectionKey object. The + * serviceChannel() method stores the key reference in the thread object then + * calls notify() to wake it up. When the channel has been drained, the worker + * thread returns itself to its parent pool. + * + * @author Filip Hanik + * + * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $ + */ +public class NioReplicationThread extends WorkerThread { + + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( NioReplicationThread.class ); + private ByteBuffer buffer = null; + private SelectionKey key; + private int rxBufSize; + private NioReceiver receiver; + public NioReplicationThread (ListenCallback callback, NioReceiver receiver) + { + super(callback); + this.receiver = receiver; + } + + // loop forever waiting for work to do + public synchronized void run() + { + if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) { + buffer = ByteBuffer.allocateDirect(getRxBufSize()); + }else { + buffer = ByteBuffer.allocate (getRxBufSize()); + } + while (isDoRun()) { + try { + // sleep and release object lock + this.wait(); + } catch (InterruptedException e) { + if(log.isInfoEnabled()) + log.info("TCP worker thread interrupted in cluster",e); + // clear interrupt status + Thread.interrupted(); + } + if (key == null) { + continue; // just in case + } + try { + drainChannel (key); + } catch (Exception e) { + //this is common, since the sockets on the other + //end expire after a certain time. + if ( e instanceof CancelledKeyException ) { + //do nothing + } else if ( e instanceof IOException ) { + //dont spew out stack traces for IO exceptions unless debug is enabled. + if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e); + else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed."); + } else if ( log.isErrorEnabled() ) { + //this is a real error, log it. + log.error("Exception caught in TcpReplicationThread.drainChannel.",e); + } + cancelKey(key); + } + key = null; + // done, ready for more, return to pool + getPool().returnWorker (this); + } + } + + /** + * Called to initiate a unit of work by this worker thread + * on the provided SelectionKey object. This method is + * synchronized, as is the run() method, so only one key + * can be serviced at a given time. + * Before waking the worker thread, and before returning + * to the main selection loop, this key's interest set is + * updated to remove OP_READ. This will cause the selector + * to ignore read-readiness for this channel while the + * worker thread is servicing it. + */ + public synchronized void serviceChannel (SelectionKey key) { + this.key = key; + key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); + key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); + this.notify(); // awaken the thread + } + + /** + * The actual code which drains the channel associated with + * the given key. This method assumes the key has been + * modified prior to invocation to turn off selection + * interest in OP_READ. When this method completes it + * re-enables OP_READ and calls wakeup() on the selector + * so the selector will resume watching this channel. + */ + protected void drainChannel (final SelectionKey key) throws Exception { + SocketChannel channel = (SocketChannel) key.channel(); + int count; + buffer.clear(); // make buffer empty + ObjectReader reader = (ObjectReader)key.attachment(); + reader.setLastAccess(System.currentTimeMillis()); + try { + reader.access(); + + // loop while data available, channel is non-blocking + while ((count = channel.read (buffer)) > 0) { + buffer.flip(); // make buffer readable + if ( buffer.hasArray() ) + reader.append(buffer.array(),0,count,false); + else + reader.append(buffer,count,false); + buffer.clear(); // make buffer empty + } + + int pkgcnt = reader.count(); + + if ( pkgcnt > 0 ) { + ChannelMessage[] msgs = reader.execute(); + for ( int i=0; i<msgs.length; i++ ) { + /** + * Use send ack here if you want to ack the request to the remote + * server before completing the request + * This is considered an asynchronized request + */ + if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); + try { + //process the message + getCallback().messageDataReceived(msgs[i]); + /** + * Use send ack here if you want the request to complete on this + * server before sending the ack to the remote server + * This is considered a synchronized request + */ + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); + }catch ( Exception e ) { + log.error("Processing of cluster message failed.",e); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); + } + if ( getUseBufferPool() ) { + BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); + msgs[i].setMessage(null); + } + } + } + } finally { + reader.finish(); + } + + + if (count < 0) { + // close channel on EOF, invalidates the key + if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting"); + cancelKey(key); + return; + } + + //register our OP_READ interest + Runnable r = new Runnable() { + public void run() { + try { + if (key.isValid()) { + // cycle the selector so this key is active again + key.selector().wakeup(); + // resume interest in OP_READ, OP_WRITE + int resumeOps = key.interestOps() | SelectionKey.OP_READ; + key.interestOps(resumeOps); + } + } catch (CancelledKeyException ckx ) { + NioReceiver.cancelledKey(key); + } catch (Exception x) { + try { + key.selector().close(); + } catch (Exception ignore) {} + log.error("Unable to cycle the selector, connection disconnected?", x); + } + } + }; + receiver.addEvent(r); + + } + + private void cancelKey(final SelectionKey key) { + Runnable cx = new Runnable() { + public void run() { + NioReceiver.cancelledKey(key); + } + }; + receiver.addEvent(cx); + } + + + + + + /** + * send a reply-acknowledgement (6,2,3) + * @param key + * @param channel + */ + protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) { + + try { + ByteBuffer buf = ByteBuffer.wrap(command); + int total = 0; + while ( total < command.length ) { + total += channel.write(buf); + } + if (log.isTraceEnabled()) { + log.trace("ACK sent to " + channel.socket().getPort()); + } + } catch ( java.io.IOException x ) { + log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage()); + } + } + + public void setRxBufSize(int rxBufSize) { + this.rxBufSize = rxBufSize; + } + + public int getRxBufSize() { + return rxBufSize; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]