Author: fhanik
Date: Mon Mar 20 16:03:29 2006
New Revision: 387352
URL: http://svn.apache.org/viewcvs?rev=387352&view=rev
Log:
Implemented asynchronous messaging with a queue size limit in bytes, to avoid
OOM errors
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
Removed:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/IQueue.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
Mon Mar 20 16:03:29 2006
@@ -120,10 +120,10 @@
/**
* Return the member that represents this node.
- *
+ * @param incAlive - optimization, true if you want it to calculate alive
time
* @return Member
*/
- public Member getLocalMember(boolean incAlive) ;
+ public Member getLocalMember(boolean incAlive);
/**
*
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ErrorHandler.java
Mon Mar 20 16:03:29 2006
@@ -25,6 +25,6 @@
*/
public interface ErrorHandler {
- public void handleError(ChannelException x, Member[] destination,
Serializable msg);
+ public void handleError(Exception x, Member[] destination, Serializable
msg);
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
Mon Mar 20 16:03:29 2006
@@ -15,16 +15,10 @@
*/
package org.apache.catalina.tribes.group;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.MessageListener;
-import java.io.IOException;
-import org.apache.catalina.tribes.ChannelInterceptor;
-
-import org.apache.catalina.tribes.io.ClusterData;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.tcp.*;
/**
* Abstract class for the interceptor base class.
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=387352&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
Mon Mar 20 16:03:29 2006
@@ -0,0 +1,141 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+
+package org.apache.catalina.tribes.group.interceptors;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.tcp.bio.util.FastQueue;
+import org.apache.catalina.tribes.tcp.bio.util.LinkObject;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ * The message dispatcher is a way to enable asynchronous communication
+ * through a channel. The dispatcher will look for the
<code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code>
+ * flag to be set, if it is, it will queue the message for delivery and
immediately return to the sender.
+ *
+ *
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class MessageDispatchInterceptor extends ChannelInterceptorBase
implements Runnable {
+ private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(MessageDispatchInterceptor.class);
+
+ private long maxQueueSize;
+ private FastQueue queue = new FastQueue();
+ private boolean run = false;
+ private Thread msgDispatchThread = null;
+ private AtomicLong currentSize = new AtomicLong(0);
+
+
+ public void sendMessage(Member[] destination, ChannelMessage msg,
InterceptorPayload payload) throws ChannelException {
+ boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS)
== Channel.SEND_OPTIONS_ASYNCHRONOUS;
+ if ( async && run ) {
+ if ( (currentSize.get()+msg.getMessage().getLength()) >
maxQueueSize ) throw new ChannelException("Asynchronous queue is full, reached
its limit of "+maxQueueSize+" bytes, current:"+currentSize+" bytes.");
+ //add to queue
+ queue.add(msg, destination, payload);
+ currentSize.addAndGet(msg.getMessage().getLength());
+ } else {
+ super.sendMessage(destination, msg, payload);
+ }
+ }
+
+
+
+ public void messageReceived(ChannelMessage msg) {
+ super.messageReceived(msg);
+ }
+
+ public void memberAdded(Member member) {
+ //todo, nothing
+ super.memberAdded(member);
+ }
+
+ public void memberDisappeared(Member member) {
+ super.memberDisappeared(member);
+ //clean up existing queue items
+ }
+
+ public void setMaxQueueSize(long maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ }
+
+ public long getMaxQueueSize() {
+ return maxQueueSize;
+ }
+
+ public void start(int svc) throws ChannelException {
+ //start the thread
+ if (!run ) {
+ synchronized (this) {
+ if ( !run ) {
+ msgDispatchThread = new Thread(this);
+ msgDispatchThread.setName("MessageDispatchThread");
+ msgDispatchThread.setDaemon(true);
+ msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
+ queue.setEnabled(true);
+ run = true;
+ msgDispatchThread.start();
+ }//end if
+ }//sync
+ }//end if
+ super.start(svc);
+ }
+
+
+ public void stop(int svc) throws ChannelException {
+ //stop the thread
+ if ( run ) {
+ synchronized (this) {
+ if ( run ) {
+ run = false;
+ queue.setEnabled(false);
+ msgDispatchThread.interrupt();
+ currentSize = new AtomicLong(0);
+ }//end if
+ }//sync
+ }//end if
+
+ super.stop(svc);
+ }
+
+ public void run() {
+ while ( run ) {
+ LinkObject link = queue.remove();
+ if ( link == null ) continue; //should not happen unless we exceed
wait time
+ while ( link != null && run ) {
+ ChannelMessage msg = link.data();
+ Member[] destination = link.getDestination();
+ try {
+ super.sendMessage(destination,msg,null);
+ } catch ( Exception x ) {
+ if ( log.isDebugEnabled() ) log.debug("Error while
processing async message.",x);
+ if ( link.getHandler() != null )
link.getHandler().handleError(x,destination,msg);
+ } finally {
+ currentSize.addAndGet(-msg.getMessage().getLength());
+ link = link.next();
+ }//try
+ }//while
+ }//while
+ }//run
+
+
+}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
Mon Mar 20 16:03:29 2006
@@ -19,11 +19,10 @@
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.tcp.*;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
Mon Mar 20 16:03:29 2006
@@ -16,6 +16,11 @@
package org.apache.catalina.tribes.tcp.bio.util;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+
/**
@@ -27,7 +32,7 @@
* @author Peter Rossbach
* @version $Revision: 345567 $ $Date: 2005-11-18 15:07:23 -0600 (Fri, 18 Nov
2005) $
*/
-public class FastQueue implements IQueue {
+public class FastQueue {
private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(FastQueue.class);
@@ -234,12 +239,16 @@
return size;
}
+ public SingleRemoveSynchronizedAddLock getLock() {
+ return lock;
+ }
+
/**
* Add new data to the queue
* @see org.apache.catalina.tribes.util.IQueue#add(java.lang.String,
java.lang.Object)
* FIXME extract some method
*/
- public boolean add(String key, Object data) {
+ public boolean add(ChannelMessage msg, Member[] destination,
InterceptorPayload payload) {
boolean ok = true;
long time = 0;
@@ -272,7 +281,7 @@
log.trace("FastQueue.add: Could not add, since queue is
full (" + size + ">=" + maxQueueLength + ")");
}
} else {
- LinkObject element = new LinkObject(key, data);
+ LinkObject element = new LinkObject(msg,destination, payload);
if (size == 0) {
first = last = element;
size = 1;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/LinkObject.java
Mon Mar 20 16:03:29 2006
@@ -16,6 +16,11 @@
package org.apache.catalina.tribes.tcp.bio.util;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+
/**
* The class <b>LinkObject</b> implements an element
* for a linked list, consisting of a general
@@ -23,16 +28,19 @@
*
* @author Rainer Jung
* @author Peter Rossbach
+ * @author Filip Hanik
* @version $Revision: 304032 $ $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul
2005) $
*/
public class LinkObject {
- private Object payload;
+ private ChannelMessage msg;
private LinkObject next;
- private String key ;
-
+ private byte[] key ;
+ private Member[] destination;
+ private InterceptorPayload payload;
+
/**
* Construct a new element from the data object.
* Sets the pointer to null.
@@ -40,10 +48,12 @@
* @param key The key
* @param payload The data object.
*/
- public LinkObject(String key,Object payload) {
- this.payload = payload;
+ public LinkObject(ChannelMessage msg, Member[] destination,
InterceptorPayload payload) {
+ this.msg = msg;
this.next = null;
- this.key = key ;
+ this.key = msg.getUniqueId();
+ this.payload = payload;
+ this.destination = destination;
}
/**
@@ -66,16 +76,28 @@
* Get the data object from the element.
* @return The data object from the element.
*/
- public Object data() {
- return payload;
+ public ChannelMessage data() {
+ return msg;
}
/**
* Get the unique message id
* @return the unique message id
*/
- public Object getKey() {
+ public byte[] getKey() {
return key;
+ }
+
+ public ErrorHandler getHandler() {
+ return payload!=null?payload.getErrorHandler():null;
+ }
+
+ public InterceptorPayload getPayload() {
+ return payload;
+ }
+
+ public Member[] getDestination() {
+ return destination;
}
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java?rev=387352&r1=387351&r2=387352&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/SingleRemoveSynchronizedAddLock.java
Mon Mar 20 16:03:29 2006
@@ -182,6 +182,7 @@
try {
wait(addWaitTimeout);
} catch ( InterruptedException e ) {
+ Thread.currentThread().interrupted();
}
} while ( addLocked || removeLocked );
}
@@ -203,6 +204,7 @@
try {
wait(removeWaitTimeout);
} catch ( InterruptedException e ) {
+ Thread.currentThread().interrupted();
}
} while ( ( addLocked || ! dataAvailable ) && removeEnabled );
remover=null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]