Author: fhanik
Date: Thu May 17 06:19:59 2007
New Revision: 538908

URL: http://svn.apache.org/viewvc?view=rev&rev=538908
Log:
Added in a unit test for ordering messages

Added:
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java
Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?view=diff&rev=538908&r1=538907&r2=538908
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Thu May 17 06:19:59 2007
@@ -24,6 +24,7 @@
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 
@@ -59,7 +60,7 @@
     private boolean forwardExpired = true;
     private int maxQueue = Integer.MAX_VALUE;
 
-    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+    public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
         if ( !okToProcess(msg.getOptions()) ) {
             super.sendMessage(destination, msg, payload);
             return;
@@ -76,7 +77,7 @@
         }
     }
 
-    public void messageReceived(ChannelMessage msg) {
+    public synchronized void messageReceived(ChannelMessage msg) {
         if ( !okToProcess(msg.getOptions()) ) {
             super.messageReceived(msg);
             return;
@@ -87,7 +88,7 @@
         if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
     }
     
-    public synchronized void processLeftOvers(Member member, boolean force) {
+    public void processLeftOvers(Member member, boolean force) {
         MessageOrder tmp = (MessageOrder)incoming.get(member);
         if ( force ) {
             Counter cnt = getInCounter(member);
@@ -100,7 +101,7 @@
      * @param order MessageOrder
      * @return boolean - true if a message expired and was processed
      */
-    public synchronized boolean processIncoming(MessageOrder order) {
+    public boolean processIncoming(MessageOrder order) {
         boolean result = false;
         Member member = order.getMessage().getAddress();
         Counter cnt = getInCounter(member);
@@ -130,7 +131,8 @@
                 //reset the head
                 if ( tmp == head ) head = tmp.next;
                 cnt.setCounter(tmp.getMsgNr()+1);
-                if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
+                if ( getForwardExpired() ) 
+                    super.messageReceived(tmp.getMessage());
                 tmp.setMessage(null);
                 tmp = tmp.next;
                 if ( prev != null ) prev.next = tmp;  
@@ -145,14 +147,14 @@
         return result;
     }
     
-    public void memberAdded(Member member) {
+    public synchronized void memberAdded(Member member) {
         //notify upwards
         getInCounter(member);
         getOutCounter(member);
         super.memberAdded(member);
     }
 
-    public void memberDisappeared(Member member) {
+    public synchronized void memberDisappeared(Member member) {
         //notify upwards
         outcounter.remove(member);
         incounter.remove(member);
@@ -166,7 +168,7 @@
         return cnt.inc();
     }
     
-    public synchronized Counter getInCounter(Member mbr) {
+    public Counter getInCounter(Member mbr) {
         Counter cnt = (Counter)incounter.get(mbr);
         if ( cnt == null ) {
             cnt = new Counter();
@@ -176,7 +178,7 @@
         return cnt;
     }
 
-    public synchronized Counter getOutCounter(Member mbr) {
+    public Counter getOutCounter(Member mbr) {
         Counter cnt = (Counter)outcounter.get(mbr);
         if ( cnt == null ) {
             cnt = new Counter();
@@ -186,18 +188,18 @@
     }
 
     public static class Counter {
-        private int value = 0;
+        private AtomicInteger value = new AtomicInteger(0);
         
         public int getCounter() {
-            return value;
+            return value.get();
         }
         
-        public synchronized void setCounter(int counter) {
-            this.value = counter;
+        public void setCounter(int counter) {
+            this.value.set(counter);
         }
         
-        public synchronized int inc() {
-            return ++value;
+        public int inc() {
+            return value.addAndGet(1);
         }
     }
     

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java?view=auto&rev=538908
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java Thu May 17 06:19:59 2007
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.test.interceptors;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import junit.framework.TestCase;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+import org.apache.catalina.tribes.ChannelListener;
+import java.io.Serializable;
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.ChannelException;
+
+public class TestOrderInterceptor extends TestCase {
+
+    GroupChannel[] channels = null;
+    OrderInterceptor[] orderitcs = null;
+    MangleOrderInterceptor[] mangleitcs = null;
+    TestListener[] test = null;
+    int channelCount = 2;
+    Thread[] threads = null;
+    protected void setUp() throws Exception {
+        System.out.println("Setup");
+        super.setUp();
+        channels = new GroupChannel[channelCount];
+        orderitcs = new OrderInterceptor[channelCount];
+        mangleitcs = new MangleOrderInterceptor[channelCount];
+        test = new TestListener[channelCount];
+        threads = new Thread[channelCount];
+        for ( int i=0; i<channelCount; i++ ) {
+            channels[i] = new GroupChannel();
+            orderitcs[i] = new OrderInterceptor();
+            mangleitcs[i] = new MangleOrderInterceptor();
+            orderitcs[i].setExpire(Long.MAX_VALUE);
+            channels[i].addInterceptor(orderitcs[i]);
+            channels[i].addInterceptor(mangleitcs[i]);
+            test[i] = new TestListener(i);
+            channels[i].addChannelListener(test[i]);
+            final int j = i;
+            threads[i] = new Thread() {
+                public void run() {
+                    try {
+                        channels[j].start(Channel.DEFAULT);
+                        Thread.sleep(50);
+                    } catch (Exception x) {
+                        x.printStackTrace();
+                    }
+                }
+            };
+        }
+        for ( int i=0; i<channelCount; i++ ) threads[i].start();
+        for ( int i=0; i<channelCount; i++ ) threads[i].join();
+        Thread.sleep(1000);
+    }
+    
+    public void testOrder1() throws Exception {
+        Member[] dest = channels[0].getMembers();
+        for ( int i=0; i<100; i++ ) {
+            channels[0].send(dest,new Integer(i),0);
+        }
+        Thread.sleep(5000);
+        for ( int i=0; i<test.length; i++ ) {
+            super.assertEquals(false,test[i].fail);
+        }
+    }
+    
+    protected void tearDown() throws Exception {
+        System.out.println("tearDown");
+        super.tearDown();
+        for ( int i=0; i<channelCount; i++ ) {
+            channels[i].stop(Channel.DEFAULT);
+        }
+    }
+    
+    public static void main(String[] args) throws Exception {
+        TestSuite suite = new TestSuite();
+        suite.addTestSuite(TestOrderInterceptor.class);
+        suite.run(new TestResult());
+    }
+    
+    public static class TestListener implements ChannelListener {
+        int id = -1;
+        public TestListener(int id) {
+            this.id = id;
+        }
+        int cnt = 0;
+        int total = 0;
+        boolean fail = false;
+        public synchronized void messageReceived(Serializable msg, Member sender) {
+            total++;
+            Integer i = (Integer)msg;
+            if ( i.intValue() != cnt ) fail = true;
+            else cnt++;
+            System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total);
+
+        }
+
+        public boolean accept(Serializable msg, Member sender) {
+            return (msg instanceof Integer);
+        }
+    }
+    
+    public static class MangleOrderInterceptor extends ChannelInterceptorBase {
+        int cnt = 1;
+        ChannelMessage hold = null;
+        Member[] dest = null;
+        public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+            if ( hold == null ) {
+                //System.out.println("Skipping message:"+msg);
+                hold = (ChannelMessage)msg.deepclone();
+                dest = new Member[destination.length];
+                System.arraycopy(destination,0,dest,0,dest.length);
+            } else {
+                //System.out.println("Sending message:"+msg);
+                super.sendMessage(destination,msg,payload);
+                //System.out.println("Sending message:"+hold);
+                super.sendMessage(dest,hold,null);
+                hold = null;
+                dest = null;
+            }
+        }
+    }
+    
+    
+    
+    
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to