Author: fhanik
Date: Mon Feb 27 07:43:00 2006
New Revision: 381364

URL: http://svn.apache.org/viewcvs?rev=381364&view=rev
Log:
Created a load test, also implemented a "synchronized/asynchronized" option, 
since we are using TCP.

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java
    tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java?rev=381364&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java
 Mon Feb 27 07:43:00 2006
@@ -0,0 +1,44 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes;
+
+import java.io.Serializable;
+
+/**
+ * A byte message is not serialized and deserialized by the channel
+ * @author Filip Hanik
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 
2005) $
+ */
+
+public class ByteMessage implements Serializable {
+    private byte[] message;
+    
+    public ByteMessage() {
+        
+    }
+    public ByteMessage(byte[] data) {
+        message = data;
+    }
+    
+    public byte[] getMessage() {
+        return message;
+    }
+
+    public void setMessage(byte[] message) {
+        this.message = message;
+    }
+
+}
\ No newline at end of file

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
 Mon Feb 27 07:43:00 2006
@@ -74,7 +74,7 @@
      * @param options int - sender options, see class documentation
      * @return ClusterMessage[] - the replies from the members, if any. 
      */
-    public void send(Member[] destination, Serializable msg, int options) 
throws ChannelException;
+    public void send(Member[] destination, Serializable msg) throws 
ChannelException;
 
     
     /**

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
 Mon Feb 27 07:43:00 2006
@@ -20,7 +20,6 @@
 /**
  * Cluster Receiver Interface
  * @author Filip Hanik
- * @author Peter Rossbach
  * @version $Revision: 379904 $, $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 
2006) $
  */
 public interface ChannelReceiver {
@@ -35,8 +34,11 @@
      */
     public void stop();
 
-    
-    public boolean isSendAck();
+    /**
+     * returns true of the receiver is sending acks when it receives messages
+     * @return boolean
+     */
+    public boolean getSendAck();
     
     /**
      * set ack mode
@@ -44,9 +46,6 @@
      */
     public void setSendAck(boolean isSendAck);
     
-    public boolean isCompress() ;
-    public void setCompress(boolean compress);
-
     /**
      * get the listing ip interface
      * @return The host

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
 Mon Feb 27 07:43:00 2006
@@ -40,10 +40,6 @@
 
     public void sendMessage(ChannelMessage message) throws java.io.IOException;
     
-    public boolean isWaitForAck();
+    public boolean getWaitForAck();
     public void setWaitForAck(boolean isWaitForAck);
-
-    public boolean isCompress() ;
-    public void setCompress(boolean compress);
-
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 Mon Feb 27 07:43:00 2006
@@ -83,13 +83,10 @@
         try {
             //synchronize, big time FIXME
             
membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), 
getClusterReceiver().getPort());
-            clusterReceiver.setSendAck(clusterSender.isWaitForAck());
-            clusterReceiver.setCompress(clusterSender.isCompress());
             //end FIXME
-            
-            if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) 
membershipService.start(MembershipService.MBR_RX);
             if ( (svc & Channel.SND_RX_SEQ) == Channel.SND_RX_SEQ) 
clusterReceiver.start();
             if ( (svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ) 
clusterSender.start();
+            if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) 
membershipService.start(MembershipService.MBR_RX);
             if ( (svc & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) 
membershipService.start(MembershipService.MBR_TX);
         }catch ( ChannelException cx ) {
             throw cx;

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 Mon Feb 27 07:43:00 2006
@@ -33,6 +33,7 @@
 import org.apache.catalina.tribes.ManagedChannel;
 import java.util.Iterator;
 import java.util.UUID;
+import org.apache.catalina.tribes.ByteMessage;
 
 /**
  * The GroupChannel manages the replication channel. It coordinates
@@ -43,6 +44,8 @@
  * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 
2005) $
  */
 public class GroupChannel extends ChannelInterceptorBase implements 
ManagedChannel {
+    public static final int BYTE_MESSAGE = 0x0001;
+    
     private ChannelCoordinator coordinator = new ChannelCoordinator();
     private ChannelInterceptor interceptors = null;
     private MembershipListener membershipListener;
@@ -86,15 +89,23 @@
      * @param options int - sender options, see class documentation
      * @return ClusterMessage[] - the replies from the members, if any.
      */
-    public void send(Member[] destination, Serializable msg, int options) 
throws ChannelException {
+    public void send(Member[] destination, Serializable msg) throws 
ChannelException {
         if ( msg == null ) return;
         try {
+            int options = 0;
             ClusterData data = new ClusterData();
             data.setAddress(getLocalMember());
             data.setUniqueId(UUID.randomUUID().toString());
             data.setTimestamp(System.currentTimeMillis());
+            byte[] b = null;
+            if ( msg instanceof ByteMessage ){
+                b = ((ByteMessage)msg).getMessage();
+                options = options | BYTE_MESSAGE;
+            } else {
+                b = XByteBuffer.serialize(msg);
+            }
             data.setOptions(options);
-            byte[] b = XByteBuffer.serialize(msg);
+            
             data.setMessage(b);
             getFirstInterceptor().sendMessage(destination, data, null);
         }catch ( Exception x ) {
@@ -105,7 +116,13 @@
     public void messageReceived(ChannelMessage msg) {
         if ( msg == null ) return;
         try {
-            Serializable fwd = XByteBuffer.deserialize(msg.getMessage());
+            
+            Serializable fwd = null;
+            if ( (msg.getOptions() & BYTE_MESSAGE) == BYTE_MESSAGE ) {
+                fwd = new ByteMessage(msg.getMessage());
+            } else {
+                fwd = XByteBuffer.deserialize(msg.getMessage());
+            }
             if ( channelListener != null ) 
channelListener.messageReceived(fwd,msg.getAddress());
         }catch ( Exception x ) {
             log.error("Unable to deserialize channel message.",x);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java
 Mon Feb 27 07:43:00 2006
@@ -25,7 +25,6 @@
  * when data has been received. The interface does not care about
  * objects and marshalling and just passes the bytes straight through.
  * @author Filip Hanik
- * @author Peter Rossbach
  * @version $Revision: 303987 $, $Date: 2005-07-08 15:50:30 -0500 (Fri, 08 Jul 
2005) $
  */
 public interface ListenCallback
@@ -37,13 +36,4 @@
      */
      public void messageDataReceived(ChannelMessage data);
      
-    /** receiver must be send ack
-      */
-     public boolean isSendAck() ;
-     
-     /** send ack
-      *
-      */
-     public void sendAck() throws java.io.IOException ;
-
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
 Mon Feb 27 07:43:00 2006
@@ -110,6 +110,14 @@
     }
     
     /**
+     * Returns the number of packages that the reader has read
+     * @return int
+     */
+    public int count() {
+        return buffer.countPackages();
+    }
+    
+    /**
      * Write Ack to sender
      * @param buf
      * @return The bytes written count

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 Mon Feb 27 07:43:00 2006
@@ -39,16 +39,12 @@
  * Transfer package:
  * <ul>
  * <li><b>START_DATA/b> - 7 bytes - <i>FLT2002</i></li>
- * <li><b>COMPRESS</b>  - 4 bytes - is message compressed flag</li>
+ * <li><b>OPTIONS</b>  - 4 bytes - message options, implementation 
specific</li>
  * <li><b>SIZE</b>      - 4 bytes - size of the data package</li>
  * <li><b>DATA</b>      - should be as many bytes as the prev SIZE</li>
  * <li><b>END_DATA</b>  - 7 bytes - <i>TLF2003</i></lI>
  * </ul>
- * FIXME: Why we not use a list of byte buffers?
- * FIXME: Used a pool of buffers instead, every time new generation
- *
  * @author Filip Hanik
- * @author Peter Rossbach
  * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 
2006) $
  */
 public class XByteBuffer

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
 Mon Feb 27 07:43:00 2006
@@ -256,7 +256,7 @@
      * String representation of this object
      */
     public String toString()  {
-        return 
"org.apache.catalina.tribes.mcast.McastMember["+getName()+","+domain+","+host+","+port+",
 alive="+memberAliveTime+"]";
+        return 
"org.apache.catalina.tribes.mcast.McastMember["+getName()+","+domain+","+getHostname()+","+port+",
 alive="+memberAliveTime+"]";
     }
 
     /**

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 Mon Feb 27 07:43:00 2006
@@ -738,8 +738,8 @@
     protected void addStats(int length) {
         nrOfRequests++;
         totalBytes += length;
-        if (log.isInfoEnabled() && (nrOfRequests % 1000) == 0) {
-            log.info(sm.getString("IDataSender.stats", new Object[] {
+        if (log.isDebugEnabled() && (nrOfRequests % 1000) == 0) {
+            log.debug(sm.getString("IDataSender.stats", new Object[] {
                     getAddress().getHostAddress(), new Integer(getPort()),
                     new Long(totalBytes), new Long(nrOfRequests),
                     new Long(totalBytes / nrOfRequests),

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
 Mon Feb 27 07:43:00 2006
@@ -29,12 +29,9 @@
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelReceiver;
 import org.apache.catalina.tribes.MessageListener;
-
 import org.apache.catalina.tribes.io.ListenCallback;
 import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.util.StringManager;
-import java.io.Serializable;
 
 /**
  * @author Filip Hanik
@@ -74,6 +71,7 @@
 
     private Object interestOpsMutex = new Object();
     private MessageListener listener = null;
+    private boolean sync;
     public ReplicationListener() {
     }
 
@@ -119,7 +117,7 @@
      */
     public void start() {
         try {
-            pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, 
interestOpsMutex);
+            pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, 
interestOpsMutex, getWorkerThreadOptions());
         } catch (Exception e) {
             log.error("ThreadPool can initilzed. Listener not started", e);
             return;
@@ -290,7 +288,7 @@
                 log.debug("No TcpReplicationThread available");
         } else {
             // invoking this wakes up the worker thread then returns
-            worker.serviceChannel(key, isSendAck());
+            worker.serviceChannel(key);
         }
     }
 
@@ -348,7 +346,7 @@
      *
      * @return True if sending ACK
      */
-    public boolean isSendAck() {
+    public boolean getSendAck() {
         return sendAck;
     }
 
@@ -373,12 +371,32 @@
         return tcpListenPort;
     }
 
+    public boolean isSync() {
+        return sync;
+    }
+
     public MessageListener getMessageListener() {
         return listener;
     }
 
     public void setTcpListenPort(int tcpListenPort) {
         this.tcpListenPort = tcpListenPort;
+    }
+
+
+    public void setSynchronized(boolean sync) {
+        this.sync = sync;
+    }
+
+    public boolean getSynchronized() {
+        return this.sync;
+    }
+    
+    public int getWorkerThreadOptions() {
+        int options = 0;
+        if ( getSynchronized() ) options = options 
|TcpReplicationThread.OPTION_SYNCHRONIZED;
+        if ( getSendAck() ) options = options 
|TcpReplicationThread.OPTION_SEND_ACK;
+        return options;
     }
 
     public void setMessageListener(MessageListener listener) {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx
 Mon Feb 27 07:43:00 2006
@@ -1,18 +1,20 @@
 [PropertyInfo]
 bind,java.net.InetAddress,false,false, , ,true,<default>
 compress,boolean,false,false, , ,true,<default>
-coordinator,org.apache.catalina.cluster.MessageListener,false,false, , 
,true,<default>
-coordinator,org.apache.catalina.cluster.group.ChannelInterceptorBase,false,false,coordinator,coordinator,true,<default>
 doListen,boolean,false,false, , ,false,<default>
 host,String,false,false, , ,true,<default>
 info,String,false,false, , ,true,<default>
 interestOpsMutex,Object,false,false, , ,true,<default>
+listener,org.apache.catalina.tribes.MessageListener,false,false, , 
,false,<default>
 log,org.apache.commons.logging.Log,false,false, , ,false,<default>
-pool,org.apache.catalina.cluster.tcp.ThreadPool,false,false, , ,false,<default>
+messageListener,org.apache.catalina.tribes.MessageListener,false,false, , 
,true,<default>
+pool,org.apache.catalina.tribes.tcp.ThreadPool,false,false, , ,false,<default>
 port,int,false,false, , ,true,<default>
 selector,java.nio.channels.Selector,false,false, , ,false,<default>
 sendAck,boolean,false,false, , ,true,<default>
 sm,org.apache.catalina.util.StringManager,false,false, , ,false,<default>
+sync,boolean,false,false,sync,sync,true,<default>
+synchronized,boolean,false,false, , ,true,<default>
 tcpListenAddress,String,false,false, , ,true,<default>
 tcpListenPort,int,false,false, , ,true,<default>
 tcpSelectorTimeout,long,false,false, , ,true,<default>

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 Mon Feb 27 07:43:00 2006
@@ -16,38 +16,26 @@
 
 package org.apache.catalina.tribes.tcp;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.zip.GZIPOutputStream;
-
-import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.catalina.Container;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelSender;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.util.IDynamicProperty;
-import org.apache.catalina.core.StandardHost;
 import org.apache.catalina.util.StringManager;
 import org.apache.tomcat.util.IntrospectionUtils;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.io.*;
 
 /**
  * Transmit message to other cluster members
  * Actual senders are created based on the replicationMode
  * type 
- * FIXME i18n log messages
- * FIXME compress data depends on message type and size 
- * TODO pause and resume senders
  * 
- * @author Peter Rossbach
  * @author Filip Hanik
+ * @author Peter Rossbach
  * @version $Revision: 379956 $ $Date: 2006-02-22 16:57:35 -0600 (Wed, 22 Feb 
2006) $
  */
 public class ReplicationTransmitter implements ChannelSender,IDynamicProperty {
@@ -112,14 +100,7 @@
     /**
      * autoConnect sender when next message send
      */
-    private boolean autoConnect = false;
-
-    /**
-     * Compress message data bytes
-     */
-    private boolean compress = false;
-
-    /**
+    private boolean autoConnect = false; /**
      * doTransmitterProcessingStats
      */
     protected boolean doTransmitterProcessingStats = false;
@@ -263,22 +244,6 @@
     public ObjectName getObjectName() {
         return objectName;
     }
-
-    /**
-     * @return Returns the compress.
-     */
-    public boolean isCompress() {
-        return compress;
-    }
-
-    /**
-     * @param compressMessageData
-     *            The compress to set.
-     */
-    public void setCompress(boolean compressMessageData) {
-        this.compress = compressMessageData;
-    }
-
     /**
      * @return Returns the autoConnect.
      */
@@ -314,7 +279,7 @@
     /**
      * @return Returns the waitForAck.
      */
-    public boolean isWaitForAck() {
+    public boolean getWaitForAck() {
         return waitForAck;
     }
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
 Mon Feb 27 07:43:00 2006
@@ -37,14 +37,14 @@
  * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 
2006) $
  */
 public class TcpReplicationThread extends WorkerThread {
+    public static final int OPTION_SEND_ACK = 0x0001;
+    public static final int OPTION_SYNCHRONIZED = 0x0002;
+
     public static final byte[] ACK_COMMAND = new byte[] {6, 2, 3};
     private static org.apache.commons.logging.Log log =
         org.apache.commons.logging.LogFactory.getLog( 
TcpReplicationThread.class );
     private ByteBuffer buffer = ByteBuffer.allocate (1024);
     private SelectionKey key;
-    private boolean sendAck=true;
-
-    
     TcpReplicationThread ()
     {
     }
@@ -104,10 +104,9 @@
      * to ignore read-readiness for this channel while the
      * worker thread is servicing it.
      */
-    synchronized void serviceChannel (SelectionKey key, boolean sendAck)
+    synchronized void serviceChannel (SelectionKey key)
     {
         this.key = key;
-        this.sendAck=sendAck;
         key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
         key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
         this.notify();         // awaken the thread
@@ -135,19 +134,42 @@
             reader.append(buffer.array(),0,count);
             buffer.clear();            // make buffer empty
         }
+        
+        int pkgcnt = reader.count();
+
+        
+
+        /**
+         * Use send ack here if you want to ack the request to the remote 
+         * server before completing the request
+         * This is considered an asynchronized request
+         */
+        if (sendAckAsync()) {
+            while ( pkgcnt > 0 ) {
+                sendAck(key,channel);
+                pkgcnt--;
+            }
+        }
+
         //check to see if any data is available
-        int pkgcnt = reader.execute();
+        pkgcnt = reader.execute();
+
         if (log.isTraceEnabled()) {
             log.trace("sending " + pkgcnt + " ack packages to " + 
channel.socket().getLocalPort() );
         }
 
-        
-        if (sendAck) {
+        /**
+         * Use send ack here if you want the request to complete on this 
+         * server before sending the ack to the remote server
+         * This is considered a synchronized request
+         */
+        if (sendAckSync()) {
             while ( pkgcnt > 0 ) {
                 sendAck(key,channel);
                 pkgcnt--;
             }
-        }
+        }        
+
         
         if (count < 0) {
             // close channel on EOF, invalidates the key
@@ -166,6 +188,20 @@
         }
         
     }
+    
+    
+    public boolean sendAckSync() {
+        int options = getOptions();
+        return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
+               ((OPTION_SYNCHRONIZED & options) == OPTION_SYNCHRONIZED);
+    }
+    
+    public boolean sendAckAsync() {
+        int options = getOptions();
+        return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
+               ((OPTION_SYNCHRONIZED & options) != OPTION_SYNCHRONIZED);
+    }
+
 
     /**
      * send a reply-acknowledgement (6,2,3)

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
 Mon Feb 27 07:43:00 2006
@@ -35,7 +35,7 @@
     Object mutex = new Object();
     Object interestOpsMutex = null;
 
-    ThreadPool (int poolSize, Class threadClass, Object interestOpsMutex) 
throws Exception {
+    ThreadPool (int poolSize, Class threadClass, Object interestOpsMutex, int 
threadOptions) throws Exception {
         // fill up the pool with worker threads
         this.interestOpsMutex = interestOpsMutex;
         for (int i = 0; i < poolSize; i++) {
@@ -46,6 +46,7 @@
             thread.setName (threadClass.getName()+"[" + (i + 1)+"]");
             thread.setDaemon(true);
             thread.setPriority(Thread.MAX_PRIORITY);
+            thread.setOptions(threadOptions);
             thread.start();
 
             idle.add (thread);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java
 Mon Feb 27 07:43:00 2006
@@ -25,14 +25,22 @@
 {
     protected ThreadPool pool;
     protected boolean doRun = true;
-
+    private int options;
 
     public void setPool(ThreadPool pool) {
         this.pool = pool;
     }
-    
+
+    public void setOptions(int options) {
+        this.options = options;
+    }
+
     public ThreadPool getPool() {
         return pool;
+    }
+
+    public int getOptions() {
+        return options;
     }
 
     public void close()

Modified: tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml (original)
+++ tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml Mon Feb 27 
07:43:00 2006
@@ -298,14 +298,15 @@
                     tcpListenAddress="auto"
                     tcpListenPort="4002"
                     tcpSelectorTimeout="100"
-                    tcpThreadCount="6"/>
+                    tcpThreadCount="6"
+                    sendAck="true"
+                    synchronized="true"/>
 
                 <Sender
                     
className="org.apache.catalina.tribes.tcp.ReplicationTransmitter"
                     replicationMode="pooled"
-                    ackTimeout="150000"
-                    waitForAck="true"
-                    compress="true"/>
+                    ackTimeout="15000"
+                    waitForAck="true"/>
                     
                 <Interceptor 
className="org.apache.catalina.tribes.group.interceptors.GzipInterceptor"/>
             </Channel>



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to