Author: fhanik
Date: Wed Mar 15 11:32:57 2006
New Revision: 386148
URL: http://svn.apache.org/viewcvs?rev=386148&view=rev
Log:
Refactored so that sender state is handled on a per member basis, and is not
per socket.
as suspicion is handled correctly even in a multi threaded environment
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
Wed Mar 15 11:32:57 2006
@@ -15,16 +15,15 @@
*/
package org.apache.catalina.tribes.group;
-import org.apache.catalina.tribes.MembershipService;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelSender;
+import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelSender;
import org.apache.catalina.tribes.InterceptorPayload;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipService;
import org.apache.catalina.tribes.MessageListener;
-import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.tcp.SenderState;
/**
@@ -122,11 +121,13 @@
}
public void memberAdded(Member member){
+ SenderState.getSenderState(member);
if ( clusterSender!=null ) clusterSender.add(member);
super.memberAdded(member);
}
public void memberDisappeared(Member member){
+ SenderState.removeSenderState(member);
if ( clusterSender!=null ) clusterSender.remove(member);
super.memberDisappeared(member);
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
Wed Mar 15 11:32:57 2006
@@ -30,7 +30,6 @@
* @version 1.0
*/
public abstract class AbstractPooledSender extends PooledSender implements
MultiPointSender{
- protected boolean suspect;
protected boolean useDirectBuffer = true;
protected int maxRetryAttempts;
protected boolean autoConnect;
@@ -38,10 +37,6 @@
public AbstractPooledSender() {
super();
}
-
- public void setSuspect(boolean suspect) {
- this.suspect = suspect;
- }
public void setUseDirectBuffer(boolean useDirectBuffer) {
this.useDirectBuffer = useDirectBuffer;
@@ -57,10 +52,6 @@
public void setKeepAliveCount(int keepAliveCount) {
this.keepAliveCount = keepAliveCount;
- }
-
- public boolean getSuspect() {
- return suspect;
}
public boolean getUseDirectBuffer() {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
Wed Mar 15 11:32:57 2006
@@ -32,8 +32,6 @@
public void setTxBufSize(int size);
public void setMaxRetryAttempts(int attempts);
public void setUseDirectBuffer(boolean directBuf);
- public void setSuspect(boolean suspect);
- public boolean getSuspect();
public void memberAdded(Member member);
public void memberDisappeared(Member member);
public void setAutoConnect(boolean auto);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
Wed Mar 15 11:32:57 2006
@@ -39,8 +39,6 @@
private boolean waitForAck;
private long timeout;
private int poolSize = 25;
- private boolean suspect;
-
public PooledSender() {
queue = new SenderQueue(this,poolSize);
}
@@ -99,14 +97,6 @@
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
queue.setLimit(poolSize);
- }
-
- public void setSuspect(boolean suspect) {
- this.suspect = suspect;
- }
-
- public boolean getSuspect() {
- return suspect;
}
public boolean isConnected() {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
Wed Mar 15 11:32:57 2006
@@ -16,6 +16,9 @@
package org.apache.catalina.tribes.tcp;
+import org.apache.catalina.tribes.Member;
+import java.util.HashMap;
+
/**
* Send cluster messages with a pool of sockets (25).
@@ -36,6 +39,30 @@
* The descriptive information about this implementation.
*/
private static final String info = "SenderState/1.0";
+
+
+ protected static HashMap memberStates = new HashMap();
+
+ public static SenderState getSenderState(Member member) {
+ SenderState state = (SenderState)memberStates.get(member);
+ if ( state == null ) {
+ synchronized ( memberStates ) {
+ state = (SenderState)memberStates.get(member);
+ if ( state == null ) {
+ state = new SenderState();
+ memberStates.put(member,state);
+ }
+ }
+ }
+ return state;
+ }
+
+ public static void removeSenderState(Member member) {
+ synchronized ( memberStates ) {
+ memberStates.remove(member);
+ }
+ }
+
// ----------------------------------------------------- Instance Variables
@@ -44,11 +71,11 @@
// ----------------------------------------------------- Constructor
- public SenderState() {
+ private SenderState() {
this(READY);
}
- public SenderState(int state) {
+ private SenderState(int state) {
this.state = state;
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
Wed Mar 15 11:32:57 2006
@@ -29,6 +29,8 @@
import org.apache.catalina.util.StringManager;
import java.io.OutputStream;
import java.io.InputStream;
+import org.apache.catalina.tribes.Member;
+import java.net.UnknownHostException;
/**
* Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -64,6 +66,7 @@
* receiver port
*/
private int port;
+ protected Member member;
/**
@@ -79,11 +82,6 @@
private boolean connected = false;
/**
- * sender is in suspect state (last transfer failed)
- */
- private SenderState senderState = new SenderState();
-
- /**
* wait time for ack
*/
private long timeout;
@@ -137,19 +135,16 @@
// -------------------------------------------------------------
Constructor
- public BioSender(InetAddress host, int port) {
- this.address = host;
- this.port = port;
+ public BioSender(Member member) throws UnknownHostException {
+ this.member = member;
+ this.address = InetAddress.getByAddress(member.getHost());
+ this.port = member.getPort();
if (log.isDebugEnabled())
log.debug(sm.getString("IDataSender.create",address, new
Integer(port)));
}
- public BioSender(InetAddress host, int port, SenderState state) {
- this(host,port);
- if ( state != null ) this.senderState = state;
- }
- public BioSender(InetAddress host, int port, SenderState state, int
rxBufSize, int txBufSize) {
- this(host,port,state);
+ public BioSender(Member member, int rxBufSize, int txBufSize) throws
UnknownHostException {
+ this(member);
this.rxBufSize = rxBufSize;
this.txBufSize = txBufSize;
}
@@ -193,21 +188,6 @@
return connected;
}
- public boolean isSuspect() {
- return senderState.isSuspect() || senderState.isFailing();
- }
-
- public boolean getSuspect() {
- return isSuspect();
- }
-
- public void setSuspect(boolean suspect) {
- if ( suspect )
- this.senderState.setSuspect();
- else
- this.senderState.setReady();
- }
-
public long getTimeout() {
return timeout;
}
@@ -274,10 +254,6 @@
this.resend = resend;
}
- public SenderState getSenderState() {
- return senderState;
- }
-
public int getRxBufSize() {
return rxBufSize;
}
@@ -413,7 +389,7 @@
if (log.isDebugEnabled())
log.debug(sm.getString("IDataSender.openSocket",
address.getHostAddress(), new Integer(port), new Long(0)));
} catch (IOException ex1) {
- getSenderState().setSuspect();
+ SenderState.getSenderState(member).setSuspect();
if (log.isDebugEnabled())
log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(),
new Integer(port),new Long(0)), ex1);
throw (ex1);
@@ -514,8 +490,8 @@
}
} catch (IOException x) {
String errmsg = sm.getString("IDataSender.ack.missing",
getAddress(),new Integer(socket.getLocalPort()), new Long(this.timeout));
- if ( !this.isSuspect() ) {
- this.setSuspect(true);
+ if ( !SenderState.getSenderState(member).isSuspect() ) {
+ SenderState.getSenderState(member).setSuspect();
if ( log.isWarnEnabled() ) log.warn(errmsg, x);
} else {
if ( log.isDebugEnabled() )log.debug(errmsg, x);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
Wed Mar 15 11:32:57 2006
@@ -38,7 +38,6 @@
protected boolean directBuf = false;
protected int rxBufSize = 43800;
protected int txBufSize = 25188;
- protected boolean suspect = false;
private boolean connected;
private boolean autoConnect;
@@ -67,8 +66,7 @@
try {
BioSender sender = (BioSender) bioSenders.get(destination[i]);
if (sender == null) {
- InetAddress dest =
InetAddress.getByAddress(destination[i].getHost());
- sender = new BioSender(dest, destination[i].getPort(), new
SenderState(), rxBufSize, txBufSize);
+ sender = new BioSender(destination[i], rxBufSize,
txBufSize);
sender.setKeepAliveCount(keepAliveCount);
sender.setTimeout(timeout);
//sender.setResend();
@@ -131,20 +129,12 @@
try {disconnect(); }catch ( Exception ignore){}
}
- public boolean getSuspect() {
- return suspect;
- }
-
public boolean isConnected() {
return connected;
}
public boolean isAutoConnect() {
return autoConnect;
- }
-
- public void setSuspect(boolean suspect) {
- this.suspect = suspect;
}
public void setUseDirectBuffer(boolean directBuf) {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
Wed Mar 15 11:32:57 2006
@@ -48,7 +48,6 @@
protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(NioSender.class);
- protected boolean suspect = false;
protected boolean connected = false;
protected boolean waitForAck = false;
protected int rxBufSize = 25188;
@@ -277,16 +276,6 @@
return false;
}
/**
- * getSuspect
- *
- * @return boolean
- * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
- */
- public boolean getSuspect() {
- return suspect;
- }
-
- /**
* isConnected
*
* @return boolean
@@ -339,17 +328,6 @@
public void setRxBufSize(int size) {
this.rxBufSize = size;
}
-
- /**
- * setSuspect
- *
- * @param suspect boolean
- * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
- */
- public void setSuspect(boolean suspect) {
- this.suspect = suspect;
- }
-
/**
* setTxBufSize
*
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
Wed Mar 15 11:32:57 2006
@@ -53,7 +53,6 @@
protected boolean directBuf = false;
protected int rxBufSize = 43800;
protected int txBufSize = 25188;
- protected boolean suspect = false;
private boolean connected;
private boolean autoConnect;
@@ -227,10 +226,6 @@
public void finalize() {
try {disconnect(); }catch ( Exception ignore){}
}
-
- public boolean getSuspect() {
- return suspect;
- }
public boolean isConnected() {
return connected;
@@ -240,10 +235,6 @@
return autoConnect;
}
- public void setSuspect(boolean suspect) {
- this.suspect = suspect;
- }
-
public void setUseDirectBuffer(boolean directBuf) {
this.directBuf = directBuf;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]