Author: fhanik
Date: Wed Feb  8 09:23:08 2006
New Revision: 376003

URL: http://svn.apache.org/viewcvs?rev=376003&view=rev
Log:
Undeprecated XByteBuffer.createDataPackage; I want to avoid spreading code 
that belongs together across multiple classes.
That way, when changes are needed for the protocol, they can be implemented in 
this one place without affecting everything else.
Fixed some error messages
In 5.5 the use of Sender.isSuspect has become cluttered; error messages are 
reported while ignoring this flag.
I have started to clean up the code to use the suspect flag again, to avoid 
overflowing the logs.




Modified:
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java?rev=376003&r1=376002&r2=376003&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
 Wed Feb  8 09:23:08 2006
@@ -195,9 +195,7 @@
     public ClusterData extractPackage(boolean clearFromBuffer)
             throws java.io.IOException {
         int psize = countPackages();
-        if (psize == 0)
-            throw new java.lang.IllegalStateException(
-                    "No package exists in XByteBuffer");
+        if (psize == 0) throw new java.lang.IllegalStateException("No package 
exists in XByteBuffer");
         int compress = toInt(buf, START_DATA.length);
         int size = toInt(buf, START_DATA.length +4);
         byte[] data = new byte[size];
@@ -335,22 +333,26 @@
     /**
      * Creates a complete data package
      * @param indata - the message data to be contained within the package
+     * @param compressed - compression flag for the indata buffer
      * @return - a full package (header,compress,size,data,footer)
-     * @deprecated since 5.5.10
+     * 
      */
-    public static byte[] createDataPackage(byte[] indata)
+    public static byte[] createDataPackage(byte[] indata, int compressed)
             throws java.io.IOException {
-        byte[] data;
-        data = indata;
-        byte[] result = new byte[START_DATA.length + 8 + data.length
-                + END_DATA.length];
+        byte[] data = indata;
+        byte[] comprdata = XByteBuffer.toBytes(compressed);
+        int length = 
+            START_DATA.length + //header length
+            4 + //compression flag
+            4 + //data length indicator
+            data.length + //actual data length
+            END_DATA.length; //footer length
+        byte[] result = new byte[length];
         System.arraycopy(START_DATA, 0, result, 0, START_DATA.length);
-        System.arraycopy(toBytes(ClusterMessage.FLAG_FORBIDDEN), 0, result, 
START_DATA.length, 4);
+        System.arraycopy(comprdata, 0, result, START_DATA.length, 4);
         System.arraycopy(toBytes(data.length), 0, result, START_DATA.length + 
4, 4);
         System.arraycopy(data, 0, result, START_DATA.length + 8, data.length);
-        System.arraycopy(END_DATA, 0, result, START_DATA.length + 8
-                + data.length, END_DATA.length);
-
+        System.arraycopy(END_DATA, 0, result, START_DATA.length + 8 + 
data.length, END_DATA.length);
         return result;
     }
 }

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java?rev=376003&r1=376002&r2=376003&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
 Wed Feb  8 09:23:08 2006
@@ -87,7 +87,7 @@
     /**
      * sender is in suspect state (last transfer failed)
      */
-    private boolean suspect;
+    private boolean suspect = false;
 
     /**
      * wait time for ack
@@ -213,7 +213,6 @@
      * After failure make a resend
      */
     private boolean resend = false ;
-
     
     // ------------------------------------------------------------- 
Constructor
     
@@ -794,7 +793,7 @@
             else if(keepAliveTimeout > -1)
                 this.keepAliveConnectTime = System.currentTimeMillis();
         }
-        Exception exception = null;
+        IOException exception = null;
         try {
              writeData(data);
              messageTransfered = true ;
@@ -838,9 +837,7 @@
                 }
             } else {
                 dataFailureCounter++;
-                if (log.isWarnEnabled())
-                    log.warn(sm.getString("IDataSender.send.lost",  
address.getHostAddress(),
-                            new Integer(port), data.getType(), 
data.getUniqueId()),exception);
+                throw exception;
             }
         }
     }
@@ -857,12 +854,9 @@
             isMessageTransferStarted = true ;
         }
         try {
+            byte[] message = data.getMessage();
             OutputStream out = socket.getOutputStream();
-            out.write(XByteBuffer.START_DATA);
-            out.write(XByteBuffer.toBytes(data.getCompress()));
-            out.write(XByteBuffer.toBytes(data.getMessage().length));
-            out.write(data.getMessage());
-            out.write(XByteBuffer.END_DATA);
+            
out.write(XByteBuffer.createDataPackage(message,data.getCompress()));
             out.flush();
             if (isWaitForAck())
                 waitForAck(ackTimeout);
@@ -909,9 +903,12 @@
             }
         } catch (IOException x) {
             missingAckCounter++;
-            log.warn(sm.getString("IDataSender.ack.missing", getAddress(),
-                    new Integer(socket.getLocalPort()), new Long(
-                            this.ackTimeout)),x);
+            if ( !this.isSuspect() ) {
+                log.warn(sm.getString("IDataSender.ack.missing", getAddress(),
+                                      new Integer(socket.getLocalPort()), 
+                                      new Long(this.ackTimeout)), x);
+                this.setSuspect(true);
+            }
             throw x;
         } finally {
             if(doWaitAckStats) {

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties?rev=376003&r1=376002&r2=376003&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
 Wed Feb  8 09:23:08 2006
@@ -8,7 +8,7 @@
 FastAsyncSocketSender.max.exception=[{0}:{1,number,integer}] new priority {2} 
> MAX_PRIORITY
 IDataSender.ack.eof=EOF reached at local port [{0}:{1,number,integer}]
 IDataSender.ack.receive=Got ACK at local port [{0}:{1,number,integer}]
-IDataSender.ack.missing=Wasn't able to read acknowledgement from 
[{0}:{1,number,integer}] in {2,number,integer} ms. Disconnecting socket, and 
trying again.
+IDataSender.ack.missing=Unable to read acknowledgement from 
[{0}:{1,number,integer}] in {2,number,integer} ms. Disconnecting socket, and 
trying again.
 IDataSender.ack.read=Read wait ack char '{2}' [{0}:{1,number,integer}]
 IDataSender.ack.start=Waiting for ACK message [{0}:{1,number,integer}]
 IDataSender.ack.wrong=Missing correct ACK after 10 bytes read at local port 
[{0}:{1,number,integer}]

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java?rev=376003&r1=376002&r2=376003&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
 Wed Feb  8 09:23:08 2006
@@ -45,7 +45,7 @@
     private int maxPoolSocketLimit = 25;
 
     private SenderQueue senderQueue = null;
-
+    
     //  ----------------------------------------------------- Constructor
 
    /**

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java?rev=376003&r1=376002&r2=376003&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
 Wed Feb  8 09:23:08 2006
@@ -458,9 +458,9 @@
                 IDataSender sender = senders[i];
                 if(domain.equals(sender.getDomain())) {
                     try {
-                        sendMessageData(data, sender);
+                        boolean success = sendMessageData(data, sender);
                     } catch (Exception x) {
-                        // FIXME remember exception and send it at finally
+                        //THIS WILL NEVER HAPPEN, as sendMessageData swallows 
the error
                     }
                 }
             }
@@ -846,13 +846,13 @@
      * 
      * @param data message Data
      * @param sender concrete message sender
+     * @return true if the message got sent, false otherwise
      * @throws java.io.IOException If an error occurs
      */
-    protected void sendMessageData(ClusterData data,
-            IDataSender sender) throws java.io.IOException {
+    protected boolean sendMessageData(ClusterData data,
+                                   IDataSender sender) throws 
java.io.IOException {
         if (sender == null)
-            throw new java.io.IOException(
-                    "Sender not available. Make sure sender information is 
available to the ReplicationTransmitter.");
+            throw new java.io.IOException("Sender not available. Make sure 
sender information is available to the ReplicationTransmitter.");
         try {
             // deprecated not needed DataSender#pushMessage can handle 
connection
             if (autoConnect) {
@@ -864,14 +864,16 @@
             sender.sendMessage(data);
             sender.setSuspect(false);
             addStats(data.getMessage().length);
+            return true;
         } catch (Exception x) {
-            if (log.isWarnEnabled()) {
+            if (log.isErrorEnabled()) {
                 if (!sender.getSuspect()) {
-                    log.warn("Unable to send replicated message, is server 
down?",x);
+                    log.error("Unable to send replicated message, is member 
["+sender.toString()+"] down?",x);
                 }
             }
             sender.setSuspect(true);
             failureCounter++;
+            return false;
         }
 
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to