Author: fhanik
Date: Wed Mar 15 11:32:57 2006
New Revision: 386148

URL: http://svn.apache.org/viewcvs?rev=386148&view=rev
Log:
Refactored so that sender state is handled on a per-member basis rather than 
per socket, so that suspicion is handled correctly even in a multi-threaded 
environment.

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 Wed Mar 15 11:32:57 2006
@@ -15,16 +15,15 @@
  */
 package org.apache.catalina.tribes.group;
 
-import org.apache.catalina.tribes.MembershipService;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelSender;
+import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelSender;
 import org.apache.catalina.tribes.InterceptorPayload;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipService;
 import org.apache.catalina.tribes.MessageListener;
-import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.tcp.SenderState;
 
 
 /**
@@ -122,11 +121,13 @@
     }
     
     public void memberAdded(Member member){
+        SenderState.getSenderState(member);
         if ( clusterSender!=null ) clusterSender.add(member);
         super.memberAdded(member);
     }
     
     public void memberDisappeared(Member member){
+        SenderState.removeSenderState(member);
         if ( clusterSender!=null ) clusterSender.remove(member);
         super.memberDisappeared(member);
     }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
 Wed Mar 15 11:32:57 2006
@@ -30,7 +30,6 @@
  * @version 1.0
  */
 public abstract class AbstractPooledSender extends PooledSender implements 
MultiPointSender{
-    protected boolean suspect;
     protected boolean useDirectBuffer = true;
     protected int maxRetryAttempts;
     protected boolean autoConnect;
@@ -38,10 +37,6 @@
     public AbstractPooledSender() {
         super();
     }
-    
-    public void setSuspect(boolean suspect) {
-        this.suspect = suspect;
-    }
 
     public void setUseDirectBuffer(boolean useDirectBuffer) {
         this.useDirectBuffer = useDirectBuffer;
@@ -57,10 +52,6 @@
 
     public void setKeepAliveCount(int keepAliveCount) {
         this.keepAliveCount = keepAliveCount;
-    }
-
-    public boolean getSuspect() {
-        return suspect;
     }
 
     public boolean getUseDirectBuffer() {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
 Wed Mar 15 11:32:57 2006
@@ -32,8 +32,6 @@
     public void setTxBufSize(int size);
     public void setMaxRetryAttempts(int attempts);
     public void setUseDirectBuffer(boolean directBuf);
-    public void setSuspect(boolean suspect);
-    public boolean getSuspect();
     public void memberAdded(Member member);
     public void memberDisappeared(Member member);
     public void setAutoConnect(boolean auto);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
 Wed Mar 15 11:32:57 2006
@@ -39,8 +39,6 @@
     private boolean waitForAck;
     private long timeout;
     private int poolSize = 25;
-    private boolean suspect;
-
     public PooledSender() {
         queue = new SenderQueue(this,poolSize);
     }
@@ -99,14 +97,6 @@
     public void setPoolSize(int poolSize) {
         this.poolSize = poolSize;
         queue.setLimit(poolSize);
-    }
-
-    public void setSuspect(boolean suspect) {
-        this.suspect = suspect;
-    }
-    
-    public boolean getSuspect() {
-        return suspect;
     }
 
     public boolean isConnected() {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SenderState.java
 Wed Mar 15 11:32:57 2006
@@ -16,6 +16,9 @@
 
 package org.apache.catalina.tribes.tcp;
 
+import org.apache.catalina.tribes.Member;
+import java.util.HashMap;
+
 
 /**
  * Send cluster messages with a pool of sockets (25).
@@ -36,6 +39,30 @@
      * The descriptive information about this implementation.
      */
     private static final String info = "SenderState/1.0";
+    
+    
+    protected static HashMap memberStates = new HashMap();
+    
+    public static SenderState getSenderState(Member member) {
+        SenderState state = (SenderState)memberStates.get(member);
+        if ( state == null ) {
+            synchronized ( memberStates ) {
+                state = (SenderState)memberStates.get(member);
+                if ( state == null ) {
+                    state = new SenderState();
+                    memberStates.put(member,state);
+                }
+            }
+        }
+        return state;
+    }
+    
+    public static void removeSenderState(Member member) {
+        synchronized ( memberStates ) {
+            memberStates.remove(member);
+        }
+    }
+    
 
     // ----------------------------------------------------- Instance Variables
 
@@ -44,11 +71,11 @@
     //  ----------------------------------------------------- Constructor
 
     
-    public SenderState() {
+    private SenderState() {
         this(READY);
     }
 
-    public SenderState(int state) {
+    private SenderState(int state) {
         this.state = state;
     }
     

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
 Wed Mar 15 11:32:57 2006
@@ -29,6 +29,8 @@
 import org.apache.catalina.util.StringManager;
 import java.io.OutputStream;
 import java.io.InputStream;
+import org.apache.catalina.tribes.Member;
+import java.net.UnknownHostException;
 
 /**
  * Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -64,6 +66,7 @@
      * receiver port
      */
     private int port;
+    protected Member member;
 
     
     /**
@@ -79,11 +82,6 @@
     private boolean connected = false;
 
     /**
-     * sender is in suspect state (last transfer failed)
-     */
-    private SenderState senderState = new SenderState();
-
-    /**
      * wait time for ack
      */
     private long timeout;
@@ -137,19 +135,16 @@
 
     // ------------------------------------------------------------- 
Constructor
     
-    public BioSender(InetAddress host, int port) {
-        this.address = host;
-        this.port = port;
+    public BioSender(Member member) throws UnknownHostException {
+        this.member = member;
+        this.address = InetAddress.getByAddress(member.getHost());
+        this.port = member.getPort();
         if (log.isDebugEnabled())
             log.debug(sm.getString("IDataSender.create",address, new 
Integer(port)));
     }
 
-    public BioSender(InetAddress host, int port, SenderState state) {
-        this(host,port);
-        if ( state != null ) this.senderState = state;
-    }
-    public BioSender(InetAddress host, int port, SenderState state, int 
rxBufSize, int txBufSize) {
-        this(host,port,state);
+    public BioSender(Member member, int rxBufSize, int txBufSize) throws 
UnknownHostException {
+        this(member);
         this.rxBufSize = rxBufSize;
         this.txBufSize = txBufSize;
     }
@@ -193,21 +188,6 @@
         return connected;
     }
 
-    public boolean isSuspect() {
-        return senderState.isSuspect() || senderState.isFailing();
-    }
-
-    public boolean getSuspect() {
-        return isSuspect();
-    }
-
-    public void setSuspect(boolean suspect) {
-        if ( suspect ) 
-            this.senderState.setSuspect();
-        else
-            this.senderState.setReady();
-    }
-
     public long getTimeout() {
         return timeout;
     }
@@ -274,10 +254,6 @@
         this.resend = resend;
     }
 
-    public SenderState getSenderState() {
-        return senderState;
-    }
-
     public int getRxBufSize() {
         return rxBufSize;
     }
@@ -413,7 +389,7 @@
            if (log.isDebugEnabled())
                log.debug(sm.getString("IDataSender.openSocket", 
address.getHostAddress(), new Integer(port), new Long(0)));
       } catch (IOException ex1) {
-          getSenderState().setSuspect();
+          SenderState.getSenderState(member).setSuspect();
           if (log.isDebugEnabled())
               
log.debug(sm.getString("IDataSender.openSocket.failure",address.getHostAddress(),
 new Integer(port),new Long(0)), ex1);
           throw (ex1);
@@ -514,8 +490,8 @@
             }
         } catch (IOException x) {
             String errmsg = sm.getString("IDataSender.ack.missing", 
getAddress(),new Integer(socket.getLocalPort()), new Long(this.timeout));
-            if ( !this.isSuspect() ) {
-                this.setSuspect(true);
+            if ( !SenderState.getSenderState(member).isSuspect() ) {
+                SenderState.getSenderState(member).setSuspect();
                 if ( log.isWarnEnabled() ) log.warn(errmsg, x);
             } else {
                 if ( log.isDebugEnabled() )log.debug(errmsg, x);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
 Wed Mar 15 11:32:57 2006
@@ -38,7 +38,6 @@
     protected boolean directBuf = false;
     protected int rxBufSize = 43800;
     protected int txBufSize = 25188;
-    protected boolean suspect = false;
     private boolean connected;
     private boolean autoConnect;
 
@@ -67,8 +66,7 @@
             try {
                 BioSender sender = (BioSender) bioSenders.get(destination[i]);
                 if (sender == null) {
-                    InetAddress dest = 
InetAddress.getByAddress(destination[i].getHost());
-                    sender = new BioSender(dest, destination[i].getPort(), new 
SenderState(), rxBufSize, txBufSize);
+                    sender = new BioSender(destination[i], rxBufSize, 
txBufSize);
                     sender.setKeepAliveCount(keepAliveCount);
                     sender.setTimeout(timeout);
                     //sender.setResend();
@@ -131,20 +129,12 @@
         try {disconnect(); }catch ( Exception ignore){}
     }
 
-    public boolean getSuspect() {
-        return suspect;
-    }
-
     public boolean isConnected() {
         return connected;
     }
 
     public boolean isAutoConnect() {
         return autoConnect;
-    }
-
-    public void setSuspect(boolean suspect) {
-        this.suspect = suspect;
     }
 
     public void setUseDirectBuffer(boolean directBuf) {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
 Wed Mar 15 11:32:57 2006
@@ -48,7 +48,6 @@
 
     protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(NioSender.class);
 
-    protected boolean suspect = false;
     protected boolean connected = false;
     protected boolean waitForAck = false;
     protected int rxBufSize = 25188;
@@ -277,16 +276,6 @@
         return false;
     }
     /**
-     * getSuspect
-     *
-     * @return boolean
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public boolean getSuspect() {
-        return suspect;
-    }
-
-    /**
      * isConnected
      *
      * @return boolean
@@ -339,17 +328,6 @@
     public void setRxBufSize(int size) {
         this.rxBufSize = size;
     }
-
-    /**
-     * setSuspect
-     *
-     * @param suspect boolean
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public void setSuspect(boolean suspect) {
-        this.suspect = suspect;
-    }
-
     /**
      * setTxBufSize
      *

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=386148&r1=386147&r2=386148&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
 Wed Mar 15 11:32:57 2006
@@ -53,7 +53,6 @@
     protected boolean directBuf = false;
     protected int rxBufSize = 43800;
     protected int txBufSize = 25188;
-    protected boolean suspect = false;
     private boolean connected;
     private boolean autoConnect;
 
@@ -227,10 +226,6 @@
     public void finalize() {
         try {disconnect(); }catch ( Exception ignore){}
     }
-    
-    public boolean getSuspect() {
-        return suspect;
-    }
 
     public boolean isConnected() {
         return connected;
@@ -240,10 +235,6 @@
         return autoConnect;
     }
 
-    public void setSuspect(boolean suspect) {
-        this.suspect = suspect;
-    }
-    
     public void setUseDirectBuffer(boolean directBuf) {
         this.directBuf = directBuf;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to