Author: fhanik
Date: Mon Mar 13 10:00:08 2006
New Revision: 385606

URL: http://svn.apache.org/viewcvs?rev=385606&view=rev
Log:
Refactoring the senders into a cleaner structure

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java
      - copied, changed from r385604, 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
Removed:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenderFactory.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/IDynamicProperty.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml Mon Mar 13 
10:00:08 2006
@@ -305,12 +305,30 @@
                     txBufSize="25188"/>
 
                 <Sender
-                    
className="org.apache.catalina.tribes.tcp.ReplicationTransmitter"
-                    replicationMode="pooled"
-                    ackTimeout="15000"
-                    waitForAck="true"
-                    rxBufSize="43800"
-                    txBufSize="25188"/>
+                    
className="org.apache.catalina.tribes.tcp.ReplicationTransmitter">
+                    <transport 
className="org.apache.catalina.tribes.tcp.nio.PooledParallelSender"
+                               maxRetryAttempts="2"
+                               timeout="15000"
+                               waitForAck="true"
+                               ackTimeout="15000"
+                               rxBufSize="43800"
+                               txBufSize="25188"
+                               autoConnect="true"
+                               poolSize="25"/>
+                    <!--
+                    <transport 
className="org.apache.catalina.tribes.tcp.bio.PooledBioSender"
+                               maxRetryAttempts="2"
+                               timeout="15000"
+                               waitForAck="true"
+                               ackTimeout="15000"
+                               rxBufSize="43800"
+                               txBufSize="25188"
+                               autoConnect="true"
+                               poolSize="25"/>
+                    -->
+
+                </Sender>
+
                     
                 <!--
                 <Interceptor 
className="org.apache.catalina.tribes.group.interceptors.GzipInterceptor"/>

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
 Mon Mar 13 10:00:08 2006
@@ -25,21 +25,10 @@
  */    
 public interface ChannelSender
 {
-
     public void add(Member member);
-
     public void remove(Member member);
-
     public void start() throws java.io.IOException;
-
     public void stop();
-
     public void heartbeat() ;
-
     public void sendMessage(ChannelMessage message, Member[] destination) 
throws ChannelException;
-
-    public boolean getWaitForAck();
-    public void setWaitForAck(boolean isWaitForAck);
-    
-    public boolean isParallel();
 }

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java?rev=385606&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java
 Mon Mar 13 10:00:08 2006
@@ -0,0 +1,79 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tcp;
+
+import java.io.IOException;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.tcp.DataSender;
+import org.apache.catalina.tribes.tcp.MultiPointSender;
+import org.apache.catalina.tribes.tcp.PooledSender;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public abstract class AbstractPooledSender extends PooledSender implements 
MultiPointSender{
+    protected boolean suspect;
+    protected boolean useDirectBuffer;
+    protected int maxRetryAttempts;
+    protected boolean autoConnect;
+    public AbstractPooledSender() {
+        super();
+    }
+    
+    public void setSuspect(boolean suspect) {
+        this.suspect = suspect;
+    }
+
+    public void setUseDirectBuffer(boolean useDirectBuffer) {
+        this.useDirectBuffer = useDirectBuffer;
+    }
+
+    public void setMaxRetryAttempts(int maxRetryAttempts) {
+        this.maxRetryAttempts = maxRetryAttempts;
+    }
+
+    public void setAutoConnect(boolean autoConnect) {
+        this.autoConnect = autoConnect;
+    }
+
+    public boolean getSuspect() {
+        return suspect;
+    }
+
+    public boolean getUseDirectBuffer() {
+        return useDirectBuffer;
+    }
+
+    public int getMaxRetryAttempts() {
+        return maxRetryAttempts;
+    }
+
+    public boolean isAutoConnect() {
+        return autoConnect;
+    }
+}

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 Mon Mar 13 10:00:08 2006
@@ -1,5 +1,5 @@
 /*
- * Copyright 1999,2004 The Apache Software Foundation.
+ * Copyright 1999,2006 The Apache Software Foundation.
  * 
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -38,7 +38,4 @@
     public boolean checkKeepAlive();
     public void setTimeout(long timeout);
     public void setWaitForAck(boolean isWaitForAck);
-
-
-
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java
 Mon Mar 13 10:00:08 2006
@@ -1,5 +1,5 @@
 /*
- * Copyright 1999,2004 The Apache Software Foundation.
+ * Copyright 1999,2006 The Apache Software Foundation.
  * 
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -34,5 +34,8 @@
     public void setUseDirectBuffer(boolean directBuf);
     public void setSuspect(boolean suspect);
     public boolean getSuspect();
+    public void memberAdded(Member member);
+    public void memberRemoved(Member member);
+    public void setAutoConnect(boolean auto);
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java
 Mon Mar 13 10:00:08 2006
@@ -1,11 +1,23 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.catalina.tribes.tcp;
 
-import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.catalina.tribes.ChannelException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.List;
 
 /**
  * <p>Title: </p>
@@ -27,9 +39,10 @@
     private int txBufSize;
     private boolean waitForAck;
     private long timeout;
+    private int poolSize = 25;
 
-    public PooledSender(int queueSize) {
-        queue = new SenderQueue(this,queueSize);
+    public PooledSender() {
+        queue = new SenderQueue(this,poolSize);
     }
     
     public abstract DataSender getNewDataSender();
@@ -83,6 +96,11 @@
         this.timeout = timeout;
     }
 
+    public void setPoolSize(int poolSize) {
+        this.poolSize = poolSize;
+        queue.setLimit(poolSize);
+    }
+
     public boolean isConnected() {
         return connected;
     }
@@ -103,6 +121,10 @@
         return timeout;
     }
 
+    public int getPoolSize() {
+        return poolSize;
+    }
+
     public boolean checkKeepAlive() {
         //do nothing, the pool checks on every return
         return false;
@@ -187,7 +209,9 @@
             }
             //to do
             inuse.remove(sender);
-            notinuse.add(sender);
+            //just in case the limit has changed
+            if ( notinuse.size() < this.getLimit() ) notinuse.add(sender);
+            else try {sender.disconnect(); } catch ( Exception ignore){}
             notify();
         }
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 Mon Mar 13 10:00:08 2006
@@ -16,20 +16,15 @@
 
 package org.apache.catalina.tribes.tcp;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import javax.management.ObjectName;
 
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelSender;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.util.IDynamicProperty;
 import org.apache.catalina.util.StringManager;
-import org.apache.tomcat.util.IntrospectionUtils;
-import org.apache.catalina.tribes.tcp.nio.PooledParallelSender;
 
 /**
  * Transmit message to other cluster members
@@ -39,7 +34,7 @@
  * @author Filip Hanik
  * @version $Revision: 379956 $ $Date: 2006-02-22 16:57:35 -0600 (Wed, 22 Feb 
2006) $
  */
-public class ReplicationTransmitter implements ChannelSender,IDynamicProperty {
+public class ReplicationTransmitter implements ChannelSender {
     private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(ReplicationTransmitter.class);
 
     /**
@@ -53,9 +48,7 @@
     protected StringManager sm = StringManager.getManager(Constants.Package);
 
     
-    private Map map = new HashMap();
-
-    /**
+    /* 
      * @todo make this configurable
      */
     protected int rxBufSize = 43800;
@@ -66,12 +59,6 @@
 
     public ReplicationTransmitter() {
     }
-
-    /**
-     * current sender replication mode
-     */
-    private String replicationMode;
-
     /**
      * sender default ackTimeout
      */
@@ -85,15 +72,8 @@
     /**
      * autoConnect sender when next message send
      */
-    private boolean autoConnect = false; 
-   
-    /**
-     * dynamic sender <code>properties</code>
-     */
-    private Map properties = new HashMap();
-
-
-    // ------------------------------------------------------------- Properties
+    private boolean autoConnect = false;
+    private MultiPointSender transport;
 
     /**
      * Return descriptive information about this implementation and the
@@ -104,160 +84,12 @@
         return (info);
     }
 
-    
-
-    /**
-     * current replication mode
-     * 
-     * @return The mode
-     */
-    public String getReplicationMode() {
-        return replicationMode;
-    }
-
-    /**
-     * set replication Mode (pooled, synchonous, asynchonous, fastasyncqueue)
-     * 
-     * @see IDataSenderFactory#validateMode(String)
-     * @param mode
-     */
-    public void setReplicationMode(String mode) {
-        String msg = DataSenderFactory.validateMode(mode);
-        if (msg == null) {
-            if (log.isDebugEnabled())
-                log.debug("Setting replication mode to " + mode);
-            this.replicationMode = mode;
-        } else
-            throw new IllegalArgumentException(msg);
-
-    }
-
-    
-
-    /**
-     * @return Returns the autoConnect.
-     */
-    public boolean isAutoConnect() {
-        return autoConnect;
-    }
-
-    /**
-     * @param autoConnect
-     *            The autoConnect to set.
-     */
-    public void setAutoConnect(boolean autoConnect) {
-        this.autoConnect = autoConnect;
-        setProperty("autoConnect", String.valueOf(autoConnect));
-
-    }
-
-    /**
-     * @return The ack timeout
-     */
-    public long getAckTimeout() {
-        return ackTimeout;
-    }
-
-    /**
-     * @param ackTimeout
-     */
-    public void setAckTimeout(long ackTimeout) {
-        this.ackTimeout = ackTimeout;
-        setProperty("ackTimeout", String.valueOf(ackTimeout));
-    }
-
-    /**
-     * @return Returns the waitForAck.
-     */
-    public boolean getWaitForAck() {
-        return waitForAck;
-    }
-
-    /**
-     * @param waitForAck
-     *            The waitForAck to set.
-     */
-    public void setWaitForAck(boolean waitForAck) {
-        this.waitForAck = waitForAck;
-        setProperty("waitForAck", String.valueOf(waitForAck));
-    }
-
-    
-    public int getTxBufSize() {
-        return txBufSize;
-    }
-
-    public int getRxBufSize() {
-        return rxBufSize;
-    }
-
-    public boolean isParallel() {
-        return "parallel".equals(replicationMode);
-    }
-
-    public void setTxBufSize(int txBufSize) {
-        this.txBufSize = txBufSize;
-    }
-
-    public void setRxBufSize(int rxBufSize) {
-        this.rxBufSize = rxBufSize;
-    }
-
-    /**
-     * @return True if synchronized sender
-     */
-    public boolean getIsSenderSynchronized() {
-        return 
-            DataSenderFactory.SYNC_MODE.equals(replicationMode) ||
-            DataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode) ||
-            (DataSenderFactory.PARALLEL_MODE.equals(replicationMode) && 
waitForAck);
+    public MultiPointSender getTransport() {
+        return transport;
     }
 
-    // ------------------------------------------------------------- dynamic
-    // sender property handling
-
-    /**
-     * set config attributes with reflect
-     * 
-     * @param name
-     * @param value
-     */
-    public void setProperty(String name, Object value) {
-        if (log.isTraceEnabled())
-            log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
-                    value, properties.get(name)));
-
-        properties.put(name, value);
-    }
-
-    /**
-     * get current config
-     * 
-     * @param key
-     * @return The property
-     */
-    public Object getProperty(String key) {
-        if (log.isTraceEnabled())
-            log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
-        return properties.get(key);
-    }
-
-    /**
-     * Get all properties keys
-     * 
-     * @return An iterator over the propery name set
-     */
-    public Iterator getPropertyNames() {
-        return properties.keySet().iterator();
-    }
-
-    /**
-     * remove a configured property.
-     * 
-     * @param key
-     */
-    public void removeProperty(String key) {
-        properties.remove(key);
+    public void setTransport(MultiPointSender transport) {
+        this.transport = transport;
     }
 
     // ------------------------------------------------------------- public
@@ -267,50 +99,10 @@
      * @see 
org.apache.catalina.tribes.ClusterSender#sendMessage(org.apache.catalina.tribes.ClusterMessage,
 org.apache.catalina.tribes.Member)
      */
     public void sendMessage(ChannelMessage message, Member[] destination) 
throws ChannelException {
-        if ( !isParallel() ) {
-            ChannelException exception = null;
-            for (int i = 0; i < destination.length; i++) {
-                try {
-                    sendMessage(message, destination[i]);
-                } catch (Exception x) {
-                    if (exception == null) exception = new ChannelException(x);
-                    exception.addFaultyMember(destination[i]);
-                }
-            }
-            if (exception != null)throw exception;
-        } else {
-            MultiPointSender sender = getParallelSender();
-            sender.sendMessage(destination,message);
-        }
+        MultiPointSender sender = getTransport();
+        sender.sendMessage(destination,message);
     }
     
-    /**
-     * @todo FIX THIS TO BE IN THE FACTORY
-     */
-    PooledParallelSender parallelsender = null;
-    public MultiPointSender getParallelSender() {
-        if ( parallelsender == null ) {
-            PooledParallelSender sender = new PooledParallelSender();
-            sender.setMaxRetryAttempts(2);
-            sender.setRxBufSize(getRxBufSize());
-            sender.setTimeout(ackTimeout);
-            sender.setUseDirectBuffer(true);
-            sender.setWaitForAck(getWaitForAck());
-            sender.setTxBufSize(getTxBufSize());
-            parallelsender = sender;
-        }
-        return parallelsender;
-    }
-    
-    public void sendMessage(ChannelMessage message, Member destination) throws 
ChannelException {       
-        Object key = getKey(destination);
-        SinglePointSender sender = (SinglePointSender) map.get(key);
-        if ( sender == null ) {
-            add(destination);
-            sender = (SinglePointSender) map.get(key);
-        }
-        sendMessageData(message, sender);
-    }
     
     /**
      * start the sender and register transmitter mbean
@@ -326,16 +118,7 @@
      * @see org.apache.catalina.tribes.ClusterSender#stop()
      */
     public synchronized void stop() {
-        Iterator i = map.entrySet().iterator();
-        while (i.hasNext()) {
-            SinglePointSender sender = (SinglePointSender) 
((java.util.Map.Entry) i.next())
-                    .getValue();
-            try {
-                sender.disconnect();
-            } catch (Exception x) {
-            }
-            i.remove();
-        }
+        getTransport().disconnect();
     }
 
     /**
@@ -353,35 +136,10 @@
      * @see DataSender#checkKeepAlive()
      */
     public void checkKeepAlive() {
-        if (map.size() > 0) {
-            java.util.Iterator iter = map.entrySet().iterator();
-            while (iter.hasNext()) {
-                SinglePointSender sender = (SinglePointSender) 
((java.util.Map.Entry) iter
-                        .next()).getValue();
-                if (sender != null)
-                    sender.checkKeepAlive();
-            }
-        }
+        getTransport().checkKeepAlive();
     }
 
-    /**
-     * get all current senders
-     * 
-     * @return The senders
-     */
-    public SinglePointSender[] getSenders() {
-        java.util.Iterator iter = map.entrySet().iterator();
-        SinglePointSender[] array = new SinglePointSender[map.size()];
-        int i = 0;
-        while (iter.hasNext()) {
-            SinglePointSender sender = (SinglePointSender) 
((java.util.Map.Entry) iter
-                    .next()).getValue();
-            if (sender != null)
-                array[i] = sender;
-            i++;
-        }
-        return array;
-    }
+    
 
     /**
      * add new cluster member and create sender ( s. replicationMode) transfer
@@ -390,22 +148,7 @@
      * @see 
org.apache.catalina.tribes.ClusterSender#add(org.apache.catalina.tribes.Member)
      */
     public synchronized void add(Member member) {
-        try {
-            if ( !isParallel() ) {
-                Object key = getKey(member);
-                if (!map.containsKey(key)) {
-                    SinglePointSender sender = 
DataSenderFactory.getSingleSender(replicationMode, member);
-                    if (sender != null) {
-                        transferSenderProperty(sender);
-                        sender.setRxBufSize(getRxBufSize());
-                        sender.setTxBufSize(getTxBufSize());
-                        map.put(key, sender);
-                    }
-                }
-            }
-        } catch (java.io.IOException x) {
-            log.error("Unable to create and add a IDataSender object.", x);
-        }
+        getTransport().memberAdded(member);
     }
 
     /**
@@ -414,76 +157,11 @@
      * @see 
org.apache.catalina.tribes.ClusterSender#remove(org.apache.catalina.tribes.Member)
      */
     public synchronized void remove(Member member) {
-        Object key = getKey(member);
-        SinglePointSender toberemoved = (SinglePointSender) map.get(key);
-        if (toberemoved == null)
-            return;
-        toberemoved.disconnect();
-        map.remove(key);
-
+        getTransport().memberRemoved(member);
     }
 
     // ------------------------------------------------------------- protected
 
-    /**
-     * Transfer all properties from transmitter to concrete sender
-     * 
-     * @param sender
-     */
-    protected void transferSenderProperty(SinglePointSender sender) {
-        for (Iterator iter = getPropertyNames(); iter.hasNext();) {
-            String pkey = (String) iter.next();
-            Object value = getProperty(pkey);
-            IntrospectionUtils.setProperty(sender, pkey, value.toString());
-        }
-    }
-
-    /**
-     * set unique key to find sender
-     * 
-     * @param member
-     * @return concat member.host:member.port
-     */
-    protected Object getKey(Member member) {
-        return member;
-    }
-
-    /**
-     * Send message to concrete sender. If autoConnect is true, check is
-     * connection broken and the reconnect the complete sender.
-     * <ul>
-     * <li>failure the suspect flag is set true. After successfully sending the
-     * suspect flag is set to false.</li>
-     * <li>Stats is only update after sussesfull sending</li>
-     * </ul>
-     * 
-     * @param data message Data
-     * @param sender concrete message sender
-     * @return true if the message got sent, false otherwise
-     * @throws java.io.IOException If an error occurs
-     */
-    protected void sendMessageData(ChannelMessage data,
-                                   SinglePointSender sender) throws 
ChannelException {
-        if (sender == null)
-            throw new RuntimeException("Sender not available. Make sure sender 
information is available to the ReplicationTransmitter.");
-        try {
-            // deprecated not needed DataSender#pushMessage can handle 
connection
-            if (autoConnect) {
-                synchronized(sender) {
-                    if(!sender.isConnected()) sender.connect();
-                }
-            }
-            sender.sendMessage(data);
-            sender.setSuspect(false);
-        } catch (ChannelException x) {
-            if (!sender.getSuspect()) {
-                if (log.isErrorEnabled() ) log.error("Unable to send 
replicated message, is member ["+sender.toString()+"] down?",x);
-            } else if (log.isDebugEnabled() ) {
-                log.debug("Unable to send replicated message, is member 
["+sender.toString()+"] down?",x);
-            }
-            sender.setSuspect(true);
-            throw x;
-        }
+    
 
-    }
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java
 Mon Mar 13 10:00:08 2006
@@ -1,5 +1,5 @@
 /*
- * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
  * 
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy 
of

Copied: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java
 (from r385604, 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java)
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java?p2=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java&p1=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java&r1=385604&r2=385606&rev=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java
 Mon Mar 13 10:00:08 2006
@@ -16,39 +16,29 @@
 
 package org.apache.catalina.tribes.tcp.bio;
 
-import java.io.IOException;
 import java.net.InetAddress;
-import java.util.LinkedList;
 
-import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.tcp.PooledSender;
+import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.tcp.DataSender;
+import org.apache.catalina.tribes.tcp.PooledSender;
 import org.apache.catalina.tribes.tcp.SenderState;
-import org.apache.catalina.tribes.tcp.SinglePointSender;
-import java.net.Inet4Address;
 
 /**
  * Send cluster messages with a pool of sockets (25).
  * 
  * @author Filip Hanik
- * @author Peter Rossbach
  * @version 1.2
  */
 
-public class PooledSocketSender extends PooledSender implements 
SinglePointSender {
+public class MultiSocketSender extends PooledSender implements DataSender {
 
-    private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory
-            
.getLog(org.apache.catalina.tribes.tcp.bio.PooledSocketSender.class);
+    private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(org.apache.catalina.tribes.tcp.bio.MultiSocketSender.class);
 
     /**
      * The descriptive information about this implementation.
      */
-    private static final String info = "PooledSocketSender/2.0";
-
-    // ----------------------------------------------------- Instance Variables
-
-    private int maxPoolSocketLimit = 25;
+    private static final String info = "MultiSocketSender/2.0";
     private String domain;
     private InetAddress host;
     private int port;
@@ -66,16 +56,12 @@
     * @param host replication node tcp address
     * @param port replication node tcp port
     */
-   public PooledSocketSender(String domain,InetAddress host, int port) {
-       this(domain,host,port,25);
-   }
-
-    public PooledSocketSender(String domain,InetAddress host, int port, int 
poolSize) {
-        super(poolSize);
+    public MultiSocketSender(String domain,InetAddress host, int port, int 
poolSize) {
+        super();
+        super.setPoolSize(poolSize);
         this.host = host;
         this.domain = domain;
         this.port = port;
-        this.maxPoolSocketLimit = poolSize;
     }
    
     //  ----------------------------------------------------- Public Properties
@@ -86,9 +72,7 @@
      * <code>&lt;description&gt;/&lt;version&gt;</code>.
      */
     public String getInfo() {
-
         return (info);
-
     }
 
     public void setDomain(String domain) {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
 Mon Mar 13 10:00:08 2006
@@ -21,14 +21,16 @@
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.Arrays;
 
+import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.tcp.Constants;
+import org.apache.catalina.tribes.tcp.DataSender;
+import org.apache.catalina.tribes.tcp.SenderState;
 import org.apache.catalina.util.StringManager;
-import java.util.Arrays;
-import org.apache.catalina.tribes.tcp.*;
-import org.apache.catalina.tribes.ChannelException;
 
 /**
  * Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -39,7 +41,7 @@
  * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 
2006) $
  * @since 5.5.16
  */
-public class SinglePointDataSender implements SinglePointSender {
+public class SinglePointDataSender implements DataSender {
 
     private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(SinglePointDataSender.class);
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
 Mon Mar 13 10:00:08 2006
@@ -54,20 +54,10 @@
     protected int txBufSize = 25188;
     protected boolean suspect = false;
     private boolean connected;
+    private boolean autoConnect;
 
-    public ParallelNioSender(long timeout, 
-                             boolean waitForAck,
-                             int retryAttempts,
-                             boolean directBuf,
-                             int rxBufSize,
-                             int txBufSize) throws IOException {
-        this.timeout = timeout;
-        this.waitForAck = waitForAck;
-        this.retryAttempts = retryAttempts;
+    public ParallelNioSender() throws IOException {
         selector = Selector.open();
-        this.directBuf = directBuf;
-        this.rxBufSize = rxBufSize;
-        this.txBufSize = txBufSize;
     }
     
     
@@ -214,6 +204,15 @@
         if ( x != null ) throw x;
     }
     
+    public void memberAdded(Member member) {
+        
+    }
+    
+    public void memberRemoved(Member member) {
+        //disconnect senders
+    }
+
+    
     public synchronized void disconnect() {
         try {close(); }catch (Exception x){}
         setConnected(false);
@@ -231,6 +230,10 @@
         return connected;
     }
 
+    public boolean isAutoConnect() {
+        return autoConnect;
+    }
+
     public void setSuspect(boolean suspect) {
         this.suspect = suspect;
     }
@@ -262,7 +265,11 @@
     public void setConnected(boolean connected) {
         this.connected = connected;
     }
-    
+
+    public void setAutoConnect(boolean autoConnect) {
+        this.autoConnect = autoConnect;
+    }
+
     public boolean checkKeepAlive() {
         //throw new UnsupportedOperationException("Method 
ParallelNioSender.checkKeepAlive() not implemented");
         return false;

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java
 Mon Mar 13 10:00:08 2006
@@ -1,12 +1,28 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.catalina.tribes.tcp.nio;
 
-import org.apache.catalina.tribes.tcp.PooledSender;
-import org.apache.catalina.tribes.tcp.DataSender;
-import org.apache.catalina.tribes.tcp.MultiPointSender;
-import org.apache.catalina.tribes.Member;
+import java.io.IOException;
+
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelMessage;
-import java.io.IOException;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.tcp.AbstractPooledSender;
+import org.apache.catalina.tribes.tcp.DataSender;
+import org.apache.catalina.tribes.tcp.MultiPointSender;
 
 /**
  * <p>Title: </p>
@@ -20,14 +36,11 @@
  * @author not attributable
  * @version 1.0
  */
-public class PooledParallelSender extends PooledSender implements 
MultiPointSender{
-    private boolean suspect;
-    private boolean useDirectBuffer;
-    private int maxRetryAttempts;
-
+public class PooledParallelSender extends AbstractPooledSender implements 
MultiPointSender {
     public PooledParallelSender() {
-        super(25);
+        super();
     }
+    
     public void sendMessage(Member[] destination, ChannelMessage message) 
throws ChannelException {
         ParallelNioSender sender = (ParallelNioSender)getSender();
         try {
@@ -39,40 +52,24 @@
 
     public DataSender getNewDataSender() {
         try {
-            ParallelNioSender sender = 
-                new ParallelNioSender(getTimeout(), 
-                                      getWaitForAck(), 
-                                      getMaxRetryAttempts(), 
-                                      useDirectBuffer,
-                                      getRxBufSize(), 
-                                      getTxBufSize());
+            ParallelNioSender sender = new ParallelNioSender();
+            sender.setTimeout(getTimeout());
+            sender.setWaitForAck(getWaitForAck());
+            sender.setMaxRetryAttempts(getMaxRetryAttempts()); 
+            sender.setUseDirectBuffer(getUseDirectBuffer());
+            sender.setRxBufSize(getRxBufSize());
+            sender.setTxBufSize(getTxBufSize());
             return sender;
         } catch ( IOException x ) {
             throw new IllegalStateException("Unable to open NIO selector.",x);
         }
     }
 
-    public void setSuspect(boolean suspect) {
-        this.suspect = suspect;
-    }
-
-    public void setUseDirectBuffer(boolean useDirectBuffer) {
-        this.useDirectBuffer = useDirectBuffer;
-    }
-
-    public void setMaxRetryAttempts(int maxRetryAttempts) {
-        this.maxRetryAttempts = maxRetryAttempts;
-    }
-
-    public boolean getSuspect() {
-        return suspect;
-    }
-
-    public boolean getUseDirectBuffer() {
-        return useDirectBuffer;
-    }
-
-    public int getMaxRetryAttempts() {
-        return maxRetryAttempts;
+    public void memberAdded(Member member) {
+    
     }
+    
+    public void memberRemoved(Member member) {
+        //disconnect senders
+    }    
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=385606&r1=385605&r2=385606&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
 Mon Mar 13 10:00:08 2006
@@ -24,6 +24,11 @@
 import org.apache.catalina.tribes.group.interceptors.GzipInterceptor;
 import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
 import org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor;
+import java.util.Properties;
+import java.util.Iterator;
+import org.apache.catalina.tribes.tcp.MultiPointSender;
+import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.catalina.tribes.tcp.nio.PooledParallelSender;
 
 /**
  * <p>Title: </p>
@@ -40,8 +45,7 @@
 
     public static StringBuffer usage() {
         StringBuffer buf = new StringBuffer();
-        buf.append("\n\t\t[-sender pooled|fastasyncqueue]")
-           .append("\n\t\t[-bind tcpbindaddress]")
+        buf.append("\n\t\t[-bind tcpbindaddress]")
            .append("\n\t\t[-tcpselto tcpselectortimeout]") 
            .append("\n\t\t[-tcpthreads tcpthreadcount]") 
            .append("\n\t\t[-port tcplistenport]")
@@ -49,6 +53,8 @@
            .append("\n\t\t[-ackto acktimeout]") 
            .append("\n\t\t[-autoconnect true|false]")
            .append("\n\t\t[-sync true|false]")
+           .append("\n\t\t[-transport 
org.apache.catalina.tribes.tcp.nio.ParallelNioSender]")
+           .append("\n\t\t[-transport.xxx transport specific property]")
            .append("\n\t\t[-maddr multicastaddr]")
            .append("\n\t\t[-mport multicastport]")
            .append("\n\t\t[-mbind multicastbindaddr]")
@@ -63,14 +69,13 @@
 
     }
 
-    public static Channel createChannel(String[] args) {
+    public static Channel createChannel(String[] args) throws Exception {
         String bind = "auto";
         int port = 4001;
         String mbind = null;
         boolean ack = false;
         boolean sync = false;
         boolean gzip = false;
-        String sender = "pooled";
         int tcpseltimeout = 100;
         int tcpthreadcount = 4;
         int acktimeout = 15000;
@@ -83,12 +88,12 @@
         int ordersize = Integer.MAX_VALUE;
         boolean frag = false;
         int fragsize = 1024;
+        Properties transportProperties = new Properties();
+        String transport = 
"org.apache.catalina.tribes.tcp.nio.PooledParallelSender";
         
         for (int i = 0; i < args.length; i++) {
             if ("-bind".equals(args[i])) {
                 bind = args[++i];
-            } else if ("-sender".equals(args[i])) {
-                sender = args[++i];
             } else if ("-port".equals(args[i])) {
                 port = Integer.parseInt(args[++i]);
             } else if ("-tcpselto".equals(args[i])) {
@@ -115,6 +120,12 @@
                 sync = Boolean.parseBoolean(args[++i]);
             } else if ("-autoconnect".equals(args[i])) {
                 autoconnect = Boolean.parseBoolean(args[++i]);
+            } else if ("-transport".equals(args[i])) {
+                transport = args[++i];
+            } else if (args[i]!=null && args[i].startsWith("transport.")) {
+                String key = args[i];
+                String val = args[++i];
+                transportProperties.setProperty(key,val);
             } else if ("-maddr".equals(args[i])) {
                 mcastaddr = args[++i];
             } else if ("-mport".equals(args[i])) {
@@ -137,11 +148,22 @@
         rl.setSendAck(ack);
         rl.setSynchronized(sync);
 
+        
         ReplicationTransmitter ps = new ReplicationTransmitter();
-        ps.setReplicationMode(sender);
-        ps.setAckTimeout(acktimeout);
-        ps.setAutoConnect(autoconnect);
-        ps.setWaitForAck(ack);
+        MultiPointSender sender = 
(MultiPointSender)Class.forName(transport,true,ChannelCreator.class.getClassLoader()).newInstance();
+        sender.setTimeout(acktimeout);
+        sender.setAutoConnect(autoconnect);
+        sender.setWaitForAck(ack);
+        sender.setMaxRetryAttempts(2);
+        sender.setRxBufSize(43800);
+        sender.setTxBufSize(25188);
+
+        Iterator i = transportProperties.keySet().iterator();
+        while ( i.hasNext() ) {
+            String key = (String)i.next();
+            
IntrospectionUtils.setProperty(sender,key,transportProperties.getProperty(key));
+        }
+        ps.setTransport(sender);
 
         McastService service = new McastService();
         service.setMcastAddr(mcastaddr);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to