Author: fhanik
Date: Mon Feb 13 13:00:05 2006
New Revision: 377484

URL: http://svn.apache.org/viewcvs?rev=377484&view=rev
Log:
Started working on the cluster group. Before I can fully do that, I need to 
clean up the dependencies between the session replication logic and the 
cluster core code.


Added:
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java
Modified:
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java

Added: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java?rev=377484&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java
 Mon Feb 13 13:00:05 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.cluster;
+
+/**
+ * Signals that a cluster channel operation, such as starting or stopping a service, failed.
+ * @author Filip Hanik
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+public class ChannelException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /** Creates an exception with no detail message and no cause. */
+    public ChannelException() {
+        super();
+    }
+    /** Creates an exception with the given detail message. */
+    public ChannelException(String message) {
+        super(message);
+    }
+    /** Creates an exception with the given detail message and underlying cause. */
+    public ChannelException(String message, Throwable cause) {
+        super(message, cause);
+    }
+    /** Creates an exception wrapping the given cause. */
+    public ChannelException(Throwable cause) {
+        super(cause);
+    }
+}

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java
 Mon Feb 13 13:00:05 2006
@@ -24,4 +24,53 @@
  */
 public interface ClusterChannel {
     
+    /**
+     * Service flags for start and stop; DEFAULT (15) is the bitwise OR of the four individual flags
+     */
+    public static final int DEFAULT = 15;
+    public static final int MBR_RX_SEQ = 1;
+    public static final int SND_TX_SEQ = 2;
+    public static final int SND_RX_SEQ = 4;
+    public static final int MBR_TX_SEQ = 8;
+    
+    /**
+     * Starts up the channel. This can be called multiple times to start individual services.
+     * The svc parameter can be the bitwise OR of any of the constants below
+     * @param svc int value of <BR>
+     * DEFAULT - will start all services <BR>
+     * MBR_RX_SEQ - starts the membership receiver <BR>
+     * MBR_TX_SEQ - starts the membership broadcaster <BR>
+     * SND_TX_SEQ - starts the replication transmitter<BR>
+     * SND_RX_SEQ - starts the replication receiver<BR>
+     * @throws ChannelException if a startup error occurs or the service is already started.
+     */
+    public void start(int svc) throws ChannelException;
+
+    /**
+     * Shuts down the channel. This can be called multiple times to shut down individual services.
+     * The svc parameter can be the bitwise OR of any of the constants below
+     * @param svc int value of <BR>
+     * DEFAULT - will shutdown all services <BR>
+     * MBR_RX_SEQ - stops the membership receiver <BR>
+     * MBR_TX_SEQ - stops the membership broadcaster <BR>
+     * SND_TX_SEQ - stops the replication transmitter<BR>
+     * SND_RX_SEQ - stops the replication receiver<BR>
+     * @throws ChannelException if an error occurs while shutting down a service.
+     */
+    public void stop(int svc) throws ChannelException;    
+    
+    /**
+     * Send a message to one or more members in the cluster
+     * @param destination Member[] - the destinations, null or zero length means all
+     * @param msg ClusterMessage - the message to send
+     * @param options int - sender options, see class documentation
+     * @return ClusterMessage[] - the replies from the members, if any.
+     */
+    public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int 
options);
+
+    // Setters for the sender, receiver and membership service used by the channel
+    public void setClusterSender(ClusterSender sender);
+    public void setClusterReceiver(ClusterReceiver receiver);
+    public void setMembershipService(MembershipService service);
+    
 }

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java
 Mon Feb 13 13:00:05 2006
@@ -18,7 +18,9 @@
 import java.io.Serializable;
 
 /**
+ * @author Filip Hanik
  * @author Peter Rossbach
+ * 
  */
 public interface ClusterMessage extends Serializable {
     

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java
 Mon Feb 13 13:00:05 2006
@@ -28,7 +28,10 @@
 
 
 public interface MembershipService {
-
+    
+    public static final int MBR_RX = 1;
+    public static final int MBR_TX = 2;
+    
     /**
      * Sets the properties for the membership service. This must be called 
before
      * the <code>start()</code> method is called.

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
 Mon Feb 13 13:00:05 2006
@@ -16,7 +16,13 @@
 package org.apache.catalina.cluster.group;
 
 
+import org.apache.catalina.cluster.ChannelException;
 import org.apache.catalina.cluster.ClusterChannel;
+import org.apache.catalina.cluster.ClusterReceiver;
+import org.apache.catalina.cluster.ClusterSender;
+import org.apache.catalina.cluster.MembershipService;
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.Member;
 
 /**
  * Channel interface
@@ -26,7 +32,91 @@
  * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 
2005) $
  */
 public class GroupChannel implements ClusterChannel {
+    private ClusterReceiver clusterReceiver;
+    private ClusterSender clusterSender;
+    private MembershipService membershipService;
+
     public GroupChannel() {
+    }
+    
+    /**
+     * Send a message to one or more members in the cluster
+     * @param destination Member[] - the destinations, null or zero length means all
+     * @param msg ClusterMessage - the message to send
+     * @param options int - sender options, see class documentation
+     * @return ClusterMessage[] - never returns yet: always throws UnsupportedOperationException
+     */
+    public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options) {
+        throw new UnsupportedOperationException("Method send not yet implemented.");
+    }
+    
+    /**
+     * Starts up the channel. This can be called multiple times to start individual services.
+     * The svc parameter can be the bitwise OR of any of the constants below
+     * @param svc int value of <BR>
+     * DEFAULT - will start all services <BR>
+     * MBR_RX_SEQ - starts the membership receiver <BR>
+     * MBR_TX_SEQ - starts the membership broadcaster <BR>
+     * SND_TX_SEQ - starts the replication transmitter<BR>
+     * SND_RX_SEQ - starts the replication receiver<BR>
+     * @throws ChannelException wrapping any exception raised while starting a service.
+     */
+    public void start(int svc) throws ChannelException {
+        try {
+            if ( (svc & MBR_RX_SEQ) == MBR_RX_SEQ) membershipService.start(MembershipService.MBR_RX);
+            if ( (svc & SND_RX_SEQ) == SND_RX_SEQ) clusterReceiver.start();
+            if ( (svc & SND_TX_SEQ) == SND_TX_SEQ) clusterSender.start();
+            if ( (svc & MBR_TX_SEQ) == MBR_TX_SEQ) membershipService.start(MembershipService.MBR_TX);
+        }catch ( Exception x ) {
+            throw new ChannelException(x);
+        }
+    }
+
+    /**
+     * Shuts down the channel. This can be called multiple times to shut down individual services.
+     * The svc parameter can be the bitwise OR of any of the constants below
+     * @param svc int value of <BR>
+     * DEFAULT - will shutdown all services <BR>
+     * MBR_RX_SEQ - stops the membership receiver <BR>
+     * MBR_TX_SEQ - stops the membership broadcaster <BR>
+     * SND_TX_SEQ - stops the replication transmitter<BR>
+     * SND_RX_SEQ - stops the replication receiver<BR>
+     * @throws ChannelException wrapping any exception raised while stopping a service.
+     */
+    public void stop(int svc) throws ChannelException {
+        try {
+            if ( (svc & MBR_RX_SEQ) == MBR_RX_SEQ) membershipService.stop();
+            if ( (svc & SND_RX_SEQ) == SND_RX_SEQ) clusterReceiver.stop();
+            if ( (svc & SND_TX_SEQ) == SND_TX_SEQ) clusterSender.stop();
+            // stop() takes no RX/TX argument, so for TX only call it when the RX flag did not already do so
+            if ( (svc & (MBR_RX_SEQ | MBR_TX_SEQ)) == MBR_TX_SEQ) membershipService.stop();
+        }catch ( Exception x ) {
+            throw new ChannelException(x);
+        }
+    }
+
+    public ClusterReceiver getClusterReceiver() {
+        return clusterReceiver;
+    }
+
+    public ClusterSender getClusterSender() {
+        return clusterSender;
+    }
+
+    public MembershipService getMembershipService() {
+        return membershipService;
+    }
+
+    public void setClusterReceiver(ClusterReceiver clusterReceiver) {
+        this.clusterReceiver = clusterReceiver;
+    }
+
+    public void setClusterSender(ClusterSender clusterSender) {
+        this.clusterSender = clusterSender;
+    }
+
+    public void setMembershipService(MembershipService membershipService) {
+        this.membershipService = membershipService;
     }
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
 Mon Feb 13 13:00:05 2006
@@ -38,8 +38,6 @@
 
     private SocketChannel channel;
 
-    private Selector selector;
-
     private ListenCallback callback;
 
     private XByteBuffer buffer;
@@ -52,7 +50,6 @@
      */
     public ObjectReader(SocketChannel channel, Selector selector, 
ListenCallback callback) {
         this.channel = channel;
-        this.selector = selector;
         this.callback = callback;
         this.buffer = new XByteBuffer();
     }

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
 Mon Feb 13 13:00:05 2006
@@ -18,6 +18,14 @@
 
 import org.apache.catalina.cluster.ClusterMessage;
 import org.apache.catalina.cluster.tcp.ClusterData;
+import java.io.ObjectOutputStream;
+import java.util.zip.GZIPOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+import org.apache.catalina.cluster.session.ReplicationStream;
 
 /**
  * The XByteBuffer provides a dual functionality.
@@ -337,10 +345,10 @@
      * @return - a full package (header,compress,size,data,footer)
      * 
      */
-    public static byte[] createDataPackage(byte[] indata, int compressed)
+    public static byte[] createDataPackage(ClusterData cdata)
             throws java.io.IOException {
-        byte[] data = indata;
-        byte[] comprdata = XByteBuffer.toBytes(compressed);
+        byte[] data = cdata.getMessage();
+        byte[] comprdata = XByteBuffer.toBytes(cdata.getCompress());
         int length = 
             START_DATA.length + //header length
             4 + //compression flag
@@ -355,4 +363,63 @@
         System.arraycopy(END_DATA, 0, result, START_DATA.length + 8 + 
data.length, END_DATA.length);
         return result;
     }
+    
+    /**
+     * Deserializes the message held by a ClusterData; the input stream is closed even on failure.
+     * @param data ClusterData - holder of the serialized bytes, may be null
+     * @param compress boolean - true if the bytes must be read through a GZIPInputStream
+     * @return ClusterMessage - the decoded message, or null if data or the decoded object is null
+     * @throws IOException if the bytes cannot be read or decompressed
+     * @throws ClassNotFoundException if a class of the serialized object cannot be resolved
+     * @throws ClassCastException if the decoded object does not implement ClusterMessage
+     */
+    public static ClusterMessage deserialize(ClusterData data, boolean compress) 
+        throws IOException, ClassNotFoundException, ClassCastException {
+        if (data == null) return null;
+        InputStream instream = new ByteArrayInputStream(data.getMessage());
+        try {
+            if (compress) instream = new GZIPInputStream(instream);
+            Object message = new ReplicationStream(instream,XByteBuffer.class.getClassLoader()).readObject();
+            if (message == null || message instanceof ClusterMessage) return (ClusterMessage) message;
+            throw new ClassCastException("Message has the wrong class. It should implement ClusterMessage, instead it is:"+message.getClass().getName());
+        } finally {
+            instream.close();
+        }
+    }
+
+    /**
+     * Serializes a message into cluster data, stamping the message with the current time first
+     * @param msg ClusterMessage - the message to serialize; its timestamp is overwritten
+     * @param compress boolean - true to gzip the serialized bytes
+     * @return ClusterData - the bytes plus the message's type, unique id, timestamp, compress and resend flags
+     * @throws IOException if writing the object stream fails
+     */
+    public static ClusterData serialize(ClusterMessage msg, boolean compress) throws IOException {
+        msg.setTimestamp(System.currentTimeMillis());
+        ClusterData data = new ClusterData();
+        data.setType(msg.getClass().getName());
+        data.setUniqueId(msg.getUniqueId());
+        data.setTimestamp(msg.getTimestamp());
+        data.setCompress(msg.getCompress()); // NOTE(review): records the message's flag, not the compress argument - confirm intended
+        data.setResend(msg.getResend());
+        ByteArrayOutputStream outs = new ByteArrayOutputStream();
+        GZIPOutputStream gout = null;
+        try {
+            ObjectOutputStream out;
+            if (compress) {
+                gout = new GZIPOutputStream(outs);
+                out = new ObjectOutputStream(gout);
+            } else {
+                out = new ObjectOutputStream(outs);
+            }
+            out.writeObject(msg);
+            out.flush();
+        } finally {
+            // closing finishes the gzip stream into the byte buffer and frees the deflater, also on failure
+            if (gout != null) gout.close();
+        }
+        data.setMessage(outs.toByteArray());
+        return data;
+    }
+
+    
 }

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java
 Mon Feb 13 13:00:05 2006
@@ -73,11 +73,11 @@
             try
             {
                 if ( tryRepFirst ) return findReplicationClass(name);
-                else return findWebappClass(name);
+                else return findExternalClass(name);
             }
             catch ( Exception x )
             {
-                if ( tryRepFirst ) return findWebappClass(name);
+                if ( tryRepFirst ) return findExternalClass(name);
                 else return findReplicationClass(name);
             }
         } catch (ClassNotFoundException e) {
@@ -90,7 +90,7 @@
         return Class.forName(name, false, getClass().getClassLoader());
     }
 
-    public Class findWebappClass(String name)
+    public Class findExternalClass(String name)
         throws ClassNotFoundException, IOException {
         return Class.forName(name, false, classLoader);
     }

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java
 Mon Feb 13 13:00:05 2006
@@ -19,11 +19,15 @@
 
 
 /**
+ * The cluster data class is used to transport around the byte array from
+ * a ClusterMessage object. This is just a utility class to avoid having to 
+ * serialize and deserialize the ClusterMessage more than once. 
  * @author Peter Rossbach
+ * @author Filip Hanik
  * @version $Revision$ $Date$
  * @since 5.5.10
  */
-public class ClusterData {
+public class ClusterData  {
 
     private int resend = ClusterMessage.FLAG_DEFAULT ;
     private int compress = ClusterMessage.FLAG_DEFAULT ;

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
 Mon Feb 13 13:00:05 2006
@@ -34,6 +34,7 @@
 import org.apache.catalina.cluster.session.ReplicationStream;
 import org.apache.catalina.core.StandardHost;
 import org.apache.catalina.util.StringManager;
+import org.apache.catalina.cluster.io.XByteBuffer;
 
 /**
 * FIXME i18n log messages
@@ -438,32 +439,16 @@
     //protected ClusterMessage deserialize(byte[] data)
     protected ClusterMessage deserialize(ClusterData data)
             throws IOException, ClassNotFoundException {
-        Object message = null;
+        ClusterMessage message = null;
         if (data != null) {
-            InputStream instream;
-            if (isCompress() || data.getCompress() == 
ClusterMessage.FLAG_ALLOWED ) {
-                instream = new GZIPInputStream(new 
ByteArrayInputStream(data.getMessage()));
-            } else {
-                instream = new ByteArrayInputStream(data.getMessage());
-            }
-            ReplicationStream stream = new ReplicationStream(instream,
-                    getClass().getClassLoader());
-            message = stream.readObject();
+            boolean compress = isCompress() || data.getCompress() == ClusterMessage.FLAG_ALLOWED; // read only after the null check
+            message = XByteBuffer.deserialize(data, compress);
             // calc stats really received bytes
             totalReceivedBytes += data.getMessage().length;
             //totalReceivedBytes += data.length;
             nrOfMsgsReceived++;
-            instream.close();
-        }
-        if (message instanceof ClusterMessage)
-            return (ClusterMessage) message;
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Message " + message.toString() + " from type "
-                        + message.getClass().getName()
-                        + " transfered but is not a cluster message");
-            return null;
         }
+        return message;
     }
     
     // --------------------------------------------- Performance Stats

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
 Mon Feb 13 13:00:05 2006
@@ -865,9 +865,8 @@
             isMessageTransferStarted = true ;
         }
         try {
-            byte[] message = data.getMessage();
             OutputStream out = socket.getOutputStream();
-            
out.write(XByteBuffer.createDataPackage(message,data.getCompress()));
+            out.write(XByteBuffer.createDataPackage(data));
             out.flush();
             if (isWaitForAck())
                 waitForAck(ackTimeout);

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java?rev=377484&r1=377483&r2=377484&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
 Mon Feb 13 13:00:05 2006
@@ -35,13 +35,14 @@
 import org.apache.catalina.core.StandardHost;
 import org.apache.catalina.util.StringManager;
 import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.catalina.cluster.io.XByteBuffer;
 
 /**
- * Transmit message to ohter cluster members create sender from replicationMode
+ * Transmit message to other cluster members
+ * Actual senders are created based on the replicationMode
  * type 
  * FIXME i18n log messages
  * FIXME compress data depends on message type and size 
- * FIXME send very big messages at some block see FarmWarDeployer!
  * TODO pause and resume senders
  * 
  * @author Peter Rossbach
@@ -806,32 +807,9 @@
      * @since 5.5.10
      */
     protected ClusterData serialize(ClusterMessage msg) throws IOException {
-        msg.setTimestamp(System.currentTimeMillis());
-        ByteArrayOutputStream outs = new ByteArrayOutputStream();
-        ObjectOutputStream out;
-        GZIPOutputStream gout = null;
-        ClusterData data = new ClusterData();
-        data.setType(msg.getClass().getName());
-        data.setUniqueId(msg.getUniqueId());
-        data.setTimestamp(msg.getTimestamp());
-        data.setCompress(msg.getCompress());
-        data.setResend(msg.getResend());
-        // FIXME add stats: How much comress and uncompress messages and bytes 
are transfered
-        if ((isCompress() && msg.getCompress() != 
ClusterMessage.FLAG_FORBIDDEN)
-                || msg.getCompress() == ClusterMessage.FLAG_ALLOWED) {
-            gout = new GZIPOutputStream(outs);
-            out = new ObjectOutputStream(gout);
-        } else {
-            out = new ObjectOutputStream(outs);
-        }
-        out.writeObject(msg);
-        // flush out the gzip stream to byte buffer
-        if(gout != null) {
-            gout.flush();
-            gout.close();
-        }
-        data.setMessage(outs.toByteArray());
-        return data;
+        boolean compress = isCompress() && msg.getCompress() != ClusterMessage.FLAG_FORBIDDEN; // enabled here and not vetoed by the message
+        if (!compress) compress = msg.getCompress() == ClusterMessage.FLAG_ALLOWED; // otherwise only when the message itself allows it
+        return XByteBuffer.serialize(msg, compress);
     }
  
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to