Author: fhanik
Date: Tue Feb 19 12:45:54 2008
New Revision: 629223
URL: http://svn.apache.org/viewvc?rev=629223&view=rev
Log:
more UDP impl
Added: tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java?rev=629223&r1=629222&r2=629223&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java Tue Feb 19 12:45:54 2008 @@ -27,6 +27,8 @@ * @version $Revision$, $Date$ */ public interface ChannelReceiver extends Heartbeat { + public static final int MAX_UDP_SIZE = 65535; + /** * Start listening for incoming messages on the host/port * @throws java.io.IOException Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=629223&r1=629222&r2=629223&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Tue Feb 19 12:45:54 2008 @@ -335,7 +335,10 @@ } }; } //end if - if ( t != null ) t.start(); + if ( t != null ) { + t.setDaemon(true); + t.start(); + } } } catch 
(SocketTimeoutException x ) { //do nothing, this is normal, we don't want to block forever Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=629223&r1=629222&r2=629223&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Tue Feb 19 12:45:54 2008 @@ -46,7 +46,6 @@ public static final int OPTION_DIRECT_BUFFER = 0x0004; - protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReceiverBase.class); private MessageListener listener; Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=629223&r1=629222&r2=629223&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Tue Feb 19 12:45:54 2008 @@ -249,7 +249,7 @@ setListen(true); if (selector!=null && datagramChannel!=null) { - ObjectReader oreader = new ObjectReader(65535); //max size for a datagram packet + ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size for a datagram packet registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=629223&r1=629222&r2=629223&view=diff ============================================================================== --- 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Tue Feb 19 12:45:54 2008 @@ -33,6 +33,8 @@ import org.apache.catalina.tribes.io.ChannelData; import org.apache.catalina.tribes.io.BufferPool; import java.nio.channels.CancelledKeyException; + +import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.RemoteProcessException; import org.apache.catalina.tribes.util.Logs; @@ -68,10 +70,14 @@ // loop forever waiting for work to do public synchronized void run() { if ( buffer == null ) { + int size = getRxBufSize(); + if (key.channel() instanceof DatagramChannel) { + size = ChannelReceiver.MAX_UDP_SIZE; + } if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) { - buffer = ByteBuffer.allocateDirect(getRxBufSize()); + buffer = ByteBuffer.allocateDirect(size); } else { - buffer = ByteBuffer.allocate(getRxBufSize()); + buffer = ByteBuffer.allocate(size); } } else { buffer.clear(); Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=629223&r1=629222&r2=629223&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java Tue Feb 19 12:45:54 2008 @@ -137,16 +137,24 @@ connecting = false; setRequestCount(0); setConnectTime(System.currentTimeMillis()); - socketChannel.socket().setSendBufferSize(getTxBufSize()); - socketChannel.socket().setReceiveBufferSize(getRxBufSize()); - socketChannel.socket().setSoTimeout((int)getTimeout()); - socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0); - 
socketChannel.socket().setTcpNoDelay(getTcpNoDelay()); - socketChannel.socket().setKeepAlive(getSoKeepAlive()); - socketChannel.socket().setReuseAddress(getSoReuseAddress()); - socketChannel.socket().setOOBInline(getOoBInline()); - socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); - socketChannel.socket().setTrafficClass(getSoTrafficClass()); + if (socketChannel!=null) { + socketChannel.socket().setSendBufferSize(getTxBufSize()); + socketChannel.socket().setReceiveBufferSize(getRxBufSize()); + socketChannel.socket().setSoTimeout((int)getTimeout()); + socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0); + socketChannel.socket().setTcpNoDelay(getTcpNoDelay()); + socketChannel.socket().setKeepAlive(getSoKeepAlive()); + socketChannel.socket().setReuseAddress(getSoReuseAddress()); + socketChannel.socket().setOOBInline(getOoBInline()); + socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); + socketChannel.socket().setTrafficClass(getSoTrafficClass()); + } else if (dataChannel!=null) { + dataChannel.socket().setSendBufferSize(getTxBufSize()); + dataChannel.socket().setReceiveBufferSize(getRxBufSize()); + dataChannel.socket().setSoTimeout((int)getTimeout()); + dataChannel.socket().setReuseAddress(getSoReuseAddress()); + dataChannel.socket().setTrafficClass(getSoTrafficClass()); + } } @@ -224,6 +232,9 @@ dataChannel = DatagramChannel.open(); dataChannel.configureBlocking(false); dataChannel.connect(daddr); + completeConnect(); + dataChannel.register(getSelector(),SelectionKey.OP_WRITE, this); + } else { InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); if ( socketChannel != null ) throw new IOException("Socket channel has already been established. 
Connection might be in progress."); Modified: tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java?rev=629223&r1=629222&r2=629223&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java (original) +++ tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java Tue Feb 19 12:45:54 2008 @@ -28,12 +28,12 @@ import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; /** - * <p>Title: </p> - * - * <p>Description: </p> - * + * <p>Title: </p> + * + * <p>Description: </p> + * * <p>Company: </p> - * + * * @author not attributable * @version 1.0 */ @@ -61,7 +61,7 @@ channel1.stop(GroupChannel.DEFAULT); channel2.stop(GroupChannel.DEFAULT); } - + public void testDataSendNO_ACK() throws Exception { System.err.println("Starting NO_ACK"); Thread[] threads = new Thread[threadCount]; @@ -89,7 +89,7 @@ System.err.println("Finished NO_ACK ["+listener1.count+"]"); assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); } - + public void testDataSendASYNCM() throws Exception { System.err.println("Starting ASYNC MULTI THREAD"); Thread[] threads = new Thread[threadCount]; @@ -113,7 +113,7 @@ for (int x=0; x<threads.length; x++ ) { threads[x].join();} //sleep for 50 sec, let the other messages in long start = System.currentTimeMillis(); - while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); + while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); System.err.println("Finished ASYNC MULTI THREAD ["+listener1.count+"]"); assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); } @@ -148,7 +148,7 @@ public boolean accept(Serializable 
s, Member m) { return (s instanceof Data); } - + public void messageReceived(Serializable s, Member m) { Data d = (Data)s; if ( !Data.verify(d) ) { @@ -161,7 +161,7 @@ } } } - + public static class Data implements Serializable { public int length; public byte[] data; @@ -178,14 +178,14 @@ Arrays.fill(d.data,d.key); return d; } - + public static boolean verify(Data d) { boolean result = (d.length == d.data.length); for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key; return result; } } - - + + } Added: tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java?rev=629223&view=auto ============================================================================== --- tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java (added) +++ tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java Tue Feb 19 12:45:54 2008 @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.catalina.tribes.test.channel; + +import junit.framework.TestCase; +import java.io.Serializable; +import java.util.Random; +import java.util.Arrays; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.ChannelReceiver; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener; +import org.apache.catalina.tribes.transport.AbstractSender; +import org.apache.catalina.tribes.transport.ReceiverBase; +import org.apache.catalina.tribes.transport.ReplicationTransmitter; +import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; +import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; +import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor; + +/** + */ +public class TestUdpPackages extends TestCase { + int msgCount = 500; + int threadCount = 20; + GroupChannel channel1; + GroupChannel channel2; + Listener listener1; + int threadCounter = 0; + protected void setUp() throws Exception { + super.setUp(); + channel1 = new GroupChannel(); + channel1.addInterceptor(new MessageDispatch15Interceptor()); + channel2 = new GroupChannel(); + channel2.addInterceptor(new MessageDispatch15Interceptor()); + ThroughputInterceptor tint = new ThroughputInterceptor(); + tint.setInterval(500); + ThroughputInterceptor tint2 = new ThroughputInterceptor(); + tint2.setInterval(500); + channel1.addInterceptor(tint); + channel2.addInterceptor(tint2); + listener1 = new Listener(); + ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver(); + ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver(); + rb1.setUdpPort(50000); + rb2.setUdpPort(50000); + channel2.addChannelListener(listener1); + channel1.start(GroupChannel.DEFAULT); + channel2.start(GroupChannel.DEFAULT); + } + + protected void tearDown() throws 
Exception { + super.tearDown(); + channel1.stop(GroupChannel.DEFAULT); + channel2.stop(GroupChannel.DEFAULT); + } + + public void testSingleDataSendNO_ACK() throws Exception { + AbstractSender s1 =(AbstractSender) ((ReplicationTransmitter)channel1.getChannelSender()).getTransport(); + AbstractSender s2 =(AbstractSender) ((ReplicationTransmitter)channel2.getChannelSender()).getTransport(); + s1.setTimeout(Long.MAX_VALUE); //for debugging + s2.setTimeout(Long.MAX_VALUE); //for debugging + + System.err.println("Starting Single package NO_ACK"); + channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP); + Thread.sleep(500); + System.err.println("Finished Single package NO_ACK ["+listener1.count+"]"); + assertEquals("Checking success messages.",1,listener1.count); + } + + + public void testDataSendNO_ACK() throws Exception { + System.err.println("Starting NO_ACK"); + Thread[] threads = new Thread[threadCount]; + for (int x=0; x<threads.length; x++ ) { + threads[x] = new Thread() { + public void run() { + try { + long start = System.currentTimeMillis(); + for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP); + System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms."); + }catch ( Exception x ) { + x.printStackTrace(); + return; + } finally { + threadCounter++; + } + } + }; + } + for (int x=0; x<threads.length; x++ ) { threads[x].start();} + for (int x=0; x<threads.length; x++ ) { threads[x].join();} + //sleep for 50 sec, let the other messages in + long start = System.currentTimeMillis(); + while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); + System.err.println("Finished NO_ACK ["+listener1.count+"]"); + assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); + } + + public 
void testDataSendASYNCM() throws Exception { + System.err.println("Starting ASYNC MULTI THREAD"); + Thread[] threads = new Thread[threadCount]; + for (int x=0; x<threads.length; x++ ) { + threads[x] = new Thread() { + public void run() { + try { + long start = System.currentTimeMillis(); + for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP); + System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms."); + }catch ( Exception x ) { + x.printStackTrace(); + return; + } finally { + threadCounter++; + } + } + }; + } + for (int x=0; x<threads.length; x++ ) { threads[x].start();} + for (int x=0; x<threads.length; x++ ) { threads[x].join();} + //sleep for 50 sec, let the other messages in + long start = System.currentTimeMillis(); + while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); + System.err.println("Finished ASYNC MULTI THREAD ["+listener1.count+"]"); + assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); + } + public void testDataSendASYNC() throws Exception { + System.err.println("Starting ASYNC"); + for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP); + //sleep for 50 sec, let the other messages in + long start = System.currentTimeMillis(); + while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500); + System.err.println("Finished ASYNC"); + assertEquals("Checking success messages.",msgCount,listener1.count); + } + + public void testDataSendACK() throws Exception { + System.err.println("Starting ACK"); + for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP); + Thread.sleep(250); + System.err.println("Finished ACK"); + assertEquals("Checking success messages.",msgCount,listener1.count); + } + + public void testDataSendSYNCACK() throws Exception { + System.err.println("Starting SYNC_ACK"); + for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP); + Thread.sleep(250); + System.err.println("Finished SYNC_ACK"); + assertEquals("Checking success messages.",msgCount,listener1.count); + } + + public static class Listener implements ChannelListener { + long count = 0; + public boolean accept(Serializable s, Member m) { + return (s instanceof Data); + } + + public void messageReceived(Serializable s, Member m) { + Data d = (Data)s; + if ( !Data.verify(d) ) { + System.err.println("ERROR"); + } else { + count++; + if ((count %1000) ==0 ) { + System.err.println("SUCCESS:"+count); + } + } + } + } + + public static class Data implements Serializable { + public int length; + public byte[] data; + public byte key; + public static Random r = new Random(System.currentTimeMillis()); + public static Data createRandomData() { + return createRandomData(ChannelReceiver.MAX_UDP_SIZE); + } + public static Data createRandomData(int size) { + int i = r.nextInt(); + i = ( i % 127 ); + int length = Math.abs(r.nextInt() % size); + Data d = new Data(); + d.length = length; + d.key = (byte)i; + d.data = new byte[length]; + Arrays.fill(d.data,d.key); + return d; + } + + public static boolean verify(Data d) { + boolean result = (d.length == d.data.length); + for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key; + return result; + } + } + + + +} --------------------------------------------------------------------- To unsubscribe, 
e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]