Author: fhanik Date: Mon Mar 13 18:00:05 2006 New Revision: 385711 URL: http://svn.apache.org/viewcvs?rev=385711&view=rev Log: Completed more of the interfaces
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java Mon Mar 13 18:00:05 2006 @@ -15,14 +15,7 @@ */ package org.apache.catalina.tribes.tcp; -import java.io.IOException; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.ChannelMessage; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.tcp.DataSender; -import org.apache.catalina.tribes.tcp.MultiPointSender; -import org.apache.catalina.tribes.tcp.PooledSender; /** * <p>Title: </p> @@ -41,6 +34,7 @@ protected boolean useDirectBuffer; protected int maxRetryAttempts; protected boolean autoConnect; + protected int keepAliveCount; public AbstractPooledSender() { super(); } @@ -61,6 +55,10 @@ this.autoConnect = autoConnect; } + public void setKeepAliveCount(int keepAliveCount) { + this.keepAliveCount = keepAliveCount; + } + public boolean getSuspect() { return suspect; } @@ -75,5 +73,9 @@ public boolean isAutoConnect() { return autoConnect; + } + + public int getKeepAliveCount() { + return keepAliveCount; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Mon Mar 13 18:00:05 2006 @@ -15,7 +15,7 @@ */ package org.apache.catalina.tribes.tcp; -import org.apache.catalina.tribes.ChannelException; +import java.io.IOException; /** * <p>Title: </p> @@ -30,7 +30,7 @@ * @version 1.0 */ public interface DataSender { - public void connect() throws ChannelException; + public void connect() throws IOException; public void disconnect(); public boolean isConnected(); public void setRxBufSize(int size); @@ -38,4 +38,5 @@ public boolean keepalive(); public void setTimeout(long timeout); public void setWaitForAck(boolean isWaitForAck); + public void setKeepAliveCount(int maxRequests); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java Mon Mar 13 18:00:05 2006 @@ -35,7 +35,7 @@ public void setSuspect(boolean suspect); public boolean getSuspect(); public void memberAdded(Member member); - public void memberRemoved(Member member); + public void memberDisappeared(Member member); public void setAutoConnect(boolean auto); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java Mon Mar 13 18:00:05 2006 @@ -15,10 +15,9 @@ */ package org.apache.catalina.tribes.tcp; +import java.io.IOException; import java.util.List; -import org.apache.catalina.tribes.ChannelException; - /** * <p>Title: </p> * @@ -31,7 +30,7 @@ * @author not attributable * @version 1.0 */ -public abstract class PooledSender implements DataSender { +public abstract class PooledSender implements MultiPointSender { private SenderQueue queue = null; private boolean connected; @@ -40,6 +39,7 @@ private boolean waitForAck; private long timeout; private int poolSize = 25; + private boolean suspect; public PooledSender() { queue = new SenderQueue(this,poolSize); @@ -56,7 +56,7 @@ queue.returnSender(sender); } - public synchronized void connect() throws ChannelException { + public synchronized void connect() throws IOException { //do nothing, happens in the socket sender itself queue.open(); setConnected(true); @@ -99,6 +99,10 @@ public void setPoolSize(int poolSize) { this.poolSize = poolSize; queue.setLimit(poolSize); + } + + public void setSuspect(Boolean suspect) { + this.suspect = suspect; } public boolean isConnected() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Mon Mar 13 18:00:05 2006 @@ -147,7 +147,7 @@ * @see org.apache.catalina.tribes.ClusterSender#remove(org.apache.catalina.tribes.Member) */ public synchronized void remove(Member member) { - getTransport().memberRemoved(member); + getTransport().memberDisappeared(member); } // ------------------------------------------------------------- protected Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Mon Mar 13 18:00:05 2006 @@ -18,8 +18,8 @@ import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.Socket; -import java.net.SocketException; import java.util.Arrays; import org.apache.catalina.tribes.ChannelException; @@ -28,7 +28,6 @@ import org.apache.catalina.tribes.tcp.DataSender; import org.apache.catalina.tribes.tcp.SenderState; import org.apache.catalina.util.StringManager; -import java.net.InetSocketAddress; /** * Send cluster messages with only one socket. Ack and keep Alive Handling is @@ -100,7 +99,7 @@ /** * max requests before reconnecting (default -1 unlimited) */ - private int keepAliveMaxRequestCount = -1; + private int keepAliveCount = -1; /** * Last connect timestamp @@ -110,7 +109,7 @@ /** * keepalive counter */ - protected int keepAliveCount = 0; + protected int requestCount = 0; /** * wait for receiver Ack @@ -222,12 +221,12 @@ this.keepAliveTimeout = keepAliveTimeout; } - public int getKeepAliveMaxRequestCount() { - return keepAliveMaxRequestCount; + public int getKeepAliveCount() { + return keepAliveCount; } - public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) { - this.keepAliveMaxRequestCount = keepAliveMaxRequestCount; + public void setKeepAliveCount(int keepAliveMaxRequestCount) { + this.keepAliveCount = keepAliveMaxRequestCount; } /** @@ -240,8 +239,8 @@ /** * @return Returns the keepAliveCount. */ - public int getKeepAliveCount() { - return keepAliveCount; + public int getRequestCount() { + return requestCount; } /** @@ -298,12 +297,8 @@ * Connect other cluster member receiver * @see org.apache.catalina.tribes.tcp.IDataSender#connect() */ - public void connect() throws ChannelException { - try { - openSocket(); - }catch ( Exception x ) { - throw new ChannelException(x); - } + public void connect() throws IOException { + openSocket(); } @@ -334,7 +329,7 @@ boolean isCloseSocket = true ; if(isConnected()) { if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout) - || (keepAliveMaxRequestCount > -1 && keepAliveCount >= keepAliveMaxRequestCount)) { + || (keepAliveCount > -1 && requestCount >= keepAliveCount)) { closeSocket(); } else isCloseSocket = false ; @@ -380,7 +375,7 @@ socket.setReceiveBufferSize(getRxBufSize()); socket.setSoTimeout( (int) timeout); connected = true; - this.keepAliveCount = 0; + this.requestCount = 0; this.keepAliveConnectTime = System.currentTimeMillis(); if (log.isDebugEnabled()) log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer(port), new Long(0))); @@ -409,7 +404,7 @@ socket = null; } } - this.keepAliveCount = 0; + this.requestCount = 0; connected = false; if (log.isDebugEnabled()) log.debug(sm.getString("IDataSender.closeSocket",address.getHostAddress(), new Integer(port),new Long(0))); @@ -463,7 +458,7 @@ closeSocket(); } } finally { - this.keepAliveCount++; + this.requestCount++; keepalive(); if(messageTransfered) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Mon Mar 13 18:00:05 2006 @@ -11,6 +11,7 @@ import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.tcp.MultiPointSender; import org.apache.catalina.tribes.tcp.SenderState; +import java.io.IOException; /** * <p>Title: </p> @@ -59,7 +60,7 @@ - private BioSender[] setupForSend(Member[] destination) throws ChannelException { + protected BioSender[] setupForSend(Member[] destination) throws ChannelException { ChannelException cx = null; BioSender[] result = new BioSender[destination.length]; for ( int i=0; i<destination.length; i++ ) { @@ -68,6 +69,10 @@ if (sender == null) { InetAddress dest = InetAddress.getByAddress(destination[i].getHost()); sender = new BioSender(dest, destination[i].getPort(), new SenderState(), rxBufSize, txBufSize); + sender.setKeepAliveCount(keepAliveCount); + sender.setTimeout(timeout); + //sender.setResend(); + //sender.setKeepAliveTimeout(); bioSenders.put(destination[i], sender); } sender.setWaitForAck(waitForAck); @@ -83,7 +88,7 @@ else return result; } - public void connect() { + public void connect() throws IOException { //do nothing, we connect on demand setConnected(true); } @@ -110,7 +115,7 @@ } - public void memberRemoved(Member member) { + public void memberDisappeared(Member member) { //disconnect senders BioSender sender = (BioSender)bioSenders.remove(member); if ( sender != null ) sender.disconnect(); @@ -172,6 +177,10 @@ public void setAutoConnect(boolean autoConnect) { this.autoConnect = autoConnect; + } + + public void setKeepAliveCount(int keepAliveCount) { + this.keepAliveCount = keepAliveCount; } public boolean keepalive() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledMultiSender.java Mon Mar 13 18:00:05 2006 @@ -2,6 +2,10 @@ import org.apache.catalina.tribes.tcp.DataSender; import org.apache.catalina.tribes.tcp.PooledSender; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.tcp.MultiPointSender; +import org.apache.catalina.tribes.ChannelMessage; /** * <p>Title: </p> @@ -26,10 +30,16 @@ protected int txBufSize = 25188; protected boolean suspect = false; private boolean autoConnect; + private boolean useDirectBuffer; - public PooledMultiSender() { } + + public void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException { + MultiPointSender sender = (MultiPointSender)getSender(); + sender.sendMessage(destination,msg); + + } /** * getNewDataSender @@ -61,11 +71,32 @@ this.keepAliveCount = keepAliveCount; } - public void setRetryAttempts(int retryAttempts) { + public void setMaxRetryAttempts(int retryAttempts) { this.retryAttempts = retryAttempts; } public void setSuspect(boolean suspect) { this.suspect = suspect; } + + public void setUseDirectBuffer(boolean useDirectBuffer) { + this.useDirectBuffer = useDirectBuffer; + } + + public boolean getSuspect() { + return suspect; + } + + public boolean isUseDirectBuffer() { + return useDirectBuffer; + } + + public void memberAdded(Member member) { + + } + + public void memberDisappeared(Member member) { + //disconnect senders + } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Mon Mar 13 18:00:05 2006 @@ -28,6 +28,7 @@ import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.tcp.DataSender; /** * This class is NOT thread safe and should never be used with more than one thread at a time @@ -43,7 +44,7 @@ * @author Filip Hanik * @version 1.0 */ -public class NioSender { +public class NioSender implements DataSender{ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioSender.class); @@ -67,8 +68,10 @@ protected int curPos=0; protected XByteBuffer ackbuf = new XByteBuffer(128,true); protected int remaining = 0; - private boolean complete; - private int attempt; + protected boolean complete; + protected int attempt; + protected int keepAliveCount; + protected long timeout; public NioSender(Member destination) { this.destination = destination; @@ -90,6 +93,7 @@ connected = true; socketChannel.socket().setSendBufferSize(txBufSize); socketChannel.socket().setReceiveBufferSize(rxBufSize); + socketChannel.socket().setSoTimeout((int)timeout); if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); return false; } else { @@ -190,6 +194,7 @@ socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(addr); + socketChannel.register(getSelector(),SelectionKey.OP_CONNECT,this); } @@ -321,6 +326,10 @@ return attempt; } + public long getTimeout() { + return timeout; + } + /** * setRxBufSize * @@ -375,5 +384,13 @@ public void setAttempt(int attempt) { this.attempt = attempt; + } + + public void setKeepAliveCount(int keepAliveCount) { + this.keepAliveCount = keepAliveCount; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Mon Mar 13 18:00:05 2006 @@ -47,7 +47,7 @@ protected long selectTimeout = 1000; protected boolean waitForAck = false; protected int retryAttempts=0; - protected int keepAliveCount = Integer.MAX_VALUE; + protected int keepAliveCount = -1; protected Selector selector; protected HashMap nioSenders = new HashMap(); protected boolean directBuf = false; @@ -177,6 +177,8 @@ sender.setRxBufSize(rxBufSize); sender.setTxBufSize(txBufSize); sender.setWaitForAck(waitForAck); + sender.setTimeout(timeout); + sender.setKeepAliveCount(keepAliveCount); result[i] = sender; } return result; @@ -209,7 +211,7 @@ } - public void memberRemoved(Member member) { + public void memberDisappeared(Member member) { //disconnect senders NioSender sender = (NioSender)nioSenders.remove(member); if ( sender != null ) sender.disconnect(); @@ -271,6 +273,10 @@ public void setAutoConnect(boolean autoConnect) { this.autoConnect = autoConnect; + } + + public void setKeepAliveCount(int keepAliveCount) { + this.keepAliveCount = keepAliveCount; } public boolean keepalive() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java?rev=385711&r1=385710&r2=385711&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java Mon Mar 13 18:00:05 2006 @@ -69,7 +69,7 @@ } - public void memberRemoved(Member member) { + public void memberDisappeared(Member member) { //disconnect senders } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]