Author: fhanik Date: Mon Mar 13 10:00:08 2006 New Revision: 385606 URL: http://svn.apache.org/viewcvs?rev=385606&view=rev Log: Refactoring the senders into a cleaner structure
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java - copied, changed from r385604, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java Removed: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenderFactory.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/IDynamicProperty.java Modified: tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Modified: tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml (original) +++ tomcat/container/tc5.5.x/modules/groupcom/etc/cluster-server.xml Mon Mar 13 10:00:08 2006 @@ -305,12 +305,30 @@ txBufSize="25188"/> <Sender - className="org.apache.catalina.tribes.tcp.ReplicationTransmitter" - replicationMode="pooled" - ackTimeout="15000" - waitForAck="true" - rxBufSize="43800" - txBufSize="25188"/> + className="org.apache.catalina.tribes.tcp.ReplicationTransmitter"> + <transport className="org.apache.catalina.tribes.tcp.nio.PooledParallelSender" + maxRetryAttempts="2" + timeout="15000" + waitForAck="true" + ackTimeout="15000" + rxBufSize="43800" + txBufSize="25188" + autoConnect="true" + poolSize="25"/> + <!-- + <transport className="org.apache.catalina.tribes.tcp.bio.PooledBioSender" + maxRetryAttempts="2" + timeout="15000" + waitForAck="true" + ackTimeout="15000" + rxBufSize="43800" + txBufSize="25188" + autoConnect="true" + poolSize="25"/> + --> + + </Sender> + <!-- <Interceptor className="org.apache.catalina.tribes.group.interceptors.GzipInterceptor"/> Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java Mon Mar 13 10:00:08 2006 @@ -25,21 +25,10 @@ */ public interface ChannelSender { - public void add(Member member); - public void remove(Member member); - public void start() throws java.io.IOException; - public void stop(); - public void heartbeat() ; - public void sendMessage(ChannelMessage message, Member[] destination) throws ChannelException; - - public boolean getWaitForAck(); - public void setWaitForAck(boolean isWaitForAck); - - public boolean isParallel(); } Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java?rev=385606&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AbstractPooledSender.java Mon Mar 13 10:00:08 2006 @@ -0,0 +1,79 @@ +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.tcp; + +import java.io.IOException; + +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.tcp.DataSender; +import org.apache.catalina.tribes.tcp.MultiPointSender; +import org.apache.catalina.tribes.tcp.PooledSender; + +/** + * <p>Title: </p> + * + * <p>Description: </p> + * + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable + * @version 1.0 + */ +public abstract class AbstractPooledSender extends PooledSender implements MultiPointSender{ + protected boolean suspect; + protected boolean useDirectBuffer; + protected int maxRetryAttempts; + protected boolean autoConnect; + public AbstractPooledSender() { + super(); + } + + public void setSuspect(boolean suspect) { + this.suspect = suspect; + } + + public void setUseDirectBuffer(boolean useDirectBuffer) { + this.useDirectBuffer = useDirectBuffer; + } + + public void setMaxRetryAttempts(int maxRetryAttempts) { + this.maxRetryAttempts = maxRetryAttempts; + } + + public void setAutoConnect(boolean autoConnect) { + this.autoConnect = autoConnect; + } + + public boolean getSuspect() { + return suspect; + } + + public boolean getUseDirectBuffer() { + return useDirectBuffer; + } + + public int getMaxRetryAttempts() { + return maxRetryAttempts; + } + + public boolean isAutoConnect() { + return autoConnect; + } +} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Mon Mar 13 10:00:08 2006 @@ -1,5 +1,5 @@ /* - * Copyright 1999,2004 The Apache Software Foundation. + * Copyright 1999,2006 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +38,4 @@ public boolean checkKeepAlive(); public void setTimeout(long timeout); public void setWaitForAck(boolean isWaitForAck); - - - } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/MultiPointSender.java Mon Mar 13 10:00:08 2006 @@ -1,5 +1,5 @@ /* - * Copyright 1999,2004 The Apache Software Foundation. + * Copyright 1999,2006 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,5 +34,8 @@ public void setUseDirectBuffer(boolean directBuf); public void setSuspect(boolean suspect); public boolean getSuspect(); + public void memberAdded(Member member); + public void memberRemoved(Member member); + public void setAutoConnect(boolean auto); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSender.java Mon Mar 13 10:00:08 2006 @@ -1,11 +1,23 @@ +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.catalina.tribes.tcp; -import java.util.LinkedList; +import java.util.List; import org.apache.catalina.tribes.ChannelException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.List; /** * <p>Title: </p> @@ -27,9 +39,10 @@ private int txBufSize; private boolean waitForAck; private long timeout; + private int poolSize = 25; - public PooledSender(int queueSize) { - queue = new SenderQueue(this,queueSize); + public PooledSender() { + queue = new SenderQueue(this,poolSize); } public abstract DataSender getNewDataSender(); @@ -83,6 +96,11 @@ this.timeout = timeout; } + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + queue.setLimit(poolSize); + } + public boolean isConnected() { return connected; } @@ -103,6 +121,10 @@ return timeout; } + public int getPoolSize() { + return poolSize; + } + public boolean checkKeepAlive() { //do nothing, the pool checks on every return return false; @@ -187,7 +209,9 @@ } //to do inuse.remove(sender); - notinuse.add(sender); + //just in case the limit has changed + if ( notinuse.size() < this.getLimit() ) notinuse.add(sender); + else try {sender.disconnect(); } catch ( Exception ignore){} notify(); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Mon Mar 13 10:00:08 2006 @@ -16,20 +16,15 @@ package org.apache.catalina.tribes.tcp; -import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import javax.management.ObjectName; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelSender; import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.util.IDynamicProperty; import org.apache.catalina.util.StringManager; -import org.apache.tomcat.util.IntrospectionUtils; -import org.apache.catalina.tribes.tcp.nio.PooledParallelSender; /** * Transmit message to other cluster members @@ -39,7 +34,7 @@ * @author Filip Hanik * @version $Revision: 379956 $ $Date: 2006-02-22 16:57:35 -0600 (Wed, 22 Feb 2006) $ */ -public class ReplicationTransmitter implements ChannelSender,IDynamicProperty { +public class ReplicationTransmitter implements ChannelSender { private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ReplicationTransmitter.class); /** @@ -53,9 +48,7 @@ protected StringManager sm = StringManager.getManager(Constants.Package); - private Map map = new HashMap(); - - /** + /* * @todo make this configurable */ protected int rxBufSize = 43800; @@ -66,12 +59,6 @@ public ReplicationTransmitter() { } - - /** - * current sender replication mode - */ - private String replicationMode; - /** * sender default ackTimeout */ @@ -85,15 +72,8 @@ /** * autoConnect sender when next message send */ - private boolean autoConnect = false; - - /** - * dynamic sender <code>properties</code> - */ - private Map properties = new HashMap(); - - - // ------------------------------------------------------------- Properties + private boolean autoConnect = false; + private MultiPointSender transport; /** * Return descriptive information about this implementation and the @@ -104,160 +84,12 @@ return (info); } - - - /** - * current replication mode - * - * @return The mode - */ - public String getReplicationMode() { - return replicationMode; - } - - /** - * set replication Mode (pooled, synchonous, asynchonous, fastasyncqueue) - * - * @see IDataSenderFactory#validateMode(String) - * @param mode - */ - public void setReplicationMode(String mode) { - String msg = DataSenderFactory.validateMode(mode); - if (msg == null) { - if (log.isDebugEnabled()) - log.debug("Setting replication mode to " + mode); - this.replicationMode = mode; - } else - throw new IllegalArgumentException(msg); - - } - - - - /** - * @return Returns the autoConnect. - */ - public boolean isAutoConnect() { - return autoConnect; - } - - /** - * @param autoConnect - * The autoConnect to set. - */ - public void setAutoConnect(boolean autoConnect) { - this.autoConnect = autoConnect; - setProperty("autoConnect", String.valueOf(autoConnect)); - - } - - /** - * @return The ack timeout - */ - public long getAckTimeout() { - return ackTimeout; - } - - /** - * @param ackTimeout - */ - public void setAckTimeout(long ackTimeout) { - this.ackTimeout = ackTimeout; - setProperty("ackTimeout", String.valueOf(ackTimeout)); - } - - /** - * @return Returns the waitForAck. - */ - public boolean getWaitForAck() { - return waitForAck; - } - - /** - * @param waitForAck - * The waitForAck to set. - */ - public void setWaitForAck(boolean waitForAck) { - this.waitForAck = waitForAck; - setProperty("waitForAck", String.valueOf(waitForAck)); - } - - - public int getTxBufSize() { - return txBufSize; - } - - public int getRxBufSize() { - return rxBufSize; - } - - public boolean isParallel() { - return "parallel".equals(replicationMode); - } - - public void setTxBufSize(int txBufSize) { - this.txBufSize = txBufSize; - } - - public void setRxBufSize(int rxBufSize) { - this.rxBufSize = rxBufSize; - } - - /** - * @return True if synchronized sender - */ - public boolean getIsSenderSynchronized() { - return - DataSenderFactory.SYNC_MODE.equals(replicationMode) || - DataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode) || - (DataSenderFactory.PARALLEL_MODE.equals(replicationMode) && waitForAck); + public MultiPointSender getTransport() { + return transport; } - // ------------------------------------------------------------- dynamic - // sender property handling - - /** - * set config attributes with reflect - * - * @param name - * @param value - */ - public void setProperty(String name, Object value) { - if (log.isTraceEnabled()) - log.trace(sm.getString("ReplicationTransmitter.setProperty", name, - value, properties.get(name))); - - properties.put(name, value); - } - - /** - * get current config - * - * @param key - * @return The property - */ - public Object getProperty(String key) { - if (log.isTraceEnabled()) - log.trace(sm.getString("ReplicationTransmitter.getProperty", key)); - return properties.get(key); - } - - /** - * Get all properties keys - * - * @return An iterator over the propery name set - */ - public Iterator getPropertyNames() { - return properties.keySet().iterator(); - } - - /** - * remove a configured property. - * - * @param key - */ - public void removeProperty(String key) { - properties.remove(key); + public void setTransport(MultiPointSender transport) { + this.transport = transport; } // ------------------------------------------------------------- public @@ -267,50 +99,10 @@ * @see org.apache.catalina.tribes.ClusterSender#sendMessage(org.apache.catalina.tribes.ClusterMessage, org.apache.catalina.tribes.Member) */ public void sendMessage(ChannelMessage message, Member[] destination) throws ChannelException { - if ( !isParallel() ) { - ChannelException exception = null; - for (int i = 0; i < destination.length; i++) { - try { - sendMessage(message, destination[i]); - } catch (Exception x) { - if (exception == null) exception = new ChannelException(x); - exception.addFaultyMember(destination[i]); - } - } - if (exception != null)throw exception; - } else { - MultiPointSender sender = getParallelSender(); - sender.sendMessage(destination,message); - } + MultiPointSender sender = getTransport(); + sender.sendMessage(destination,message); } - /** - * @todo FIX THIS TO BE IN THE FACTORY - */ - PooledParallelSender parallelsender = null; - public MultiPointSender getParallelSender() { - if ( parallelsender == null ) { - PooledParallelSender sender = new PooledParallelSender(); - sender.setMaxRetryAttempts(2); - sender.setRxBufSize(getRxBufSize()); - sender.setTimeout(ackTimeout); - sender.setUseDirectBuffer(true); - sender.setWaitForAck(getWaitForAck()); - sender.setTxBufSize(getTxBufSize()); - parallelsender = sender; - } - return parallelsender; - } - - public void sendMessage(ChannelMessage message, Member destination) throws ChannelException { - Object key = getKey(destination); - SinglePointSender sender = (SinglePointSender) map.get(key); - if ( sender == null ) { - add(destination); - sender = (SinglePointSender) map.get(key); - } - sendMessageData(message, sender); - } /** * start the sender and register transmitter mbean @@ -326,16 +118,7 @@ * @see org.apache.catalina.tribes.ClusterSender#stop() */ public synchronized void stop() { - Iterator i = map.entrySet().iterator(); - while (i.hasNext()) { - SinglePointSender sender = (SinglePointSender) ((java.util.Map.Entry) i.next()) - .getValue(); - try { - sender.disconnect(); - } catch (Exception x) { - } - i.remove(); - } + getTransport().disconnect(); } /** @@ -353,35 +136,10 @@ * @see DataSender#checkKeepAlive() */ public void checkKeepAlive() { - if (map.size() > 0) { - java.util.Iterator iter = map.entrySet().iterator(); - while (iter.hasNext()) { - SinglePointSender sender = (SinglePointSender) ((java.util.Map.Entry) iter - .next()).getValue(); - if (sender != null) - sender.checkKeepAlive(); - } - } + getTransport().checkKeepAlive(); } - /** - * get all current senders - * - * @return The senders - */ - public SinglePointSender[] getSenders() { - java.util.Iterator iter = map.entrySet().iterator(); - SinglePointSender[] array = new SinglePointSender[map.size()]; - int i = 0; - while (iter.hasNext()) { - SinglePointSender sender = (SinglePointSender) ((java.util.Map.Entry) iter - .next()).getValue(); - if (sender != null) - array[i] = sender; - i++; - } - return array; - } + /** * add new cluster member and create sender ( s. replicationMode) transfer @@ -390,22 +148,7 @@ * @see org.apache.catalina.tribes.ClusterSender#add(org.apache.catalina.tribes.Member) */ public synchronized void add(Member member) { - try { - if ( !isParallel() ) { - Object key = getKey(member); - if (!map.containsKey(key)) { - SinglePointSender sender = DataSenderFactory.getSingleSender(replicationMode, member); - if (sender != null) { - transferSenderProperty(sender); - sender.setRxBufSize(getRxBufSize()); - sender.setTxBufSize(getTxBufSize()); - map.put(key, sender); - } - } - } - } catch (java.io.IOException x) { - log.error("Unable to create and add a IDataSender object.", x); - } + getTransport().memberAdded(member); } /** @@ -414,76 +157,11 @@ * @see org.apache.catalina.tribes.ClusterSender#remove(org.apache.catalina.tribes.Member) */ public synchronized void remove(Member member) { - Object key = getKey(member); - SinglePointSender toberemoved = (SinglePointSender) map.get(key); - if (toberemoved == null) - return; - toberemoved.disconnect(); - map.remove(key); - + getTransport().memberRemoved(member); } // ------------------------------------------------------------- protected - /** - * Transfer all properties from transmitter to concrete sender - * - * @param sender - */ - protected void transferSenderProperty(SinglePointSender sender) { - for (Iterator iter = getPropertyNames(); iter.hasNext();) { - String pkey = (String) iter.next(); - Object value = getProperty(pkey); - IntrospectionUtils.setProperty(sender, pkey, value.toString()); - } - } - - /** - * set unique key to find sender - * - * @param member - * @return concat member.host:member.port - */ - protected Object getKey(Member member) { - return member; - } - - /** - * Send message to concrete sender. If autoConnect is true, check is - * connection broken and the reconnect the complete sender. - * <ul> - * <li>failure the suspect flag is set true. After successfully sending the - * suspect flag is set to false.</li> - * <li>Stats is only update after sussesfull sending</li> - * </ul> - * - * @param data message Data - * @param sender concrete message sender - * @return true if the message got sent, false otherwise - * @throws java.io.IOException If an error occurs - */ - protected void sendMessageData(ChannelMessage data, - SinglePointSender sender) throws ChannelException { - if (sender == null) - throw new RuntimeException("Sender not available. Make sure sender information is available to the ReplicationTransmitter."); - try { - // deprecated not needed DataSender#pushMessage can handle connection - if (autoConnect) { - synchronized(sender) { - if(!sender.isConnected()) sender.connect(); - } - } - sender.sendMessage(data); - sender.setSuspect(false); - } catch (ChannelException x) { - if (!sender.getSuspect()) { - if (log.isErrorEnabled() ) log.error("Unable to send replicated message, is member ["+sender.toString()+"] down?",x); - } else if (log.isDebugEnabled() ) { - log.debug("Unable to send replicated message, is member ["+sender.toString()+"] down?",x); - } - sender.setSuspect(true); - throw x; - } + - } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/FastAsyncSocketSender.java Mon Mar 13 10:00:08 2006 @@ -1,5 +1,5 @@ /* - * Copyright 1999,2004-2005 The Apache Software Foundation. + * Copyright 1999,2004-2006 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of Copied: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java (from r385604, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java) URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java?p2=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java&p1=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java&r1=385604&r2=385606&rev=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultiSocketSender.java Mon Mar 13 10:00:08 2006 @@ -16,39 +16,29 @@ package org.apache.catalina.tribes.tcp.bio; -import java.io.IOException; import java.net.InetAddress; -import java.util.LinkedList; -import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.tcp.PooledSender; +import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.tcp.DataSender; +import org.apache.catalina.tribes.tcp.PooledSender; import org.apache.catalina.tribes.tcp.SenderState; -import org.apache.catalina.tribes.tcp.SinglePointSender; -import java.net.Inet4Address; /** * Send cluster messages with a pool of sockets (25). * * @author Filip Hanik - * @author Peter Rossbach * @version 1.2 */ -public class PooledSocketSender extends PooledSender implements SinglePointSender { +public class MultiSocketSender extends PooledSender implements DataSender { - private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory - .getLog(org.apache.catalina.tribes.tcp.bio.PooledSocketSender.class); + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(org.apache.catalina.tribes.tcp.bio.MultiSocketSender.class); /** * The descriptive information about this implementation. */ - private static final String info = "PooledSocketSender/2.0"; - - // ----------------------------------------------------- Instance Variables - - private int maxPoolSocketLimit = 25; + private static final String info = "MultiSocketSender/2.0"; private String domain; private InetAddress host; private int port; @@ -66,16 +56,12 @@ * @param host replication node tcp address * @param port replication node tcp port */ - public PooledSocketSender(String domain,InetAddress host, int port) { - this(domain,host,port,25); - } - - public PooledSocketSender(String domain,InetAddress host, int port, int poolSize) { - super(poolSize); + public MultiSocketSender(String domain,InetAddress host, int port, int poolSize) { + super(); + super.setPoolSize(poolSize); this.host = host; this.domain = domain; this.port = port; - this.maxPoolSocketLimit = poolSize; } // ----------------------------------------------------- Public Properties @@ -86,9 +72,7 @@ * <code><description>/<version></code>. */ public String getInfo() { - return (info); - } public void setDomain(String domain) { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java Mon Mar 13 10:00:08 2006 @@ -21,14 +21,16 @@ import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import java.util.Arrays; +import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.io.ClusterData; import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.tcp.Constants; +import org.apache.catalina.tribes.tcp.DataSender; +import org.apache.catalina.tribes.tcp.SenderState; import org.apache.catalina.util.StringManager; -import java.util.Arrays; -import org.apache.catalina.tribes.tcp.*; -import org.apache.catalina.tribes.ChannelException; /** * Send cluster messages with only one socket. Ack and keep Alive Handling is @@ -39,7 +41,7 @@ * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $ * @since 5.5.16 */ -public class SinglePointDataSender implements SinglePointSender { +public class SinglePointDataSender implements DataSender { private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(SinglePointDataSender.class); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Mon Mar 13 10:00:08 2006 @@ -54,20 +54,10 @@ protected int txBufSize = 25188; protected boolean suspect = false; private boolean connected; + private boolean autoConnect; - public ParallelNioSender(long timeout, - boolean waitForAck, - int retryAttempts, - boolean directBuf, - int rxBufSize, - int txBufSize) throws IOException { - this.timeout = timeout; - this.waitForAck = waitForAck; - this.retryAttempts = retryAttempts; + public ParallelNioSender() throws IOException { selector = Selector.open(); - this.directBuf = directBuf; - this.rxBufSize = rxBufSize; - this.txBufSize = txBufSize; } @@ -214,6 +204,15 @@ if ( x != null ) throw x; } + public void memberAdded(Member member) { + + } + + public void memberRemoved(Member member) { + //disconnect senders + } + + public synchronized void disconnect() { try {close(); }catch (Exception x){} setConnected(false); @@ -231,6 +230,10 @@ return connected; } + public boolean isAutoConnect() { + return autoConnect; + } + public void setSuspect(boolean suspect) { this.suspect = suspect; } @@ -262,7 +265,11 @@ public void setConnected(boolean connected) { this.connected = connected; } - + + public void setAutoConnect(boolean autoConnect) { + this.autoConnect = autoConnect; + } + public boolean checkKeepAlive() { //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented"); return false; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/PooledParallelSender.java Mon Mar 13 10:00:08 2006 @@ -1,12 +1,28 @@ +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.catalina.tribes.tcp.nio; -import org.apache.catalina.tribes.tcp.PooledSender; -import org.apache.catalina.tribes.tcp.DataSender; -import org.apache.catalina.tribes.tcp.MultiPointSender; -import org.apache.catalina.tribes.Member; +import java.io.IOException; + import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelMessage; -import java.io.IOException; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.tcp.AbstractPooledSender; +import org.apache.catalina.tribes.tcp.DataSender; +import org.apache.catalina.tribes.tcp.MultiPointSender; /** * <p>Title: </p> @@ -20,14 +36,11 @@ * @author not attributable * @version 1.0 */ -public class PooledParallelSender extends PooledSender implements MultiPointSender{ - private boolean suspect; - private boolean useDirectBuffer; - private int maxRetryAttempts; - +public class PooledParallelSender extends AbstractPooledSender implements MultiPointSender { public PooledParallelSender() { - super(25); + super(); } + public void sendMessage(Member[] destination, ChannelMessage message) throws ChannelException { ParallelNioSender sender = (ParallelNioSender)getSender(); try { @@ -39,40 +52,24 @@ public DataSender getNewDataSender() { try { - ParallelNioSender sender = - new ParallelNioSender(getTimeout(), - getWaitForAck(), - getMaxRetryAttempts(), - useDirectBuffer, - getRxBufSize(), - getTxBufSize()); + ParallelNioSender sender = new ParallelNioSender(); + sender.setTimeout(getTimeout()); + sender.setWaitForAck(getWaitForAck()); + sender.setMaxRetryAttempts(getMaxRetryAttempts()); + sender.setUseDirectBuffer(getUseDirectBuffer()); + sender.setRxBufSize(getRxBufSize()); + sender.setTxBufSize(getTxBufSize()); return sender; } catch ( IOException x ) { throw new IllegalStateException("Unable to open NIO selector.",x); } } - public void setSuspect(boolean suspect) { - this.suspect = suspect; - } - - public void setUseDirectBuffer(boolean useDirectBuffer) { - this.useDirectBuffer = useDirectBuffer; - } - - public void setMaxRetryAttempts(int maxRetryAttempts) { - this.maxRetryAttempts = maxRetryAttempts; - } - - public boolean getSuspect() { - return suspect; - } - - public boolean getUseDirectBuffer() { - return useDirectBuffer; - } - - public int getMaxRetryAttempts() { - return maxRetryAttempts; + public void memberAdded(Member member) { + } + + public void memberRemoved(Member member) { + //disconnect senders + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=385606&r1=385605&r2=385606&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Mon Mar 13 10:00:08 2006 @@ -24,6 +24,11 @@ import org.apache.catalina.tribes.group.interceptors.GzipInterceptor; import org.apache.catalina.tribes.group.interceptors.OrderInterceptor; import org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor; +import java.util.Properties; +import java.util.Iterator; +import org.apache.catalina.tribes.tcp.MultiPointSender; +import org.apache.tomcat.util.IntrospectionUtils; +import org.apache.catalina.tribes.tcp.nio.PooledParallelSender; /** * <p>Title: </p> @@ -40,8 +45,7 @@ public static StringBuffer usage() { StringBuffer buf = new StringBuffer(); - buf.append("\n\t\t[-sender pooled|fastasyncqueue]") - .append("\n\t\t[-bind tcpbindaddress]") + buf.append("\n\t\t[-bind tcpbindaddress]") .append("\n\t\t[-tcpselto tcpselectortimeout]") .append("\n\t\t[-tcpthreads tcpthreadcount]") .append("\n\t\t[-port tcplistenport]") @@ -49,6 +53,8 @@ .append("\n\t\t[-ackto acktimeout]") .append("\n\t\t[-autoconnect true|false]") .append("\n\t\t[-sync true|false]") + .append("\n\t\t[-transport org.apache.catalina.tribes.tcp.nio.ParallelNioSender]") + .append("\n\t\t[-transport.xxx transport specific property]") .append("\n\t\t[-maddr multicastaddr]") .append("\n\t\t[-mport multicastport]") .append("\n\t\t[-mbind multicastbindaddr]") @@ -63,14 +69,13 @@ } - public static Channel createChannel(String[] args) { + public static Channel createChannel(String[] args) throws Exception { String bind = "auto"; int port = 4001; String mbind = null; boolean ack = false; boolean sync = false; boolean gzip = false; - String sender = "pooled"; int tcpseltimeout = 100; int tcpthreadcount = 4; int acktimeout = 15000; @@ -83,12 +88,12 @@ int ordersize = Integer.MAX_VALUE; boolean frag = false; int fragsize = 1024; + Properties transportProperties = new Properties(); + String transport = "org.apache.catalina.tribes.tcp.nio.PooledParallelSender"; for (int i = 0; i < args.length; i++) { if ("-bind".equals(args[i])) { bind = args[++i]; - } else if ("-sender".equals(args[i])) { - sender = args[++i]; } else if ("-port".equals(args[i])) { port = Integer.parseInt(args[++i]); } else if ("-tcpselto".equals(args[i])) { @@ -115,6 +120,12 @@ sync = Boolean.parseBoolean(args[++i]); } else if ("-autoconnect".equals(args[i])) { autoconnect = Boolean.parseBoolean(args[++i]); + } else if ("-transport".equals(args[i])) { + transport = args[++i]; + } else if (args[i]!=null && args[i].startsWith("transport.")) { + String key = args[i]; + String val = args[++i]; + transportProperties.setProperty(key,val); } else if ("-maddr".equals(args[i])) { mcastaddr = args[++i]; } else if ("-mport".equals(args[i])) { @@ -137,11 +148,22 @@ rl.setSendAck(ack); rl.setSynchronized(sync); + ReplicationTransmitter ps = new ReplicationTransmitter(); - ps.setReplicationMode(sender); - ps.setAckTimeout(acktimeout); - ps.setAutoConnect(autoconnect); - ps.setWaitForAck(ack); + MultiPointSender sender = (MultiPointSender)Class.forName(transport,true,ChannelCreator.class.getClassLoader()).newInstance(); + sender.setTimeout(acktimeout); + sender.setAutoConnect(autoconnect); + sender.setWaitForAck(ack); + sender.setMaxRetryAttempts(2); + sender.setRxBufSize(43800); + sender.setTxBufSize(25188); + + Iterator i = transportProperties.keySet().iterator(); + while ( i.hasNext() ) { + String key = (String)i.next(); + IntrospectionUtils.setProperty(sender,key,transportProperties.getProperty(key)); + } + ps.setTransport(sender); McastService service = new McastService(); service.setMcastAddr(mcastaddr); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]