Author: fhanik Date: Wed Mar 15 11:32:57 2006 New Revision: 386148 URL: http://svn.apache.org/viewcvs?rev=386148&view=rev Log: Refactored so that sender state is handled on a per member basis, and is not per socket. as suspicion is handled correctly even in a multi threaded environment
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=386148&r1=386147&r2=386148&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Wed Mar 15 11:32:57 2006 @@ -15,16 +15,15 @@ */ package org.apache.catalina.tribes.group; -import org.apache.catalina.tribes.MembershipService; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.ChannelSender; +import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelReceiver; -import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelSender; import org.apache.catalina.tribes.InterceptorPayload; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipService; import org.apache.catalina.tribes.MessageListener; -import org.apache.catalina.tribes.tcp.*; +import org.apache.catalina.tribes.tcp.SenderState; /** @@ -122,11 +121,13 @@ } public void memberAdded(Member member){ + SenderState.getSenderState(member); if ( clusterSender!=null ) clusterSender.add(member); super.memberAdded(member); } public void memberDisappeared(Member member){ + SenderState.removeSenderState(member); if ( clusterSender!=null ) clusterSender.remove(member); super.memberDisappeared(member); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java?rev=386148&r1=386147&r2=386148&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java Wed Mar 15 11:32:57 2006 @@ -30,7 +30,6 @@ * @version 1.0 */ public abstract class AbstractPooledSender extends PooledSender implements MultiPointSender{ - protected boolean suspect; protected boolean useDirectBuffer = true; protected int maxRetryAttempts; protected boolean autoConnect; @@ -38,10 +37,6 @@ public AbstractPooledSender() { super(); } - - public void setSuspect(boolean suspect) { - this.suspect = suspect; - } public void setUseDirectBuffer(boolean useDirectBuffer) { this.useDirectBuffer = useDirectBuffer; @@ -57,10 +52,6 @@ public void setKeepAliveCount(int keepAliveCount) { this.keepAliveCount = keepAliveCount; - } - - public boolean getSuspect() { - return suspect; } public boolean getUseDirectBuffer() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java?rev=386148&r1=386147&r2=386148&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java Wed Mar 15 11:32:57 2006 @@ -32,8 +32,6 @@ public void setTxBufSize(int size); public void setMaxRetryAttempts(int attempts); public void setUseDirectBuffer(boolean directBuf); - public void setSuspect(boolean suspect); - public boolean getSuspect(); public void memberAdded(Member member); public void memberDisappeared(Member member); public void setAutoConnect(boolean auto); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=386148&r1=386147&r2=386148&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java Wed Mar 15 11:32:57 2006 @@ -39,8 +39,6 @@ private boolean waitForAck; private long timeout; private int poolSize = 25; - private boolean suspect; - public PooledSender() { queue = new SenderQueue(this,poolSize); } @@ -99,14 +97,6 @@ public void setPoolSize(int poolSize) { this.poolSize = poolSize; queue.setLimit(poolSize); - } - - public void setSuspect(boolean suspect) { - this.suspect = suspect; - } - - public boolean getSuspect() { - return suspect; } public boolean isConnected() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java?rev=386148&r1=386147&r2=386148&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java Wed Mar 15 11:32:57 2006 @@ -16,6 +16,9 @@ package org.apache.catalina.tribes.tcp; +import org.apache.catalina.tribes.Member; +import java.util.HashMap; + /** * Send cluster messages with a pool of sockets (25). @@ -36,6 +39,30 @@ * The descriptive information about this implementation. */ private static final String info = "SenderState/1.0"; + + + protected static HashMap memberStates = new HashMap(); + + public static SenderState getSenderState(Member member) { + SenderState state = (SenderState)memberStates.get(member); + if ( state == null ) { + synchronized ( memberStates ) { + state = (SenderState)memberStates.get(member); + if ( state == null ) { + state = new SenderState(); + memberStates.put(member,state); + } + } + } + return state; + } + + public static void removeSenderState(Member member) { + synchronized ( memberStates ) { + memberStates.remove(member); + } + } + // ----------------------------------------------------- Instance Variables @@ -44,11 +71,11 @@ // ----------------------------------------------------- Constructor - public SenderState() { + private SenderState() { this(READY); } - public SenderState(int state) { + private SenderState(int state) { this.state = state; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=386148&r1=386147&r2=386148&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Wed Mar 15 11:32:57 2006 @@ -29,6 +29,8 @@ import org.apache.catalina.util.StringManager; import java.io.OutputStream; import java.io.InputStream; +import org.apache.catalina.tribes.Member; +import java.net.UnknownHostException; /** * Send cluster messages with only one socket. Ack and keep Alive Handling is @@ -64,6 +66,7 @@ * receiver port */ private int port; + protected Member member; /** @@ -79,11 +82,6 @@ private boolean connected = false; /** - * sender is in suspect state (last transfer failed) - */ - private SenderState senderState = new SenderState(); - - /** * wait time for ack */ private long timeout; @@ -137,19 +135,16 @@ // ------------------------------------------------------------- Constructor - public BioSender(InetAddress host, int port) { - this.address = host; - this.port = port; + public BioSender(Member member) throws UnknownHostException { + this.member = member; + this.address = InetAddress.getByAddress(member.getHost()); + this.port = member.getPort(); if (log.isDebugEnabled()) log.debug(sm.getString("IDataSender.create",address, new Integer(port))); } - public BioSender(InetAddress host, int port, SenderState state) { - this(host,port); - if ( state != null ) this.senderState = state; - } - public BioSender(InetAddress host, int port, SenderState state, int rxBufSize, int txBufSize) { - this(host,port,state); + public BioSender(Member member, int rxBufSize, int txBufSize) throws UnknownHostException { + this(member); this.rxBufSize = rxBufSize; this.txBufSize = txBufSize; } @@ -193,21 +188,6 @@ return connected; } - public boolean isSuspect() { - return senderState.isSuspect() || senderState.isFailing(); - } - - public boolean getSuspect() { - return isSuspect(); - } - - public void setSuspect(boolean suspect) { - if ( suspect ) - this.senderState.setSuspect(); - else - this.senderState.setReady(); - } - public long getTimeout() { return timeout; } @@ -274,10 +254,6 @@ this.resend = resend; } - public SenderState getSenderState() { - return senderState; - } - public int getRxBufSize() { return rxBufSize; } @@ -413,7 +389,7 @@ if (log.isDebugEnabled()) log.debug(sm.getString("IDataSender.openSocket", address.getHostAddress(), new Integer(port), new Long(0))); } catch (IOException ex1) { - getSenderState().setSuspect(); + SenderState.getSenderState(member).setSuspect(); if (log.isDebugEnabled()) log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(), new Integer(port),new Long(0)), ex1); throw (ex1); @@ -514,8 +490,8 @@ } } catch (IOException x) { String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(this.timeout)); - if ( !this.isSuspect() ) { - this.setSuspect(true); + if ( !SenderState.getSenderState(member).isSuspect() ) { + SenderState.getSenderState(member).setSuspect(); if ( log.isWarnEnabled() ) log.warn(errmsg, x); } else { if ( log.isDebugEnabled() )log.debug(errmsg, x); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=386148&r1=386147&r2=386148&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Wed Mar 15 11:32:57 2006 @@ -38,7 +38,6 @@ protected boolean directBuf = false; protected int rxBufSize = 43800; protected int txBufSize = 25188; - protected boolean suspect = false; private boolean connected; private boolean autoConnect; @@ -67,8 +66,7 @@ try { BioSender sender = (BioSender) bioSenders.get(destination[i]); if (sender == null) { - InetAddress dest = InetAddress.getByAddress(destination[i].getHost()); - sender = new BioSender(dest, destination[i].getPort(), new SenderState(), rxBufSize, txBufSize); + sender = new BioSender(destination[i], rxBufSize, txBufSize); sender.setKeepAliveCount(keepAliveCount); sender.setTimeout(timeout); //sender.setResend(); @@ -131,20 +129,12 @@ try {disconnect(); }catch ( Exception ignore){} } - public boolean getSuspect() { - return suspect; - } - public boolean isConnected() { return connected; } public boolean isAutoConnect() { return autoConnect; - } - - public void setSuspect(boolean suspect) { - this.suspect = suspect; } public void setUseDirectBuffer(boolean directBuf) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=386148&r1=386147&r2=386148&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Wed Mar 15 11:32:57 2006 @@ -48,7 +48,6 @@ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioSender.class); - protected boolean suspect = false; protected boolean connected = false; protected boolean waitForAck = false; protected int rxBufSize = 25188; @@ -277,16 +276,6 @@ return false; } /** - * getSuspect - * - * @return boolean - * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method - */ - public boolean getSuspect() { - return suspect; - } - - /** * isConnected * * @return boolean @@ -339,17 +328,6 @@ public void setRxBufSize(int size) { this.rxBufSize = size; } - - /** - * setSuspect - * - * @param suspect boolean - * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method - */ - public void setSuspect(boolean suspect) { - this.suspect = suspect; - } - /** * setTxBufSize * Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=386148&r1=386147&r2=386148&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Wed Mar 15 11:32:57 2006 @@ -53,7 +53,6 @@ protected boolean directBuf = false; protected int rxBufSize = 43800; protected int txBufSize = 25188; - protected boolean suspect = false; private boolean connected; private boolean autoConnect; @@ -227,10 +226,6 @@ public void finalize() { try {disconnect(); }catch ( Exception ignore){} } - - public boolean getSuspect() { - return suspect; - } public boolean isConnected() { return connected; @@ -240,10 +235,6 @@ return autoConnect; } - public void setSuspect(boolean suspect) { - this.suspect = suspect; - } - public void setUseDirectBuffer(boolean directBuf) { this.directBuf = directBuf; } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]