Author: fhanik Date: Sat Jul 1 13:23:27 2006 New Revision: 418507 URL: http://svn.apache.org/viewvc?rev=418507&view=rev Log: Added a last-access method to the object reader so that we can track when a socket was last accessed. Again, SVN is playing a trick on me with the line endings.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=418507&r1=418506&r2=418507&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Sat Jul 1 13:23:27 2006 @@ -1,126 +1,135 @@ -/* - * Copyright 1999,2004-2005 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.catalina.tribes.io; - -import java.io.IOException; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; - -import org.apache.catalina.tribes.ChannelMessage; - - - -/** - * The object reader object is an object used in conjunction with - * java.nio TCP messages. 
This object stores the message bytes in a - * <code>XByteBuffer</code> until a full package has been received. - * This object uses an XByteBuffer which is an extendable object buffer that also allows - * for message encoding and decoding. - * - * @author Filip Hanik - * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $ - */ -public class ObjectReader { - - protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ObjectReader.class); - - private XByteBuffer buffer; - - /** - * Creates an <code>ObjectReader</code> for a TCP NIO socket channel - * @param channel - the channel to be read. - */ - public ObjectReader(SocketChannel channel) { - this(channel.socket()); - } - - /** - * Creates an <code>ObjectReader</code> for a TCP socket - * @param socket Socket - */ - public ObjectReader(Socket socket) { - try{ - this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true); - }catch ( IOException x ) { - //unable to get buffer size - log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes."); - this.buffer = new XByteBuffer(43800,true); - } - } - - /** - * Append new bytes to buffer. - * @see XByteBuffer#countPackages() - * @param data new transfer buffer - * @param off offset - * @param len length in buffer - * @return number of messages that sended to callback - * @throws java.io.IOException - */ - public int append(ByteBuffer data, int len, boolean count) throws java.io.IOException { - buffer.append(data,len); - int pkgCnt = -1; - if ( count ) pkgCnt = buffer.countPackages(); - return pkgCnt; - } - - public int append(byte[] data,int off,int len, boolean count) throws java.io.IOException { - buffer.append(data,off,len); - int pkgCnt = -1; - if ( count ) pkgCnt = buffer.countPackages(); - return pkgCnt; - } - - /** - * Send buffer to cluster listener (callback). - * Is message complete receiver send message to callback? 
- * - * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#messageDataReceived(ChannelMessage) - * @see XByteBuffer#doesPackageExist() - * @see XByteBuffer#extractPackage(boolean) - * - * @return number of received packages/messages - * @throws java.io.IOException - */ - public ChannelMessage[] execute() throws java.io.IOException { - int pkgCnt = buffer.countPackages(); - ChannelMessage[] result = new ChannelMessage[pkgCnt]; - for (int i=0; i<pkgCnt; i++) { - ChannelMessage data = buffer.extractPackage(true); - result[i] = data; - } - return result; - } - - public int bufferSize() { - return buffer.getLength(); - } - - /** - * Returns the number of packages that the reader has read - * @return int - */ - public int count() { - return buffer.countPackages(); - } - - public void close() { - this.buffer = null; - } - -} +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.io; + +import java.io.IOException; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.catalina.tribes.ChannelMessage; + + + +/** + * The object reader object is an object used in conjunction with + * java.nio TCP messages. This object stores the message bytes in a + * <code>XByteBuffer</code> until a full package has been received. 
+ * This object uses an XByteBuffer which is an extendable object buffer that also allows + * for message encoding and decoding. + * + * @author Filip Hanik + * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $ + */ +public class ObjectReader { + + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ObjectReader.class); + + private XByteBuffer buffer; + + protected long lastAccess = System.currentTimeMillis(); + + /** + * Creates an <code>ObjectReader</code> for a TCP NIO socket channel + * @param channel - the channel to be read. + */ + public ObjectReader(SocketChannel channel) { + this(channel.socket()); + } + + /** + * Creates an <code>ObjectReader</code> for a TCP socket + * @param socket Socket + */ + public ObjectReader(Socket socket) { + try{ + this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true); + }catch ( IOException x ) { + //unable to get buffer size + log.warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes."); + this.buffer = new XByteBuffer(43800,true); + } + } + + /** + * Append new bytes to buffer. + * @see XByteBuffer#countPackages() + * @param data new transfer buffer + * @param off offset + * @param len length in buffer + * @return number of messages that sended to callback + * @throws java.io.IOException + */ + public int append(ByteBuffer data, int len, boolean count) throws java.io.IOException { + buffer.append(data,len); + int pkgCnt = -1; + if ( count ) pkgCnt = buffer.countPackages(); + return pkgCnt; + } + + public int append(byte[] data,int off,int len, boolean count) throws java.io.IOException { + buffer.append(data,off,len); + int pkgCnt = -1; + if ( count ) pkgCnt = buffer.countPackages(); + return pkgCnt; + } + + /** + * Send buffer to cluster listener (callback). + * Is message complete receiver send message to callback? 
+ * + * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#messageDataReceived(ChannelMessage) + * @see XByteBuffer#doesPackageExist() + * @see XByteBuffer#extractPackage(boolean) + * + * @return number of received packages/messages + * @throws java.io.IOException + */ + public ChannelMessage[] execute() throws java.io.IOException { + int pkgCnt = buffer.countPackages(); + ChannelMessage[] result = new ChannelMessage[pkgCnt]; + for (int i=0; i<pkgCnt; i++) { + ChannelMessage data = buffer.extractPackage(true); + result[i] = data; + } + return result; + } + + public int bufferSize() { + return buffer.getLength(); + } + + /** + * Returns the number of packages that the reader has read + * @return int + */ + public int count() { + return buffer.countPackages(); + } + + public void close() { + this.buffer = null; + } + + public long getLastAccess() { + return lastAccess; + } + + public void setLastAccess(long lastAccess) { + this.lastAccess = lastAccess; + } + +} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418507&r1=418506&r2=418507&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Sat Jul 1 13:23:27 2006 @@ -197,10 +197,8 @@ SelectionKey key = (SelectionKey) it.next(); // Is a new connection coming in? 
if (key.isAcceptable()) { - ServerSocketChannel server = - (ServerSocketChannel) key.channel(); + ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); - channel.socket().setReceiveBufferSize(getRxBufSize()); channel.socket().setSendBufferSize(getTxBufSize()); channel.socket().setTcpNoDelay(getTcpNoDelay()); @@ -229,10 +227,13 @@ } catch (java.nio.channels.ClosedSelectorException cse) { // ignore is normal at shutdown or stop listen socket } catch (java.nio.channels.CancelledKeyException nx) { - log.warn( - "Replication client disconnected, error when polling key. Ignoring client."); - } catch (Exception x) { - log.error("Unable to process request in NioReceiver", x); + log.warn("Replication client disconnected, error when polling key. Ignoring client."); + } catch (Throwable x) { + try { + log.error("Unable to process request in NioReceiver", x); + }catch ( Throwable tx ) { + tx.printStackTrace(); + } } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418507&r1=418506&r2=418507&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Sat Jul 1 13:23:27 2006 @@ -138,6 +138,7 @@ int count; buffer.clear(); // make buffer empty ObjectReader reader = (ObjectReader)key.attachment(); + reader.setLastAccess(System.currentTimeMillis()); // loop while data available, channel is non-blocking while ((count = channel.read (buffer)) > 0) { buffer.flip(); // make buffer readable 
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]