Author: fhanik Date: Tue Feb 28 11:14:54 2006 New Revision: 381747 URL: http://svn.apache.org/viewcvs?rev=381747&view=rev Log: Documented the OrderInterceptor that guarantees ordering for messages sent and received.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=381747&r1=381746&r2=381747&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java Tue Feb 28 11:14:54 2006 @@ -55,7 +55,7 @@ .append("\n\t\t[-mdrop multicastdroptime]") .append("\n\t\t[-gzip]") .append("\n\t\t[-order]") - ; + .append("\n\t\t[-ordersize maxorderqueuesize]"); return buf; } @@ -77,6 +77,7 @@ long mcastfreq = 500; long mcastdrop = 2000; boolean order = false; + int ordersize = Integer.MAX_VALUE; for (int i = 0; i < args.length; i++) { if ("-bind".equals(args[i])) { @@ -93,6 +94,9 @@ gzip = true; } else if ("-order".equals(args[i])) { order = true; + } else if ("-ordersize".equals(args[i])) { + ordersize = Integer.parseInt(args[++i]); + System.out.println("Setting OrderInterceptor.maxQueue="+ordersize); } else if ("-ack".equals(args[i])) { ack = Boolean.parseBoolean(args[++i]); } else if ("-ackto".equals(args[i])) { @@ -142,7 +146,11 @@ channel.setMembershipService(service); if (gzip) channel.addInterceptor(new GzipInterceptor()); - if (order) channel.addInterceptor(new OrderInterceptor()); + if (order) { + OrderInterceptor oi = new OrderInterceptor(); + oi.setMaxQueue(ordersize); + channel.addInterceptor(oi); + } return channel; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=381747&r1=381746&r2=381747&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Tue Feb 28 11:14:54 2006 @@ -28,6 +28,24 @@ /** * + * The order interceptor guarantees that messages are received in the same order they were + * sent. + * This interceptor works best with the ack=true setting. <br> + * There is no point in + * using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR> + * If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads, + * this interceptor can really slow you down, as many messages will be completely out of order + * and the queue might become rather large. If this is the case, then you might want to set + * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue) + * <br><b>Configuration Options</b><br> + * OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it <b>default=3000ms</b><br> + * OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering. + * This setting is useful to avoid OutOfMemoryErrors<b>default=Integer.MAX_VALUE</b><br> + * OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to + * do when a message has expired or the queue has grown larger than the maxQueue value. + * true means that the message is sent up the stack to the receiver that will receive and out of order message + * false means, forget the message and reset the message counter. <b>default=true</b> + * * * @author Filip Hanik * @version 1.0 @@ -38,6 +56,7 @@ private HashMap incoming = new HashMap(); private long expire = 3000; private boolean forwardExpired = true; + private int maxQueue = Integer.MAX_VALUE; public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { for ( int i=0; i<destination.length; i++ ) { @@ -53,7 +72,6 @@ msg.getMessage().trim(4); MessageOrder order = new MessageOrder(msgnr,msg); if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false); - //getPrevious().messageReceived(msg); } public synchronized void processLeftOvers(Member member, boolean force) { @@ -79,13 +97,11 @@ order = MessageOrder.add(tmp,order); } -// if ( order.getMsgNr() != cnt.getCounter() ) { -// System.out.println("Found out of order message."); -// } - while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter()) ) { + while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter()) ) { //we are right on target. process orders if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc(); + else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr()); super.messageReceived(order.getMessage()); order.setMessage(null); order = order.next; @@ -93,11 +109,11 @@ MessageOrder head = order; MessageOrder prev = null; tmp = order; + //flag to empty out the queue when it larger than maxQueue + boolean empty = order!=null?order.getCount()>=maxQueue:false; while ( tmp != null ) { - //process expired messages - //TODO, when a message expires, what do we do? - //just send one? - if ( tmp.isExpired(expire) ) { + //process expired messages or empty out the queue + if ( tmp.isExpired(expire) || empty ) { //reset the head if ( tmp == head ) head = tmp.next; cnt.setCounter(tmp.getMsgNr()+1); @@ -201,7 +217,7 @@ return next; } - public int count() { + public int getCount() { int counter = 1; MessageOrder tmp = next; while ( tmp != null ) { @@ -259,12 +275,20 @@ this.forwardExpired = forwardExpired; } + public void setMaxQueue(int maxQueue) { + this.maxQueue = maxQueue; + } + public long getExpire() { return expire; } public boolean getForwardExpired() { return forwardExpired; + } + + public int getMaxQueue() { + return maxQueue; } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]