Author: fhanik
Date: Mon Mar 20 16:03:29 2006
New Revision: 387352

URL: http://svn.apache.org/viewcvs?rev=387352&view=rev
Log:
Implemented asynchronous messaging with a queue size limit in bytes, to avoid 
OOM errors

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
Removed:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/IQueue.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
 Mon Mar 20 16:03:29 2006
@@ -120,10 +120,10 @@
 
     /**
      * Return the member that represents this node.
-     * 
+     * @param incAlive - optimization, true if you want it to calculate alive 
time
      * @return Member
      */
-    public Member getLocalMember(boolean incAlive) ;
+    public Member getLocalMember(boolean incAlive);
     
     /**
      * 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java
 Mon Mar 20 16:03:29 2006
@@ -25,6 +25,6 @@
  */
 public interface ErrorHandler {
     
-    public void handleError(ChannelException x, Member[] destination, 
Serializable msg);
+    public void handleError(Exception x, Member[] destination, Serializable 
msg);
     
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
 Mon Mar 20 16:03:29 2006
@@ -15,16 +15,10 @@
  */
 package org.apache.catalina.tribes.group;
 
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.MessageListener;
-import java.io.IOException;
-import org.apache.catalina.tribes.ChannelInterceptor;
-
-import org.apache.catalina.tribes.io.ClusterData;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.tcp.*;
 
 /**
  * Abstract class for the interceptor base class.

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=387352&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
 Mon Mar 20 16:03:29 2006
@@ -0,0 +1,141 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.group.interceptors;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.tcp.bio.util.FastQueue;
+import org.apache.catalina.tribes.tcp.bio.util.LinkObject;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ * The message dispatcher is a way to enable asynchronous communication
+ * through a channel. The dispatcher will look for the
+ * <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag to be set; if it is,
+ * it will queue the message for delivery and immediately return to the sender.
+ * <p>
+ * The queue is bounded by the total number of message bytes it holds, see
+ * {@link #setMaxQueueSize(long)}. A message that would take the queue past
+ * that bound is rejected with a <code>ChannelException</code> instead of being
+ * queued. Messages without the asynchronous flag, and every message sent while
+ * the dispatcher is not running, are handed to the superclass unchanged.
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class MessageDispatchInterceptor extends ChannelInterceptorBase implements Runnable {
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(MessageDispatchInterceptor.class);
+
+    /**
+     * Upper bound, in bytes, for the messages waiting in the queue.
+     * Defaults to 64MB so that an unconfigured interceptor does not reject
+     * every asynchronous message; override with {@link #setMaxQueueSize(long)}.
+     */
+    private long maxQueueSize = 64L*1024*1024;
+    private FastQueue queue = new FastQueue();
+    /** Written under the lock in start/stop, read without it by sender and dispatch threads. */
+    private volatile boolean run = false;
+    private Thread msgDispatchThread = null;
+    /** Number of message bytes currently accounted to the queue. */
+    private final AtomicLong currentSize = new AtomicLong(0);
+
+
+    /**
+     * Sends a message, queueing it for the dispatch thread when it carries the
+     * asynchronous option and the dispatcher is running, otherwise delegating
+     * to the superclass on the calling thread.
+     *
+     * @param destination the members the message is addressed to
+     * @param msg the message to send
+     * @param payload optional payload; its error handler, if any, is notified
+     *        when the asynchronous delivery fails
+     * @throws ChannelException if the message does not fit within the byte
+     *         limit of the queue, if the queue refuses the message, or if the
+     *         synchronous delivery fails
+     */
+    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+        boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS;
+        if ( async && run ) {
+            long length = msg.getMessage().getLength();
+            //reserve the bytes before queueing: the limit check and the update are
+            //one atomic step, and the dispatch thread can never release bytes
+            //that have not been accounted for yet
+            long reserved = currentSize.addAndGet(length);
+            if ( reserved > maxQueueSize ) {
+                currentSize.addAndGet(-length);
+                throw new ChannelException("Asynchronous queue is full, reached its limit of "+maxQueueSize+" bytes, current:"+(reserved-length)+" bytes.");
+            }
+            //add to queue; a false result means the queue did not take the message
+            //(presumably disabled or at its own length limit), so give the bytes
+            //back and tell the sender instead of dropping the message silently
+            if ( !queue.add(msg, destination, payload) ) {
+                currentSize.addAndGet(-length);
+                throw new ChannelException("Asynchronous queue rejected the message, it was not queued for delivery.");
+            }
+        } else {
+            super.sendMessage(destination, msg, payload);
+        }
+    }
+
+
+
+    /**
+     * Passes the received message on to the superclass unchanged.
+     *
+     * @param msg the received message
+     */
+    public void messageReceived(ChannelMessage msg) {
+        super.messageReceived(msg);
+    }
+
+    /**
+     * Passes the membership notification on to the superclass unchanged.
+     *
+     * @param member the member that was added
+     */
+    public void memberAdded(Member member) {
+        //todo, nothing
+        super.memberAdded(member);
+    }
+
+    /**
+     * Passes the membership notification on to the superclass unchanged.
+     *
+     * @param member the member that disappeared
+     */
+    public void memberDisappeared(Member member) {
+        super.memberDisappeared(member);
+        //todo: clean up existing queue items for the member
+    }
+
+    /**
+     * Sets the maximum number of message bytes the queue may hold.
+     *
+     * @param maxQueueSize the limit in bytes
+     */
+    public void setMaxQueueSize(long maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+
+    /**
+     * @return the maximum number of message bytes the queue may hold
+     */
+    public long getMaxQueueSize() {
+        return maxQueueSize;
+    }
+
+    /**
+     * Starts the dispatch thread, unless it is already running, and then
+     * starts the superclass.
+     *
+     * @param svc the service flags, passed on to the superclass
+     * @throws ChannelException if the superclass fails to start
+     */
+    public void start(int svc) throws ChannelException {
+        //start the thread
+        if ( !run ) {
+            synchronized (this) {
+                if ( !run ) {
+                    msgDispatchThread = new Thread(this);
+                    msgDispatchThread.setName("MessageDispatchThread");
+                    msgDispatchThread.setDaemon(true);
+                    msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
+                    queue.setEnabled(true);
+                    //set the flag before starting, run() exits as soon as it reads false
+                    run = true;
+                    msgDispatchThread.start();
+                }//end if
+            }//sync
+        }//end if
+        super.start(svc);
+    }
+
+
+    /**
+     * Stops the dispatch thread, if it is running, resets the byte counter
+     * and then stops the superclass.
+     *
+     * @param svc the service flags, passed on to the superclass
+     * @throws ChannelException if the superclass fails to stop
+     */
+    public void stop(int svc) throws ChannelException {
+        //stop the thread
+        if ( run ) {
+            synchronized (this) {
+                if ( run ) {
+                    run = false;
+                    queue.setEnabled(false);
+                    msgDispatchThread.interrupt();
+                    //reset the shared counter in place so that sender and dispatch
+                    //threads keep operating on the same instance
+                    currentSize.set(0);
+                }//end if
+            }//sync
+        }//end if
+
+        super.stop(svc);
+    }
+
+    /**
+     * Dispatch loop: takes the queued messages and sends each of them through
+     * the superclass until the dispatcher is stopped. A failed delivery is
+     * reported to the error handler of the message payload, if there is one.
+     */
+    public void run() {
+        while ( run ) {
+            LinkObject link = queue.remove();
+            if ( link == null ) continue; //should not happen unless we exceed wait time
+            while ( link != null && run ) {
+                ChannelMessage msg = link.data();
+                Member[] destination = link.getDestination();
+                try {
+                    super.sendMessage(destination,msg,null);
+                } catch ( Exception x ) {
+                    if ( log.isDebugEnabled() ) log.debug("Error while processing async message.",x);
+                    try {
+                        if ( link.getHandler() != null ) link.getHandler().handleError(x,destination,msg);
+                    } catch ( Exception hx ) {
+                        //a failing handler must not end the dispatch thread, no other
+                        //thread would drain the queue afterwards
+                        log.error("Error handler failed while processing async message.",hx);
+                    }
+                } finally {
+                    //the message leaves the queue whether or not it was delivered
+                    currentSize.addAndGet(-msg.getMessage().getLength());
+                    link = link.next();
+                }//try
+            }//while
+        }//while
+    }//run
+
+
+}

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 Mon Mar 20 16:03:29 2006
@@ -19,11 +19,10 @@
 
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.tcp.*;
 
 
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
 Mon Mar 20 16:03:29 2006
@@ -16,6 +16,11 @@
 
 package org.apache.catalina.tribes.tcp.bio.util;
 
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+
 
 
 /**
@@ -27,7 +32,7 @@
  * @author Peter Rossbach
  * @version $Revision: 345567 $ $Date: 2005-11-18 15:07:23 -0600 (Fri, 18 Nov 
2005) $
  */
-public class FastQueue implements IQueue {
+public class FastQueue {
 
     private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(FastQueue.class);
 
@@ -234,12 +239,16 @@
         return size;
     }
 
+    public SingleRemoveSynchronizedAddLock getLock() {
+        return lock;
+    }
+
     /**
      * Add new data to the queue
      * @see org.apache.catalina.tribes.util.IQueue#add(java.lang.String, 
java.lang.Object)
      * FIXME extract some method
      */
-    public boolean add(String key, Object data) {
+    public boolean add(ChannelMessage msg, Member[] destination, 
InterceptorPayload payload) {
         boolean ok = true;
         long time = 0;
 
@@ -272,7 +281,7 @@
                     log.trace("FastQueue.add: Could not add, since queue is 
full (" + size + ">=" + maxQueueLength + ")");
                 }
             } else {
-                LinkObject element = new LinkObject(key, data);
+                LinkObject element = new LinkObject(msg,destination, payload);
                 if (size == 0) {
                     first = last = element;
                     size = 1;

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java
 Mon Mar 20 16:03:29 2006
@@ -16,6 +16,11 @@
 
 package org.apache.catalina.tribes.tcp.bio.util;
 
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+
 /**
  * The class <b>LinkObject</b> implements an element
  * for a linked list, consisting of a general
@@ -23,16 +28,19 @@
  *
  * @author Rainer Jung
  * @author Peter Rossbach
+ * @author Filip Hanik
  * @version $Revision: 304032 $ $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 
2005) $
 
  */
 
 public class LinkObject {
 
-    private Object payload;
+    private ChannelMessage msg;
     private LinkObject next;
-    private String key ;
-    
+    private byte[] key ;
+    private Member[] destination;
+    private InterceptorPayload payload;
+
     /**
      * Construct a new element from the data object.
      * Sets the pointer to null.
@@ -40,10 +48,12 @@
      * @param key The key
      * @param payload The data object.
      */
-    public LinkObject(String key,Object payload) {
-        this.payload = payload;
+    public LinkObject(ChannelMessage msg, Member[] destination, 
InterceptorPayload payload) {
+        this.msg = msg;
         this.next = null;
-        this.key = key ;
+        this.key = msg.getUniqueId();
+        this.payload = payload;
+        this.destination = destination;
     }
 
     /**
@@ -66,16 +76,28 @@
      * Get the data object from the element.
      * @return The data object from the element.
      */
-    public Object data() {
-        return payload;
+    public ChannelMessage data() {
+        return msg;
     }
 
     /**
      * Get the unique message id
      * @return the unique message id
      */
-    public Object getKey() {
+    public byte[] getKey() {
         return key;
+    }
+
+    public ErrorHandler getHandler() {
+        return payload!=null?payload.getErrorHandler():null;
+    }
+
+    public InterceptorPayload getPayload() {
+        return payload;
+    }
+
+    public Member[] getDestination() {
+        return destination;
     }
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java
 Mon Mar 20 16:03:29 2006
@@ -182,6 +182,7 @@
                 try {
                     wait(addWaitTimeout);
                 } catch ( InterruptedException e ) {
+                    Thread.currentThread().interrupted();
                 }
             } while ( addLocked || removeLocked );
         }
@@ -203,6 +204,7 @@
                 try {
                     wait(removeWaitTimeout);
                 } catch ( InterruptedException e ) {
+                    Thread.currentThread().interrupted();
                 }
             } while ( ( addLocked || ! dataAvailable ) && removeEnabled );
             remover=null;



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to