Author: fhanik Date: Tue Feb 28 10:38:31 2006 New Revision: 381732 URL: http://svn.apache.org/viewcvs?rev=381732&view=rev Log: Finished the order protocol. Still to do: fix the expiration handling, and add a reset of the counter when an expiration occurs.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=381732&r1=381731&r2=381732&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Tue Feb 28 10:38:31 2006 @@ -55,8 +55,8 @@ processIncoming(order); //getPrevious().messageReceived(msg); } - public synchronized void processIncoming(MessageOrder order) { +int val = order.getMsgNr(); Member member = order.getMessage().getAddress(); Counter cnt = getInCounter(member); @@ -64,9 +64,14 @@ if ( tmp != null ) { order = MessageOrder.add(tmp,order); } - while ( (order!=null) && (order.getMsgNr() == cnt.getCounter()) ) { + +// if ( order.getMsgNr() != cnt.getCounter() ) { +// System.out.println("Found out of order message."); +// } + + while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter()) ) { //we are right on target. process orders - cnt.inc(); + if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc(); super.messageReceived(order.getMessage()); order.setMessage(null); order = order.next; @@ -76,20 +81,22 @@ tmp = order; while ( tmp != null ) { //process expired messages + //TODO, when a message expires, what do we do? + //just send one? 
if ( tmp.isExpired(expire) ) { - System.out.println("Found expired message"); //reset the head if ( tmp == head ) head = tmp.next; if ( getForwardExpired() ) super.messageReceived(tmp.getMessage()); tmp.setMessage(null); tmp = tmp.next; - if ( prev != null ) prev.next = tmp; + if ( prev != null ) prev.next = tmp; } else { prev = tmp; + tmp = tmp.next; } } if ( head == null ) incoming.remove(member); - else incoming.put(member,head); + else incoming.put(member, head); } public void memberAdded(Member member) { @@ -137,6 +144,10 @@ return value; } + public synchronized void setCounter(int counter) { + this.value = counter; + } + public synchronized int inc() { return ++value; } @@ -171,16 +182,23 @@ return next; } + public int count() { + int counter = 1; + MessageOrder tmp = next; + while ( tmp != null ) { + counter++; + tmp = tmp.next; + } + return counter; + } + public static MessageOrder add(MessageOrder head, MessageOrder add) { if ( head == null ) return add; if ( add == null ) return head; if ( head == add ) return add; if ( head.getMsgNr() > add.getMsgNr() ) { - //add before - MessageOrder tmp = add.next; add.next = head; - head.next = tmp; return add; } @@ -202,6 +220,7 @@ } else { throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor"); } + return head; } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]