Author: fhanik Date: Mon Feb 13 13:00:05 2006 New Revision: 377484 URL: http://svn.apache.org/viewcvs?rev=377484&view=rev Log: Started working on the cluster group. Before I can fully do that, I need to clean up the dependencies between the session replication logic and the cluster core code.
Added: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Added: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java?rev=377484&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java (added) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ChannelException.java Mon Feb 13 13:00:05 2006 @@ -0,0 +1,42 @@ +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.cluster; + +/** + * Channel Exception + * @author Filip Hanik + * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ + */ + +public class ChannelException + extends Exception { + public ChannelException() { + super(); + } + + public ChannelException(String message) { + super(message); + } + + public ChannelException(String message, Throwable cause) { + super(message, cause); + } + + public ChannelException(Throwable cause) { + super(cause); + } + +} Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterChannel.java Mon Feb 13 13:00:05 2006 @@ -24,4 +24,53 @@ */ public interface ClusterChannel { + /** + * Start and stop sequences can be controlled by these constants + */ + public static final int DEFAULT = 15; + public static final int MBR_RX_SEQ = 1; + public static final int SND_TX_SEQ = 2; + public static final int SND_RX_SEQ = 4; + public 
static final int MBR_TX_SEQ = 8; + + /** + * Starts up the channel. This can be called multiple times for individual services to start. + * The svc parameter can be the bitwise OR of any of the constants + * @param svc int value of <BR> + * DEFAULT - will start all services <BR> + * MBR_RX_SEQ - starts the membership receiver <BR> + * MBR_TX_SEQ - starts the membership broadcaster <BR> + * SND_TX_SEQ - starts the replication transmitter<BR> + * SND_RX_SEQ - starts the replication receiver<BR> + * @throws ChannelException if a startup error occurs or the service is already started. + */ + public void start(int svc) throws ChannelException; + + /** + * Shuts down the channel. This can be called multiple times for individual services to shut down. + * The svc parameter can be the bitwise OR of any of the constants + * @param svc int value of <BR> + * DEFAULT - will shut down all services <BR> + * MBR_RX_SEQ - stops the membership receiver <BR> + * MBR_TX_SEQ - stops the membership broadcaster <BR> + * SND_TX_SEQ - stops the replication transmitter<BR> + * SND_RX_SEQ - stops the replication receiver<BR> + * @throws ChannelException if a shutdown error occurs or the service is already stopped. + */ + public void stop(int svc) throws ChannelException; + + /** + * Send a message to one or more members in the cluster + * @param destination Member[] - the destinations, null or zero length means all + * @param msg ClusterMessage - the message to send + * @param options int - sender options, see class documentation + * @return ClusterMessage[] - the replies from the members, if any. 
+ */ + public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options); + + + public void setClusterSender(ClusterSender sender); + public void setClusterReceiver(ClusterReceiver receiver); + public void setMembershipService(MembershipService service); + } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterMessage.java Mon Feb 13 13:00:05 2006 @@ -18,7 +18,9 @@ import java.io.Serializable; /** + * @author Filip Hanik * @author Peter Rossbach + * */ public interface ClusterMessage extends Serializable { Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/MembershipService.java Mon Feb 13 13:00:05 2006 @@ -28,7 +28,10 @@ public interface MembershipService { - + + public static final int MBR_RX = 1; + public static final int MBR_TX = 2; + /** * Sets the properties for the membership service. This must be called before * the <code>start()</code> method is called. 
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java Mon Feb 13 13:00:05 2006 @@ -16,7 +16,13 @@ package org.apache.catalina.cluster.group; +import org.apache.catalina.cluster.ChannelException; import org.apache.catalina.cluster.ClusterChannel; +import org.apache.catalina.cluster.ClusterReceiver; +import org.apache.catalina.cluster.ClusterSender; +import org.apache.catalina.cluster.MembershipService; +import org.apache.catalina.cluster.ClusterMessage; +import org.apache.catalina.cluster.Member; /** * Channel interface @@ -26,7 +32,91 @@ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ */ public class GroupChannel implements ClusterChannel { + private ClusterReceiver clusterReceiver; + private ClusterSender clusterSender; + private MembershipService membershipService; + public GroupChannel() { + } + + /** + * Send a message to one or more members in the cluster + * @param destination Member[] - the destinations, null or zero length means all + * @param msg ClusterMessage - the message to send + * @param options int - sender options, see class documentation + * @return ClusterMessage[] - the replies from the members, if any. + */ + public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options) { + throw new UnsupportedOperationException("Method send not yet implemented."); + } + + /** + * Starts up the channel. 
This can be called multiple times for individual services to start. + * The svc parameter can be the bitwise OR of any of the constants + * @param svc int value of <BR> + * DEFAULT - will start all services <BR> + * MBR_RX_SEQ - starts the membership receiver <BR> + * MBR_TX_SEQ - starts the membership broadcaster <BR> + * SND_TX_SEQ - starts the replication transmitter<BR> + * SND_RX_SEQ - starts the replication receiver<BR> + * @throws ChannelException if a startup error occurs or the service is already started. + */ + public void start(int svc) throws ChannelException { + try { + if ( (svc & MBR_RX_SEQ) == MBR_RX_SEQ) membershipService.start(MembershipService.MBR_RX); + if ( (svc & SND_RX_SEQ) == SND_RX_SEQ) clusterReceiver.start(); + if ( (svc & SND_TX_SEQ) == SND_TX_SEQ) clusterSender.start(); + if ( (svc & MBR_TX_SEQ) == MBR_TX_SEQ) membershipService.start(MembershipService.MBR_TX); + }catch ( Exception x ) { + throw new ChannelException(x); + } + } + + /** + * Shuts down the channel. This can be called multiple times for individual services to shut down. + * The svc parameter can be the bitwise OR of any of the constants + * @param svc int value of <BR> + * DEFAULT - will shut down all services <BR> + * MBR_RX_SEQ - stops the membership receiver <BR> + * MBR_TX_SEQ - stops the membership broadcaster <BR> + * SND_TX_SEQ - stops the replication transmitter<BR> + * SND_RX_SEQ - stops the replication receiver<BR> + * @throws ChannelException if stopping any of the selected services fails. 
+ */ + public void stop(int svc) throws ChannelException { + try { + if ( (svc & MBR_RX_SEQ) == MBR_RX_SEQ) membershipService.stop(); + if ( (svc & SND_RX_SEQ) == SND_RX_SEQ) clusterReceiver.stop(); + if ( (svc & SND_TX_SEQ) == SND_TX_SEQ) clusterSender.stop(); + if ( (svc & MBR_TX_SEQ) == MBR_TX_SEQ) membershipService.stop(); + }catch ( Exception x ) { + throw new ChannelException(x); + } + + } + + public ClusterReceiver getClusterReceiver() { + return clusterReceiver; + } + + public ClusterSender getClusterSender() { + return clusterSender; + } + + public MembershipService getMembershipService() { + return membershipService; + } + + public void setClusterReceiver(ClusterReceiver clusterReceiver) { + this.clusterReceiver = clusterReceiver; + } + + public void setClusterSender(ClusterSender clusterSender) { + this.clusterSender = clusterSender; + } + + public void setMembershipService(MembershipService membershipService) { + this.membershipService = membershipService; } } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java Mon Feb 13 13:00:05 2006 @@ -38,8 +38,6 @@ private SocketChannel channel; - private Selector selector; - private ListenCallback callback; private XByteBuffer buffer; @@ -52,7 +50,6 @@ */ public ObjectReader(SocketChannel channel, Selector selector, ListenCallback callback) { this.channel = channel; - this.selector = selector; this.callback = callback; this.buffer = new XByteBuffer(); } Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java Mon Feb 13 13:00:05 2006 @@ -18,6 +18,14 @@ import org.apache.catalina.cluster.ClusterMessage; import org.apache.catalina.cluster.tcp.ClusterData; +import java.io.ObjectOutputStream; +import java.util.zip.GZIPOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; +import org.apache.catalina.cluster.session.ReplicationStream; /** * The XByteBuffer provides a dual functionality. 
@@ -337,10 +345,10 @@ * @return - a full package (header,compress,size,data,footer) * */ - public static byte[] createDataPackage(byte[] indata, int compressed) + public static byte[] createDataPackage(ClusterData cdata) throws java.io.IOException { - byte[] data = indata; - byte[] comprdata = XByteBuffer.toBytes(compressed); + byte[] data = cdata.getMessage(); + byte[] comprdata = XByteBuffer.toBytes(cdata.getCompress()); int length = START_DATA.length + //header length 4 + //compression flag @@ -355,4 +363,63 @@ System.arraycopy(END_DATA, 0, result, START_DATA.length + 8 + data.length, END_DATA.length); return result; } + + public static ClusterMessage deserialize(ClusterData data, boolean compress) + throws IOException, ClassNotFoundException, ClassCastException { + Object message = null; + if (data != null) { + InputStream instream; + if (compress ) { + instream = new GZIPInputStream(new ByteArrayInputStream(data.getMessage())); + } else { + instream = new ByteArrayInputStream(data.getMessage()); + } + ReplicationStream stream = new ReplicationStream(instream,XByteBuffer.class.getClassLoader()); + message = stream.readObject(); + instream.close(); + } + if ( message == null ) { + return null; + } else if (message instanceof ClusterMessage) + return (ClusterMessage) message; + else { + throw new ClassCastException("Message has the wrong class. 
It should implement ClusterMessage, instead it is:"+message.getClass().getName()); + } + } + + /** + * Serializes a message into cluster data + * @param msg ClusterMessage + * @param compress boolean + * @return ClusterData + * @throws IOException + */ + public static ClusterData serialize(ClusterMessage msg, boolean compress) throws IOException { + msg.setTimestamp(System.currentTimeMillis()); + ByteArrayOutputStream outs = new ByteArrayOutputStream(); + ObjectOutputStream out; + GZIPOutputStream gout = null; + ClusterData data = new ClusterData(); + data.setType(msg.getClass().getName()); + data.setUniqueId(msg.getUniqueId()); + data.setTimestamp(msg.getTimestamp()); + data.setCompress(msg.getCompress()); + data.setResend(msg.getResend()); + if (compress) { + gout = new GZIPOutputStream(outs); + out = new ObjectOutputStream(gout); + } else { + out = new ObjectOutputStream(outs); + } + out.writeObject(msg); + // flush out the gzip stream to byte buffer + if(gout != null) { + gout.flush(); + gout.close(); + } + data.setMessage(outs.toByteArray()); + return data; + } + + } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicationStream.java Mon Feb 13 13:00:05 2006 @@ -73,11 +73,11 @@ try { if ( tryRepFirst ) return findReplicationClass(name); - else return findWebappClass(name); + else return findExternalClass(name); } catch ( Exception x ) { - if ( tryRepFirst ) return findWebappClass(name); + if ( tryRepFirst ) return 
findExternalClass(name); else return findReplicationClass(name); } } catch (ClassNotFoundException e) { @@ -90,7 +90,7 @@ return Class.forName(name, false, getClass().getClassLoader()); } - public Class findWebappClass(String name) + public Class findExternalClass(String name) throws ClassNotFoundException, IOException { return Class.forName(name, false, classLoader); } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterData.java Mon Feb 13 13:00:05 2006 @@ -19,11 +19,15 @@ /** + * The cluster data class is used to transport around the byte array from + * a ClusterMessage object. This is just a utility class to avoid having to + * serialize and deserialize the ClusterMessage more than once. 
* @author Peter Rossbach + * @author Filip Hanik * @version $Revision$ $Date$ * @since 5.5.10 */ -public class ClusterData { +public class ClusterData { private int resend = ClusterMessage.FLAG_DEFAULT ; private int compress = ClusterMessage.FLAG_DEFAULT ; Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java Mon Feb 13 13:00:05 2006 @@ -34,6 +34,7 @@ import org.apache.catalina.cluster.session.ReplicationStream; import org.apache.catalina.core.StandardHost; import org.apache.catalina.util.StringManager; +import org.apache.catalina.cluster.io.XByteBuffer; /** * FIXME i18n log messages @@ -438,32 +439,16 @@ //protected ClusterMessage deserialize(byte[] data) protected ClusterMessage deserialize(ClusterData data) throws IOException, ClassNotFoundException { - Object message = null; + boolean compress = isCompress() || (data != null && data.getCompress() == ClusterMessage.FLAG_ALLOWED); // null-safe: data is only checked for null below + ClusterMessage message = null; if (data != null) { - InputStream instream; - if (isCompress() || data.getCompress() == ClusterMessage.FLAG_ALLOWED ) { - instream = new GZIPInputStream(new ByteArrayInputStream(data.getMessage())); - } else { - instream = new ByteArrayInputStream(data.getMessage()); - } - ReplicationStream stream = new ReplicationStream(instream, - getClass().getClassLoader()); - message = stream.readObject(); + message = XByteBuffer.deserialize(data, compress); // calc stats really received bytes totalReceivedBytes += 
data.getMessage().length; //totalReceivedBytes += data.length; nrOfMsgsReceived++; - instream.close(); - } - if (message instanceof ClusterMessage) - return (ClusterMessage) message; - else { - if (log.isDebugEnabled()) - log.debug("Message " + message.toString() + " from type " - + message.getClass().getName() - + " transfered but is not a cluster message"); - return null; } + return message; } // --------------------------------------------- Performance Stats Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java Mon Feb 13 13:00:05 2006 @@ -865,9 +865,8 @@ isMessageTransferStarted = true ; } try { - byte[] message = data.getMessage(); OutputStream out = socket.getOutputStream(); - out.write(XByteBuffer.createDataPackage(message,data.getCompress())); + out.write(XByteBuffer.createDataPackage(data)); out.flush(); if (isWaitForAck()) waitForAck(ackTimeout); Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java?rev=377484&r1=377483&r2=377484&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java (original) +++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Mon Feb 13 13:00:05 2006 @@ -35,13 +35,14 @@ import org.apache.catalina.core.StandardHost; import org.apache.catalina.util.StringManager; import org.apache.tomcat.util.IntrospectionUtils; +import org.apache.catalina.cluster.io.XByteBuffer; /** - * Transmit message to ohter cluster members create sender from replicationMode + * Transmit message to other cluster members + * Actual senders are created based on the replicationMode * type * FIXME i18n log messages * FIXME compress data depends on message type and size - * FIXME send very big messages at some block see FarmWarDeployer! * TODO pause and resume senders * * @author Peter Rossbach @@ -806,32 +807,9 @@ * @since 5.5.10 */ protected ClusterData serialize(ClusterMessage msg) throws IOException { - msg.setTimestamp(System.currentTimeMillis()); - ByteArrayOutputStream outs = new ByteArrayOutputStream(); - ObjectOutputStream out; - GZIPOutputStream gout = null; - ClusterData data = new ClusterData(); - data.setType(msg.getClass().getName()); - data.setUniqueId(msg.getUniqueId()); - data.setTimestamp(msg.getTimestamp()); - data.setCompress(msg.getCompress()); - data.setResend(msg.getResend()); - // FIXME add stats: How much comress and uncompress messages and bytes are transfered - if ((isCompress() && msg.getCompress() != ClusterMessage.FLAG_FORBIDDEN) - || msg.getCompress() == ClusterMessage.FLAG_ALLOWED) { - gout = new GZIPOutputStream(outs); - out = new ObjectOutputStream(gout); - } else { - out = new ObjectOutputStream(outs); - } - out.writeObject(msg); - // flush out the gzip stream to byte buffer - if(gout != null) { - gout.flush(); - gout.close(); - } - data.setMessage(outs.toByteArray()); - return data; + boolean compress = ((isCompress() && msg.getCompress() != ClusterMessage.FLAG_FORBIDDEN) + || msg.getCompress() == ClusterMessage.FLAG_ALLOWED); + return XByteBuffer.serialize(msg,compress); } 
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]