Author: fhanik
Date: Tue Feb 28 11:14:54 2006
New Revision: 381747

URL: http://svn.apache.org/viewcvs?rev=381747&view=rev
Log:
Documented the OrderInterceptor that guarantees ordering for messages sent and 
received.

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=381747&r1=381746&r2=381747&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
 Tue Feb 28 11:14:54 2006
@@ -55,7 +55,7 @@
            .append("\n\t\t[-mdrop multicastdroptime]")
            .append("\n\t\t[-gzip]")
            .append("\n\t\t[-order]")
-           ;
+           .append("\n\t\t[-ordersize maxorderqueuesize]");
        return buf;
 
     }
@@ -77,6 +77,7 @@
         long mcastfreq = 500;
         long mcastdrop = 2000;
         boolean order = false;
+        int ordersize = Integer.MAX_VALUE;
         
         for (int i = 0; i < args.length; i++) {
             if ("-bind".equals(args[i])) {
@@ -93,6 +94,9 @@
                 gzip = true;
             } else if ("-order".equals(args[i])) {
                 order = true;
+            } else if ("-ordersize".equals(args[i])) {
+                ordersize = Integer.parseInt(args[++i]);
+                System.out.println("Setting 
OrderInterceptor.maxQueue="+ordersize);
             } else if ("-ack".equals(args[i])) {
                 ack = Boolean.parseBoolean(args[++i]);
             } else if ("-ackto".equals(args[i])) {
@@ -142,7 +146,11 @@
         channel.setMembershipService(service);
 
         if (gzip) channel.addInterceptor(new GzipInterceptor());
-        if (order) channel.addInterceptor(new OrderInterceptor());
+        if (order) {
+            OrderInterceptor oi = new OrderInterceptor();
+            oi.setMaxQueue(ordersize);
+            channel.addInterceptor(oi);
+        }
         return channel;
         
     }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=381747&r1=381746&r2=381747&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 Tue Feb 28 11:14:54 2006
@@ -28,6 +28,24 @@
 
 /**
  *
+ * The order interceptor guarantees that messages are received in the same 
order they were 
+ * sent.
+ * This interceptor works best with the ack=true setting. <br>
+ * There is no point in 
+ * using this with the replicationMode="fastasynchqueue" as this mode 
guarantees ordering.<BR>
+ * If you are using the mode ack=false replicationMode=pooled, and have a lot 
of concurrent threads,
+ * this interceptor can really slow you down, as many messages will be 
completely out of order
+ * and the queue might become rather large. If this is the case, then you 
might want to set 
+ * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep 
more than 25 messages in our queue)
+ * <br><b>Configuration Options</b><br>
+ * OrderInterceptor.expire=<milliseconds> - if a message arrives out of order, 
how long before we act on it <b>default=3000ms</b><br>
+ * OrderInterceptor.maxQueue=<max queue size> - how much can the queue grow to 
ensure ordering. 
+ *   This setting is useful to avoid 
OutOfMemoryErrors. <b>default=Integer.MAX_VALUE</b><br>
+ * OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor 
what to 
+ * do when a message has expired or the queue has grown larger than the 
maxQueue value.
+ * true means that the message is sent up the stack to the receiver that will 
receive an out-of-order message
+ * false means, forget the message and reset the message counter. 
<b>default=true</b>
+ * 
  * 
  * @author Filip Hanik
  * @version 1.0
@@ -38,6 +56,7 @@
     private HashMap incoming = new HashMap();
     private long expire = 3000;
     private boolean forwardExpired = true;
+    private int maxQueue = Integer.MAX_VALUE;
 
     public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
         for ( int i=0; i<destination.length; i++ ) {
@@ -53,7 +72,6 @@
         msg.getMessage().trim(4);
         MessageOrder order = new MessageOrder(msgnr,msg);
         if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
-        //getPrevious().messageReceived(msg);
     }
     
     public synchronized void processLeftOvers(Member member, boolean force) {
@@ -79,13 +97,11 @@
             order = MessageOrder.add(tmp,order);
         }
         
-//        if ( order.getMsgNr() != cnt.getCounter() ) {
-//            System.out.println("Found out of order message.");
-//        }
         
-        while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter()) ) {
+        while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter())  ) {
             //we are right on target. process orders
             if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
+            else if ( order.getMsgNr() > cnt.getCounter() ) 
cnt.setCounter(order.getMsgNr());
             super.messageReceived(order.getMessage());
             order.setMessage(null);
             order = order.next;
@@ -93,11 +109,11 @@
         MessageOrder head = order;
         MessageOrder prev = null;
         tmp = order;
+        //flag to empty out the queue when its size reaches maxQueue or more
+        boolean empty = order!=null?order.getCount()>=maxQueue:false;
         while ( tmp != null ) {
-            //process expired messages
-            //TODO, when a message expires, what do we do?
-            //just send one?
-            if ( tmp.isExpired(expire) ) {
+            //process expired messages or empty out the queue
+            if ( tmp.isExpired(expire) || empty ) {
                 //reset the head
                 if ( tmp == head ) head = tmp.next;
                 cnt.setCounter(tmp.getMsgNr()+1);
@@ -201,7 +217,7 @@
             return next;
         }
         
-        public int count() {
+        public int getCount() {
             int counter = 1;
             MessageOrder tmp = next;
             while ( tmp != null ) {
@@ -259,12 +275,20 @@
         this.forwardExpired = forwardExpired;
     }
 
+    public void setMaxQueue(int maxQueue) {
+        this.maxQueue = maxQueue;
+    }
+
     public long getExpire() {
         return expire;
     }
 
     public boolean getForwardExpired() {
         return forwardExpired;
+    }
+
+    public int getMaxQueue() {
+        return maxQueue;
     }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to