Author: fhanik Date: Thu May 17 06:19:59 2007 New Revision: 538908 URL: http://svn.apache.org/viewvc?view=rev&rev=538908 Log: Added a unit test for ordering messages
Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?view=diff&rev=538908&r1=538907&r2=538908 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Thu May 17 06:19:59 2007 @@ -24,6 +24,7 @@ import org.apache.catalina.tribes.group.ChannelInterceptorBase; import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.io.XByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; @@ -59,7 +60,7 @@ private boolean forwardExpired = true; private int maxQueue = Integer.MAX_VALUE; - public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( !okToProcess(msg.getOptions()) ) { super.sendMessage(destination, msg, payload); return; @@ -76,7 +77,7 @@ } } - public void messageReceived(ChannelMessage msg) { + public synchronized void messageReceived(ChannelMessage msg) { if ( !okToProcess(msg.getOptions()) ) { super.messageReceived(msg); return; @@ -87,7 +88,7 @@ if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false); } - public synchronized void processLeftOvers(Member member, boolean force) { + public void processLeftOvers(Member member, boolean force) { MessageOrder tmp = (MessageOrder)incoming.get(member); if ( force ) 
{ Counter cnt = getInCounter(member); @@ -100,7 +101,7 @@ * @param order MessageOrder * @return boolean - true if a message expired and was processed */ - public synchronized boolean processIncoming(MessageOrder order) { + public boolean processIncoming(MessageOrder order) { boolean result = false; Member member = order.getMessage().getAddress(); Counter cnt = getInCounter(member); @@ -130,7 +131,8 @@ //reset the head if ( tmp == head ) head = tmp.next; cnt.setCounter(tmp.getMsgNr()+1); - if ( getForwardExpired() ) super.messageReceived(tmp.getMessage()); + if ( getForwardExpired() ) + super.messageReceived(tmp.getMessage()); tmp.setMessage(null); tmp = tmp.next; if ( prev != null ) prev.next = tmp; @@ -145,14 +147,14 @@ return result; } - public void memberAdded(Member member) { + public synchronized void memberAdded(Member member) { //notify upwards getInCounter(member); getOutCounter(member); super.memberAdded(member); } - public void memberDisappeared(Member member) { + public synchronized void memberDisappeared(Member member) { //notify upwards outcounter.remove(member); incounter.remove(member); @@ -166,7 +168,7 @@ return cnt.inc(); } - public synchronized Counter getInCounter(Member mbr) { + public Counter getInCounter(Member mbr) { Counter cnt = (Counter)incounter.get(mbr); if ( cnt == null ) { cnt = new Counter(); @@ -176,7 +178,7 @@ return cnt; } - public synchronized Counter getOutCounter(Member mbr) { + public Counter getOutCounter(Member mbr) { Counter cnt = (Counter)outcounter.get(mbr); if ( cnt == null ) { cnt = new Counter(); @@ -186,18 +188,18 @@ } public static class Counter { - private int value = 0; + private AtomicInteger value = new AtomicInteger(0); public int getCounter() { - return value; + return value.get(); } - public synchronized void setCounter(int counter) { - this.value = counter; + public void setCounter(int counter) { + this.value.set(counter); } - public synchronized int inc() { - return ++value; + public int inc() { + return 
value.addAndGet(1); } } Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java?view=auto&rev=538908 ============================================================================== --- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java (added) +++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java Thu May 17 06:19:59 2007 @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.catalina.tribes.test.interceptors; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator; +import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; +import junit.framework.TestCase; +import junit.framework.TestResult; +import junit.framework.TestSuite; +import org.apache.catalina.tribes.ChannelListener; +import java.io.Serializable; +import org.apache.catalina.tribes.group.interceptors.OrderInterceptor; +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.group.InterceptorPayload; +import org.apache.catalina.tribes.ChannelException; + +public class TestOrderInterceptor extends TestCase { + + GroupChannel[] channels = null; + OrderInterceptor[] orderitcs = null; + MangleOrderInterceptor[] mangleitcs = null; + TestListener[] test = null; + int channelCount = 2; + Thread[] threads = null; + protected void setUp() throws Exception { + System.out.println("Setup"); + super.setUp(); + channels = new GroupChannel[channelCount]; + orderitcs = new OrderInterceptor[channelCount]; + mangleitcs = new MangleOrderInterceptor[channelCount]; + test = new TestListener[channelCount]; + threads = new Thread[channelCount]; + for ( int i=0; i<channelCount; i++ ) { + channels[i] = new GroupChannel(); + orderitcs[i] = new OrderInterceptor(); + mangleitcs[i] = new MangleOrderInterceptor(); + orderitcs[i].setExpire(Long.MAX_VALUE); + channels[i].addInterceptor(orderitcs[i]); + channels[i].addInterceptor(mangleitcs[i]); + test[i] = new TestListener(i); + channels[i].addChannelListener(test[i]); + final int j = i; + threads[i] = new Thread() { + public void run() { + try { + channels[j].start(Channel.DEFAULT); + Thread.sleep(50); + } catch (Exception x) { + x.printStackTrace(); + } + } + }; + 
} + for ( int i=0; i<channelCount; i++ ) threads[i].start(); + for ( int i=0; i<channelCount; i++ ) threads[i].join(); + Thread.sleep(1000); + } + + public void testOrder1() throws Exception { + Member[] dest = channels[0].getMembers(); + for ( int i=0; i<100; i++ ) { + channels[0].send(dest,new Integer(i),0); + } + Thread.sleep(5000); + for ( int i=0; i<test.length; i++ ) { + super.assertEquals(false,test[i].fail); + } + } + + protected void tearDown() throws Exception { + System.out.println("tearDown"); + super.tearDown(); + for ( int i=0; i<channelCount; i++ ) { + channels[i].stop(Channel.DEFAULT); + } + } + + public static void main(String[] args) throws Exception { + TestSuite suite = new TestSuite(); + suite.addTestSuite(TestOrderInterceptor.class); + suite.run(new TestResult()); + } + + public static class TestListener implements ChannelListener { + int id = -1; + public TestListener(int id) { + this.id = id; + } + int cnt = 0; + int total = 0; + boolean fail = false; + public synchronized void messageReceived(Serializable msg, Member sender) { + total++; + Integer i = (Integer)msg; + if ( i.intValue() != cnt ) fail = true; + else cnt++; + System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total); + + } + + public boolean accept(Serializable msg, Member sender) { + return (msg instanceof Integer); + } + } + + public static class MangleOrderInterceptor extends ChannelInterceptorBase { + int cnt = 1; + ChannelMessage hold = null; + Member[] dest = null; + public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + if ( hold == null ) { + //System.out.println("Skipping message:"+msg); + hold = (ChannelMessage)msg.deepclone(); + dest = new Member[destination.length]; + System.arraycopy(destination,0,dest,0,dest.length); + } else { + //System.out.println("Sending message:"+msg); + super.sendMessage(destination,msg,payload); + //System.out.println("Sending message:"+hold); 
+ super.sendMessage(dest,hold,null); + hold = null; + dest = null; + } + } + } + + + + + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]