Author: fhanik
Date: Tue Feb 19 12:45:54 2008
New Revision: 629223

URL: http://svn.apache.org/viewvc?rev=629223&view=rev
Log:
more UDP impl

Added:
    
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java
    
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
    
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java

Modified: tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java Tue Feb 
19 12:45:54 2008
@@ -27,6 +27,8 @@
  * @version $Revision$, $Date$
  */
 public interface ChannelReceiver extends Heartbeat {
+    public static final int MAX_UDP_SIZE = 65535;
+    
     /**
      * Start listening for incoming messages on the host/port
      * @throws java.io.IOException

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java 
(original)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java 
Tue Feb 19 12:45:54 2008
@@ -335,7 +335,10 @@
                         }
                     };
                 } //end if
-                if ( t != null ) t.start();
+                if ( t != null ) {
+                    t.setDaemon(true);
+                    t.start();
+                }
             }
         } catch (SocketTimeoutException x ) { 
             //do nothing, this is normal, we don't want to block forever

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java 
Tue Feb 19 12:45:54 2008
@@ -46,7 +46,6 @@
 
     public static final int OPTION_DIRECT_BUFFER = 0x0004;
 
-
     protected static org.apache.juli.logging.Log log = 
org.apache.juli.logging.LogFactory.getLog(ReceiverBase.class);
 
     private MessageListener listener;

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java 
Tue Feb 19 12:45:54 2008
@@ -249,7 +249,7 @@
 
         setListen(true);
         if (selector!=null && datagramChannel!=null) {
-            ObjectReader oreader = new ObjectReader(65535); //max size for a 
datagram packet
+            ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size 
for a datagram packet
             
registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader);
         }
 

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
 (original)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
 Tue Feb 19 12:45:54 2008
@@ -33,6 +33,8 @@
 import org.apache.catalina.tribes.io.ChannelData;
 import org.apache.catalina.tribes.io.BufferPool;
 import java.nio.channels.CancelledKeyException;
+
+import org.apache.catalina.tribes.ChannelReceiver;
 import org.apache.catalina.tribes.UniqueId;
 import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.util.Logs;
@@ -68,10 +70,14 @@
     // loop forever waiting for work to do
     public synchronized void run() {
         if ( buffer == null ) {
+            int size = getRxBufSize();
+            if (key.channel() instanceof DatagramChannel) {
+                size = ChannelReceiver.MAX_UDP_SIZE;
+            }
             if ( (getOptions() & OPTION_DIRECT_BUFFER) == 
OPTION_DIRECT_BUFFER) {
-                buffer = ByteBuffer.allocateDirect(getRxBufSize());
+                buffer = ByteBuffer.allocateDirect(size);
             } else {
-                buffer = ByteBuffer.allocate(getRxBufSize());
+                buffer = ByteBuffer.allocate(size);
             }
         } else {
             buffer.clear();

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java 
Tue Feb 19 12:45:54 2008
@@ -137,16 +137,24 @@
         connecting = false;
         setRequestCount(0);
         setConnectTime(System.currentTimeMillis());
-        socketChannel.socket().setSendBufferSize(getTxBufSize());
-        socketChannel.socket().setReceiveBufferSize(getRxBufSize());
-        socketChannel.socket().setSoTimeout((int)getTimeout());
-        
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0);
-        socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
-        socketChannel.socket().setKeepAlive(getSoKeepAlive());
-        socketChannel.socket().setReuseAddress(getSoReuseAddress());
-        socketChannel.socket().setOOBInline(getOoBInline());
-        socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
-        socketChannel.socket().setTrafficClass(getSoTrafficClass());
+        if (socketChannel!=null) {
+            socketChannel.socket().setSendBufferSize(getTxBufSize());
+            socketChannel.socket().setReceiveBufferSize(getRxBufSize());
+            socketChannel.socket().setSoTimeout((int)getTimeout());
+            
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0);
+            socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
+            socketChannel.socket().setKeepAlive(getSoKeepAlive());
+            socketChannel.socket().setReuseAddress(getSoReuseAddress());
+            socketChannel.socket().setOOBInline(getOoBInline());
+            
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
+            socketChannel.socket().setTrafficClass(getSoTrafficClass());
+        } else if (dataChannel!=null) {
+            dataChannel.socket().setSendBufferSize(getTxBufSize());
+            dataChannel.socket().setReceiveBufferSize(getRxBufSize());
+            dataChannel.socket().setSoTimeout((int)getTimeout());
+            dataChannel.socket().setReuseAddress(getSoReuseAddress());
+            dataChannel.socket().setTrafficClass(getSoTrafficClass());
+        }
     }
 
 
@@ -224,6 +232,9 @@
             dataChannel = DatagramChannel.open();
             dataChannel.configureBlocking(false);
             dataChannel.connect(daddr);
+            completeConnect();
+            dataChannel.register(getSelector(),SelectionKey.OP_WRITE, this);
+            
         } else {
             InetSocketAddress addr = new 
InetSocketAddress(getAddress(),getPort());
             if ( socketChannel != null ) throw new IOException("Socket channel 
has already been established. Connection might be in progress.");

Modified: 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
 (original)
+++ 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
 Tue Feb 19 12:45:54 2008
@@ -28,12 +28,12 @@
 import 
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
 
 /**
- * <p>Title: </p> 
- * 
- * <p>Description: </p> 
- * 
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
  * <p>Company: </p>
- * 
+ *
  * @author not attributable
  * @version 1.0
  */
@@ -61,7 +61,7 @@
         channel1.stop(GroupChannel.DEFAULT);
         channel2.stop(GroupChannel.DEFAULT);
     }
-    
+
     public void testDataSendNO_ACK() throws Exception {
         System.err.println("Starting NO_ACK");
         Thread[] threads = new Thread[threadCount];
@@ -89,7 +89,7 @@
         System.err.println("Finished NO_ACK ["+listener1.count+"]");
         assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count);
     }
-    
+
     public void testDataSendASYNCM() throws Exception {
             System.err.println("Starting ASYNC MULTI THREAD");
             Thread[] threads = new Thread[threadCount];
@@ -113,7 +113,7 @@
             for (int x=0; x<threads.length; x++ ) { threads[x].join();}
             //sleep for 50 sec, let the other messages in
             long start = System.currentTimeMillis();
-            while ( (System.currentTimeMillis()-start)<15000 && 
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+            while ( (System.currentTimeMillis()-start)<25000 && 
msgCount*threadCount!=listener1.count) Thread.sleep(500);
             System.err.println("Finished ASYNC MULTI THREAD 
["+listener1.count+"]");
             assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count);
     }
@@ -148,7 +148,7 @@
         public boolean accept(Serializable s, Member m) {
             return (s instanceof Data);
         }
-        
+
         public void messageReceived(Serializable s, Member m) {
             Data d = (Data)s;
             if ( !Data.verify(d) ) {
@@ -161,7 +161,7 @@
             }
         }
     }
-    
+
     public static class Data implements Serializable {
         public int length;
         public byte[] data;
@@ -178,14 +178,14 @@
             Arrays.fill(d.data,d.key);
             return d;
         }
-        
+
         public static boolean verify(Data d) {
             boolean result = (d.length == d.data.length);
             for ( int i=0; result && (i<d.data.length); i++ ) result = result 
&& d.data[i] == d.key;
             return result;
         }
     }
-    
-    
+
+
 
 }

Added: 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java?rev=629223&view=auto
==============================================================================
--- 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java 
(added)
+++ 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java 
Tue Feb 19 12:45:54 2008
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.TestCase;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.Arrays;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
+import org.apache.catalina.tribes.transport.AbstractSender;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import 
org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+import 
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+
+/**
+ */
+public class TestUdpPackages extends TestCase {
+    int msgCount = 500;
+    int threadCount = 20;
+    GroupChannel channel1;
+    GroupChannel channel2;
+    Listener listener1;
+    int threadCounter = 0;
+    protected void setUp() throws Exception {
+        super.setUp();
+        channel1 = new GroupChannel();
+        channel1.addInterceptor(new MessageDispatch15Interceptor());
+        channel2 = new GroupChannel();
+        channel2.addInterceptor(new MessageDispatch15Interceptor());
+        ThroughputInterceptor tint = new ThroughputInterceptor();
+        tint.setInterval(500);
+        ThroughputInterceptor tint2 = new ThroughputInterceptor();
+        tint2.setInterval(500);
+        channel1.addInterceptor(tint);
+        channel2.addInterceptor(tint2);
+        listener1 = new Listener();
+        ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+        ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+        rb1.setUdpPort(50000);
+        rb2.setUdpPort(50000);
+        channel2.addChannelListener(listener1);
+        channel1.start(GroupChannel.DEFAULT);
+        channel2.start(GroupChannel.DEFAULT);
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        channel1.stop(GroupChannel.DEFAULT);
+        channel2.stop(GroupChannel.DEFAULT);
+    }
+
+    public void testSingleDataSendNO_ACK() throws Exception {
+        AbstractSender s1 =(AbstractSender) 
((ReplicationTransmitter)channel1.getChannelSender()).getTransport();
+        AbstractSender s2 =(AbstractSender) 
((ReplicationTransmitter)channel2.getChannelSender()).getTransport();
+        s1.setTimeout(Long.MAX_VALUE); //for debugging
+        s2.setTimeout(Long.MAX_VALUE); //for debugging
+        
+        System.err.println("Starting Single package NO_ACK");
+        channel1.send(new Member[] {channel2.getLocalMember(false)}, 
Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);
+        Thread.sleep(500);
+        System.err.println("Finished Single package NO_ACK 
["+listener1.count+"]");
+        assertEquals("Checking success messages.",1,listener1.count);
+    }
+
+    
+    public void testDataSendNO_ACK() throws Exception {
+        System.err.println("Starting NO_ACK");
+        Thread[] threads = new Thread[threadCount];
+        for (int x=0; x<threads.length; x++ ) {
+            threads[x] = new Thread() {
+                public void run() {
+                    try {
+                        long start = System.currentTimeMillis();
+                        for (int i = 0; i < msgCount; i++) channel1.send(new 
Member[] {channel2.getLocalMember(false)}, 
Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);
+                        System.out.println("Thread["+this.getName()+"] sent 
"+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+                    }catch ( Exception x ) {
+                        x.printStackTrace();
+                        return;
+                    } finally {
+                        threadCounter++;
+                    }
+                }
+            };
+        } 
+        for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+        for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+        //sleep for 50 sec, let the other messages in
+        long start = System.currentTimeMillis();
+        while ( (System.currentTimeMillis()-start)<25000 && 
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+        System.err.println("Finished NO_ACK ["+listener1.count+"]");
+        assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count);
+    }
+
+    public void testDataSendASYNCM() throws Exception {
+            System.err.println("Starting ASYNC MULTI THREAD");
+            Thread[] threads = new Thread[threadCount];
+            for (int x=0; x<threads.length; x++ ) {
+                threads[x] = new Thread() {
+                    public void run() {
+                        try {
+                            long start = System.currentTimeMillis();
+                            for (int i = 0; i < msgCount; i++) 
channel1.send(new Member[] {channel2.getLocalMember(false)}, 
Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);
+                            System.out.println("Thread["+this.getName()+"] 
sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+                        }catch ( Exception x ) {
+                            x.printStackTrace();
+                            return;
+                        } finally {
+                            threadCounter++;
+                        }
+                    }
+                };
+            }
+            for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+            for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+            //sleep for 50 sec, let the other messages in
+            long start = System.currentTimeMillis();
+            while ( (System.currentTimeMillis()-start)<25000 && 
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+            System.err.println("Finished ASYNC MULTI THREAD 
["+listener1.count+"]");
+            assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count);
+    }
+    public void testDataSendASYNC() throws Exception {
+        System.err.println("Starting ASYNC");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);
+        //sleep for 50 sec, let the other messages in
+        long start = System.currentTimeMillis();
+        while ( (System.currentTimeMillis()-start)<5000 && 
msgCount!=listener1.count) Thread.sleep(500);
+        System.err.println("Finished ASYNC");
+        assertEquals("Checking success messages.",msgCount,listener1.count);
+    }
+
+    public void testDataSendACK() throws Exception {
+        System.err.println("Starting ACK");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
+        Thread.sleep(250);
+        System.err.println("Finished ACK");
+        assertEquals("Checking success messages.",msgCount,listener1.count);
+    }
+
+    public void testDataSendSYNCACK() throws Exception {
+        System.err.println("Starting SYNC_ACK");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
+        Thread.sleep(250);
+        System.err.println("Finished SYNC_ACK");
+        assertEquals("Checking success messages.",msgCount,listener1.count);
+    }
+
+    public static class Listener implements ChannelListener {
+        long count = 0;
+        public boolean accept(Serializable s, Member m) {
+            return (s instanceof Data);
+        }
+
+        public void messageReceived(Serializable s, Member m) {
+            Data d = (Data)s;
+            if ( !Data.verify(d) ) {
+                System.err.println("ERROR");
+            } else {
+                count++;
+                if ((count %1000) ==0 ) {
+                    System.err.println("SUCCESS:"+count);
+                }
+            }
+        }
+    }
+
+    public static class Data implements Serializable {
+        public int length;
+        public byte[] data;
+        public byte key;
+        public static Random r = new Random(System.currentTimeMillis());
+        public static Data createRandomData() {
+            return createRandomData(ChannelReceiver.MAX_UDP_SIZE);
+        }
+        public static Data createRandomData(int size) {
+            int i = r.nextInt();
+            i = ( i % 127 );
+            int length = Math.abs(r.nextInt() % size);
+            Data d = new Data();
+            d.length = length;
+            d.key = (byte)i;
+            d.data = new byte[length];
+            Arrays.fill(d.data,d.key);
+            return d;
+        }
+
+        public static boolean verify(Data d) {
+            boolean result = (d.length == d.data.length);
+            for ( int i=0; result && (i<d.data.length); i++ ) result = result 
&& d.data[i] == d.key;
+            return result;
+        }
+    }
+
+
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to