Author: fhanik Date: Mon Mar 20 16:03:29 2006 New Revision: 387352 URL: http://svn.apache.org/viewcvs?rev=387352&view=rev Log: Implemented asynchronous messaging with a queue size limit in bytes, to avoid OOM errors
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Removed: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/IQueue.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=387352&r1=387351&r2=387352&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java Mon Mar 20 16:03:29 2006 @@ -120,10 +120,10 @@ /** * Return the member that represents this node. - * + * @param incAlive - optimization, true if you want it to calculate alive time * @return Member */ - public Member getLocalMember(boolean incAlive) ; + public Member getLocalMember(boolean incAlive); /** * Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java?rev=387352&r1=387351&r2=387352&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java Mon Mar 20 16:03:29 2006 @@ -25,6 +25,6 @@ */ public interface ErrorHandler { - public void handleError(ChannelException x, Member[] destination, Serializable msg); + public void handleError(Exception x, Member[] destination, Serializable msg); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=387352&r1=387351&r2=387352&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java Mon Mar 20 16:03:29 2006 @@ -15,16 +15,10 @@ */ package org.apache.catalina.tribes.group; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelInterceptor; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.MembershipListener; -import org.apache.catalina.tribes.MessageListener; -import java.io.IOException; -import org.apache.catalina.tribes.ChannelInterceptor; - -import org.apache.catalina.tribes.io.ClusterData; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.tcp.*; /** * Abstract class for the interceptor base class. Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=387352&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Mon Mar 20 16:03:29 2006 @@ -0,0 +1,141 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + */ + +package org.apache.catalina.tribes.group.interceptors; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.group.InterceptorPayload; +import org.apache.catalina.tribes.tcp.bio.util.FastQueue; +import org.apache.catalina.tribes.tcp.bio.util.LinkObject; +import java.util.concurrent.atomic.AtomicLong; + +/** + * + * The message dispatcher is a way to enable asynchronous communication + * through a channel. The dispatcher will look for the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> + * flag to be set, if it is, it will queue the message for delivery and immediately return to the sender. + * + * + * + * @author Filip Hanik + * @version 1.0 + */ +public class MessageDispatchInterceptor extends ChannelInterceptorBase implements Runnable { + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(MessageDispatchInterceptor.class); + + private long maxQueueSize; + private FastQueue queue = new FastQueue(); + private boolean run = false; + private Thread msgDispatchThread = null; + private AtomicLong currentSize = new AtomicLong(0); + + + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS; + if ( async && run ) { + if ( (currentSize.get()+msg.getMessage().getLength()) > maxQueueSize ) throw new ChannelException("Asynchronous queue is full, reached its limit of "+maxQueueSize+" bytes, current:"+currentSize+" bytes."); + //add to queue + queue.add(msg, destination, payload); + currentSize.addAndGet(msg.getMessage().getLength()); + } else { + super.sendMessage(destination, msg, payload); + } + } + + + + public void messageReceived(ChannelMessage msg) { + super.messageReceived(msg); + } + + public void memberAdded(Member member) { + //todo, nothing + super.memberAdded(member); + } + + public void memberDisappeared(Member member) { + super.memberDisappeared(member); + //clean up existing queue items + } + + public void setMaxQueueSize(long maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public long getMaxQueueSize() { + return maxQueueSize; + } + + public void start(int svc) throws ChannelException { + //start the thread + if (!run ) { + synchronized (this) { + if ( !run ) { + msgDispatchThread = new Thread(this); + msgDispatchThread.setName("MessageDispatchThread"); + msgDispatchThread.setDaemon(true); + msgDispatchThread.setPriority(Thread.MAX_PRIORITY); + queue.setEnabled(true); + run = true; + msgDispatchThread.start(); + }//end if + }//sync + }//end if + super.start(svc); + } + + + public void stop(int svc) throws ChannelException { + //stop the thread + if ( run ) { + synchronized (this) { + if ( run ) { + run = false; + queue.setEnabled(false); + msgDispatchThread.interrupt(); + currentSize = new AtomicLong(0); + }//end if + }//sync + }//end if + + super.stop(svc); + } + + public void run() { + while ( run ) { + LinkObject link = queue.remove(); + if ( link == null ) continue; //should not happen unless we exceed wait time + while ( link != null && run ) { + ChannelMessage msg = link.data(); + Member[] destination = link.getDestination(); + try { + super.sendMessage(destination,msg,null); + } catch ( Exception x ) { + if ( log.isDebugEnabled() ) log.debug("Error while processing async message.",x); + if ( link.getHandler() != null ) link.getHandler().handleError(x,destination,msg); + } finally { + currentSize.addAndGet(-msg.getMessage().getLength()); + link = link.next(); + }//try + }//while + }//while + }//run + + +} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=387352&r1=387351&r2=387352&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Mon Mar 20 16:03:29 2006 @@ -19,11 +19,10 @@ import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelMessage; -import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.io.XByteBuffer; -import org.apache.catalina.tribes.tcp.*; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java?rev=387352&r1=387351&r2=387352&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java Mon Mar 20 16:03:29 2006 @@ -16,6 +16,11 @@ package org.apache.catalina.tribes.tcp.bio.util; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.ErrorHandler; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.InterceptorPayload; + /** @@ -27,7 +32,7 @@ * @author Peter Rossbach * @version $Revision: 345567 $ $Date: 2005-11-18 15:07:23 -0600 (Fri, 18 Nov 2005) $ */ -public class FastQueue implements IQueue { +public class FastQueue { private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(FastQueue.class); @@ -234,12 +239,16 @@ return size; } + public SingleRemoveSynchronizedAddLock getLock() { + return lock; + } + /** * Add new data to the queue * @see org.apache.catalina.tribes.util.IQueue#add(java.lang.String, java.lang.Object) * FIXME extract some method */ - public boolean add(String key, Object data) { + public boolean add(ChannelMessage msg, Member[] destination, InterceptorPayload payload) { boolean ok = true; long time = 0; @@ -272,7 +281,7 @@ log.trace("FastQueue.add: Could not add, since queue is full (" + size + ">=" + maxQueueLength + ")"); } } else { - LinkObject element = new LinkObject(key, data); + LinkObject element = new LinkObject(msg,destination, payload); if (size == 0) { first = last = element; size = 1; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java?rev=387352&r1=387351&r2=387352&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java Mon Mar 20 16:03:29 2006 @@ -16,6 +16,11 @@ package org.apache.catalina.tribes.tcp.bio.util; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.ErrorHandler; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.InterceptorPayload; + /** * The class <b>LinkObject</b> implements an element * for a linked list, consisting of a general @@ -23,16 +28,19 @@ * * @author Rainer Jung * @author Peter Rossbach + * @author Filip Hanik * @version $Revision: 304032 $ $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ */ public class LinkObject { - private Object payload; + private ChannelMessage msg; private LinkObject next; - private String key ; - + private byte[] key ; + private Member[] destination; + private InterceptorPayload payload; + /** * Construct a new element from the data object. * Sets the pointer to null. @@ -40,10 +48,12 @@ * @param key The key * @param payload The data object. */ - public LinkObject(String key,Object payload) { - this.payload = payload; + public LinkObject(ChannelMessage msg, Member[] destination, InterceptorPayload payload) { + this.msg = msg; this.next = null; - this.key = key ; + this.key = msg.getUniqueId(); + this.payload = payload; + this.destination = destination; } /** @@ -66,16 +76,28 @@ * Get the data object from the element. * @return The data object from the element. */ - public Object data() { - return payload; + public ChannelMessage data() { + return msg; } /** * Get the unique message id * @return the unique message id */ - public Object getKey() { + public byte[] getKey() { return key; + } + + public ErrorHandler getHandler() { + return payload!=null?payload.getErrorHandler():null; + } + + public InterceptorPayload getPayload() { + return payload; + } + + public Member[] getDestination() { + return destination; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java?rev=387352&r1=387351&r2=387352&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java Mon Mar 20 16:03:29 2006 @@ -182,6 +182,7 @@ try { wait(addWaitTimeout); } catch ( InterruptedException e ) { + Thread.currentThread().interrupted(); } } while ( addLocked || removeLocked ); } @@ -203,6 +204,7 @@ try { wait(removeWaitTimeout); } catch ( InterruptedException e ) { + Thread.currentThread().interrupted(); } } while ( ( addLocked || ! dataAvailable ) && removeEnabled ); remover=null; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]