Author: fhanik
Date: Mon Feb 18 16:57:54 2008
New Revision: 628940

URL: http://svn.apache.org/viewvc?rev=628940&view=rev
Log:
more UDP code

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/Member.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java

Modified: tomcat/trunk/java/org/apache/catalina/tribes/Member.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Member.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/Member.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/Member.java Mon Feb 18 16:57:54 2008
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,7 +24,7 @@
  * The host is what interface the member is listening to, to receive data<br>
  * The port is what port the member is listening to, to receive data<br>
  * The uniqueId defines the session id for the member. This is an important 
feature
- * since a member that has crashed and the starts up again on the same 
port/host is 
+ * since a member that has crashed and the starts up again on the same 
port/host is
  * not guaranteed to be the same member, so no state transfers will ever be 
confused
  * @author Filip Hanik
  * @version $Revision$, $Date$
@@ -32,18 +32,18 @@
 
 
 public interface Member {
-    
+
     /**
      * When a member leaves the cluster, the payload of the memberDisappeared 
member
      * will be the following bytes. This indicates a soft shutdown, and not a 
crash
      */
     public static final byte[] SHUTDOWN_PAYLOAD = new byte[] {66, 65, 66, 89, 
45, 65, 76, 69, 88};
-    
+
     /**
      * Returns the name of this node, should be unique within the group.
      */
     public String getName();
-  
+
     /**
      * Returns the listen host for the ChannelReceiver implementation
      * @return IPv4 or IPv6 representation of the host address this member 
listens to incoming data
@@ -57,7 +57,7 @@
      * @see ChannelReceiver
      */
     public int getPort();
-    
+
     /**
      * Returns the secure listen port for the ChannelReceiver implementation.
      * Returns -1 if its not listening to a secure port.
@@ -65,7 +65,13 @@
      * @see ChannelReceiver
      */
     public int getSecurePort();
-    
+
+    /**
+     * Returns the UDP port that this member is listening to for UDP messages.
+     * @return the listen UDP port for this member, -1 if its not listening on 
a UDP port
+     */
+    public int getUdpPort();
+
 
     /**
      * Contains information on how long this member has been online.
@@ -74,7 +80,7 @@
      * @return nr of milliseconds since this member started.
      */
     public long getMemberAliveTime();
-    
+
     /**
      * The current state of the member
      * @return boolean - true if the member is functioning correctly
@@ -85,32 +91,32 @@
      * @return boolean - true if the member is suspect, but the crash has not 
been confirmed
      */
     public boolean isSuspect();
-    
+
     /**
-     * 
-     * @return boolean - true if the member has been confirmed to malfunction 
+     *
+     * @return boolean - true if the member has been confirmed to malfunction
      */
     public boolean isFailing();
-    
+
     /**
      * returns a UUID unique for this member over all sessions.
      * If the member crashes and restarts, the uniqueId will be different.
      * @return byte[]
      */
     public byte[] getUniqueId();
-    
+
     /**
      * returns the payload associated with this member
      * @return byte[]
      */
     public byte[] getPayload();
-    
+
     /**
      * returns the command associated with this member
      * @return byte[]
      */
     public byte[] getCommand();
-    
+
     /**
      * Domain for this cluster
      * @return byte[]

Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java Mon Feb 18 16:57:54 2008
@@ -116,7 +116,8 @@
      */
     public boolean keepalive() {
         boolean disconnect = false;
-        if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) disconnect = 
true;
+        if (isUdpBased()) disconnect = true; //always disconnect UDP, TODO 
optimize the keepalive handling
+        else if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) 
disconnect = true;
         else if ( keepAliveTime >= 0 && 
(System.currentTimeMillis()-connectTime)>keepAliveTime ) disconnect = true;
         if ( disconnect ) disconnect();
         return disconnect;
@@ -299,6 +300,7 @@
         this.destination = destination;
         this.address = InetAddress.getByAddress(destination.getHost());
         this.port = destination.getPort();
+        this.udpPort = destination.getUdpPort();
 
     }
 

Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Mon Feb 18 16:57:54 2008
@@ -17,6 +17,7 @@
 package org.apache.catalina.tribes.transport;
 
 import java.io.IOException;
+import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -220,6 +221,38 @@
         }
         return retries;
     }
+
+    /**
+     * Same as bind() except it does it for the UDP port
+     * @param socket
+     * @param portstart
+     * @param retries
+     * @return
+     * @throws IOException
+     */
+    protected int bindUdp(DatagramSocket socket, int portstart, int retries) 
throws IOException {
+        InetSocketAddress addr = null;
+        while ( retries > 0 ) {
+            try {
+                addr = new InetSocketAddress(getBind(), portstart);
+                socket.bind(addr);
+                setUdpPort(portstart);
+                log.info("UDP Receiver Server Socket bound to:"+addr);
+                return 0;
+            }catch ( IOException x) {
+                retries--;
+                if ( retries <= 0 ) {
+                    log.info("Unable to bind UDP socket to:"+addr+" throwing 
error.");
+                    throw x;
+                }
+                portstart++;
+                try {Thread.sleep(25);}catch( InterruptedException 
ti){Thread.currentThread().interrupted();}
+                retries = bindUdp(socket,portstart,retries);
+            }
+        }
+        return retries;
+    }
+
 
     public void messageDataReceived(ChannelMessage data) {
         if ( this.listener != null ) {

Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Mon Feb 18 16:57:54 2008
@@ -144,10 +144,7 @@
             datagramChannel = DatagramChannel.open();
             datagramChannel.configureBlocking(false);
             //bind to the address to avoid security checks
-            InetSocketAddress daddr = new 
InetSocketAddress(getBind(),getUdpPort());
-            //TODO should we auto increment the UDP port to avoid collisions?
-            //we could auto increment with the offset from the tcp listen port
-            datagramChannel.connect(daddr);
+            bindUdp(datagramChannel.socket(),getUdpPort(),getAutoBind());
         }
 
 
@@ -188,7 +185,10 @@
         }
         key.cancel();
         key.attach(null);
-        try { ((SocketChannel)key.channel()).socket().close(); } catch 
(IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
+        if (key.channel() instanceof SocketChannel)
+            try { ((SocketChannel)key.channel()).socket().close(); } catch 
(IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
+        if (key.channel() instanceof DatagramChannel)
+            try { ((DatagramChannel)key.channel()).socket().close(); } catch 
(Exception e) { if (log.isDebugEnabled()) log.debug("", e); }
         try { key.channel().close(); } catch (IOException e) { if 
(log.isDebugEnabled()) log.debug("", e); }
 
     }
@@ -249,7 +249,7 @@
 
         setListen(true);
         if (selector!=null && datagramChannel!=null) {
-            ObjectReader oreader = new ObjectReader(1024*65);
+            ObjectReader oreader = new ObjectReader(65535); //max size for a 
datagram packet
             
registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader);
         }
 

Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Mon Feb 18 16:57:54 2008
@@ -17,6 +17,7 @@
 
 package org.apache.catalina.tribes.transport.nio;
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.ReadableByteChannel;
@@ -147,19 +148,33 @@
         reader.setLastAccess(System.currentTimeMillis());
         reader.access();
         ReadableByteChannel channel = (ReadableByteChannel) key.channel();
-        int count;
+        int count=-1;
         buffer.clear();                        // make buffer empty
+        SocketAddress saddr = null;
 
-        // loop while data available, channel is non-blocking
-        while ((count = channel.read (buffer)) > 0) {
-            buffer.flip();             // make buffer readable
+        if (channel instanceof SocketChannel) {
+            // loop while data available, channel is non-blocking
+            while ((count = channel.read (buffer)) > 0) {
+                buffer.flip();         // make buffer readable
+                if ( buffer.hasArray() )
+                    reader.append(buffer.array(),0,count,false);
+                else
+                    reader.append(buffer,count,false);
+                buffer.clear();                // make buffer empty
+                //do we have at least one package?
+                if ( reader.hasPackage() ) break;
+            }
+        } else if (channel instanceof DatagramChannel) {
+            DatagramChannel dchannel = (DatagramChannel)channel;
+            saddr = dchannel.receive(buffer);
+            buffer.flip();      // make buffer readable
             if ( buffer.hasArray() )
-                reader.append(buffer.array(),0,count,false);
+                
reader.append(buffer.array(),0,buffer.limit()-buffer.position(),false);
             else
-                reader.append(buffer,count,false);
-            buffer.clear();            // make buffer empty
-            //do we have at least one package?
-            if ( reader.hasPackage() ) break;
+                reader.append(buffer,buffer.limit()-buffer.position(),false);
+            buffer.clear();     // make buffer empty
+            //did we get a package
+            count = reader.hasPackage()?1:-1;
         }
 
         int pkgcnt = reader.count();
@@ -180,7 +195,7 @@
              * server before completing the request
              * This is considered an asynchronized request
              */
-            if (ChannelData.sendAckAsync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND);
+            if (ChannelData.sendAckAsync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND,saddr);
             try {
                 if ( Logs.MESSAGES.isTraceEnabled() ) {
                     try {
@@ -194,13 +209,13 @@
                  * server before sending the ack to the remote server
                  * This is considered a synchronized request
                  */
-                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND,saddr);
             }catch ( RemoteProcessException e ) {
                 if ( log.isDebugEnabled() ) log.error("Processing of cluster 
message failed.",e);
-                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND,saddr);
             }catch ( Exception e ) {
                 log.error("Processing of cluster message failed.",e);
-                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) 
sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND,saddr);
             }
             if ( getUseBufferPool() ) {
                 BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
@@ -275,17 +290,25 @@
 
 
     /**
-     * send a reply-acknowledgement (6,2,3)
+     * send a reply-acknowledgement (6,2,3), sends it doing a busy write, the 
ACK is so small
+     * that it should always go to the buffer
      * @param key
      * @param channel
      */
-    protected void sendAck(SelectionKey key, WritableByteChannel channel, 
byte[] command) {
-
+    protected void sendAck(SelectionKey key, WritableByteChannel channel, 
byte[] command, SocketAddress udpaddr) {
         try {
+
             ByteBuffer buf = ByteBuffer.wrap(command);
             int total = 0;
-            while ( total < command.length ) {
-                total += channel.write(buf);
+            if (channel instanceof DatagramChannel) {
+                DatagramChannel dchannel = (DatagramChannel)channel;
+                while ( total < command.length ) {
+                    total += dchannel.send(buf, udpaddr);
+                }
+            } else {
+                while ( total < command.length ) {
+                    total += channel.write(buf);
+                }
             }
             if (log.isTraceEnabled()) {
                 log.trace("ACK sent to " +

Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java Mon Feb 18 16:57:54 2008
@@ -154,7 +154,7 @@
     protected boolean read(SelectionKey key) throws IOException {
         //if there is no message here, we are done
         if ( current == null ) return true;
-        int read = socketChannel.read(readbuf);
+        int read = isUdpBased()?dataChannel.read(readbuf) : 
socketChannel.read(readbuf);
         //end of stream
         if ( read == -1 ) throw new IOException("Unable to receive an ack 
message. EOF on socket channel has been reached.");
         //no data read
@@ -175,14 +175,14 @@
 
 
     protected boolean write(SelectionKey key) throws IOException {
-        if ( (!isConnected()) || (this.socketChannel==null)) {
+        if ( (!isConnected()) || (this.socketChannel==null && 
this.dataChannel==null)) {
             throw new IOException("NioSender is not connected, this should not 
occur.");
         }
         if ( current != null ) {
             if ( remaining > 0 ) {
                 //weve written everything, or we are starting a new package
                 //protect against buffer overwrite
-                int byteswritten = socketChannel.write(writebuf);
+                int byteswritten = isUdpBased()?dataChannel.write(writebuf) : 
socketChannel.write(writebuf);
                 if (byteswritten == -1 ) throw new EOFException();
                 remaining -= byteswritten;
                 //if the entire message was written from the buffer
@@ -204,7 +204,7 @@
      * @todo Implement this org.apache.catalina.tribes.transport.IDataSender 
method
      */
     public synchronized void connect() throws IOException {
-        if ( connecting ) return;
+        if ( connecting || isConnected()) return;
         connecting = true;
         if ( isConnected() ) throw new IOException("NioSender is already in 
connected state.");
         if ( readbuf == null ) {
@@ -218,15 +218,23 @@
             writebuf.clear();
         }
 
-        InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort());
-        if ( socketChannel != null ) throw new IOException("Socket channel has 
already been established. Connection might be in progress.");
-        socketChannel = SocketChannel.open();
-        socketChannel.configureBlocking(false);
-        if ( socketChannel.connect(addr) ) {
-            completeConnect();
-            socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this);
+        if (isUdpBased()) {
+            InetSocketAddress daddr = new 
InetSocketAddress(getAddress(),getUdpPort());
+            if ( dataChannel != null ) throw new IOException("Datagram channel 
has already been established. Connection might be in progress.");
+            dataChannel = DatagramChannel.open();
+            dataChannel.configureBlocking(false);
+            dataChannel.connect(daddr);
         } else {
-            socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, 
this);
+            InetSocketAddress addr = new 
InetSocketAddress(getAddress(),getPort());
+            if ( socketChannel != null ) throw new IOException("Socket channel 
has already been established. Connection might be in progress.");
+            socketChannel = SocketChannel.open();
+            socketChannel.configureBlocking(false);
+            if ( socketChannel.connect(addr) ) {
+                completeConnect();
+                socketChannel.register(getSelector(), SelectionKey.OP_WRITE, 
this);
+            } else {
+                socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, 
this);
+            }
         }
     }
 
@@ -252,6 +260,18 @@
                     socketChannel = null;
                 }
             }
+            if ( dataChannel != null ) {
+                try {
+                    try {dataChannel.socket().close();}catch ( Exception x){}
+                    //error free close, all the way
+                    //try {socket.shutdownOutput();}catch ( Exception x){}
+                    //try {socket.shutdownInput();}catch ( Exception x){}
+                    //try {socket.close();}catch ( Exception x){}
+                    try {dataChannel.close();}catch ( Exception x){}
+                }finally {
+                    dataChannel = null;
+                }
+            }
         } catch ( Exception x ) {
             log.error("Unable to disconnect NioSender. msg="+x.getMessage());
             if ( log.isDebugEnabled() ) log.debug("Unable to disconnect 
NioSender. msg="+x.getMessage(),x);
@@ -273,6 +293,7 @@
         setAttempt(0);
         setRequestCount(0);
         setConnectTime(-1);
+        setUdpBased(false);
     }
 
     private ByteBuffer getReadBuffer() {
@@ -312,7 +333,10 @@
            //writebuf.limit(length);
            writebuf.flip();
            if (isConnected()) {
-               socketChannel.register(getSelector(), SelectionKey.OP_WRITE, 
this);
+               if (isUdpBased())
+                   dataChannel.register(getSelector(), SelectionKey.OP_WRITE, 
this);
+               else
+                   socketChannel.register(getSelector(), 
SelectionKey.OP_WRITE, this);
            }
        }
    }

Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=628940&r1=628939&r2=628940&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Mon Feb 18 16:57:54 2008
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -48,7 +48,7 @@
  * @version 1.0
  */
 public class ParallelNioSender extends AbstractSender implements 
MultiPointSender {
-    
+
     protected static org.apache.juli.logging.Log log = 
org.apache.juli.logging.LogFactory.getLog(ParallelNioSender.class);
     protected long selectTimeout = 5000; //default 5 seconds, same as send 
timeout
     protected Selector selector;
@@ -58,15 +58,16 @@
         selector = Selector.open();
         setConnected(true);
     }
-    
-    
+
+
     public synchronized void sendMessage(Member[] destination, ChannelMessage 
msg) throws ChannelException {
         long start = System.currentTimeMillis();
+        this.setUdpBased((msg.getOptions()&Channel.SEND_OPTIONS_UDP) == 
Channel.SEND_OPTIONS_UDP);
         byte[] data = XByteBuffer.createDataPackage((ChannelData)msg);
         NioSender[] senders = setupForSend(destination);
         connect(senders);
         setData(senders,data);
-        
+
         int remaining = senders.length;
         ChannelException cx = null;
         try {
@@ -108,17 +109,17 @@
             if ( x instanceof ChannelException ) throw (ChannelException)x;
             else throw new ChannelException(x);
         }
-        
+
     }
-    
+
     private int doLoop(long selectTimeOut, int maxAttempts, boolean 
waitForAck, ChannelMessage msg) throws IOException, ChannelException {
         int completed = 0;
         int selectedKeys = selector.select(selectTimeOut);
-        
+
         if (selectedKeys == 0) {
             return 0;
         }
-        
+
         Iterator it = selector.selectedKeys().iterator();
         while (it.hasNext()) {
             SelectionKey sk = (SelectionKey) it.next();
@@ -140,16 +141,16 @@
                 int attempt = sender.getAttempt()+1;
                 boolean retry = (sender.getAttempt() <= maxAttempts && 
maxAttempts>0);
                 synchronized (state) {
-                
+
                     //sk.cancel();
                     if (state.isSuspect()) state.setFailing();
                     if (state.isReady()) {
                         state.setSuspect();
-                        if ( retry ) 
+                        if ( retry )
                             log.warn("Member send is failing for:" + 
sender.getDestination().getName() +" ; Setting to suspect and retrying.");
-                        else 
+                        else
                             log.warn("Member send is failing for:" + 
sender.getDestination().getName() +" ; Setting to suspect.", x);
-                    }                    
+                    }
                 }
                 if ( !isConnected() ) {
                     log.warn("Not retrying send for:" + 
sender.getDestination().getName() + "; Sender is disconnected.");
@@ -157,11 +158,11 @@
                     cx.addFaultyMember(sender.getDestination(),x);
                     throw cx;
                 }
-                
+
                 byte[] data = sender.getMessage();
                 if ( retry ) {
-                    try { 
-                        sender.disconnect(); 
+                    try {
+                        sender.disconnect();
                         sender.connect();
                         sender.setAttempt(attempt);
                         sender.setMessage(data);
@@ -178,12 +179,12 @@
         return completed;
 
     }
-    
+
     private void connect(NioSender[] senders) throws ChannelException {
         ChannelException x = null;
         for (int i=0; i<senders.length; i++ ) {
             try {
-                if (!senders[i].isConnected()) senders[i].connect();
+                senders[i].connect();
             }catch ( IOException io ) {
                 if ( x==null ) x = new ChannelException(io);
                 x.addFaultyMember(senders[i].getDestination(),io);
@@ -191,7 +192,7 @@
         }
         if ( x != null ) throw x;
     }
-    
+
     private void setData(NioSender[] senders, byte[] data) throws 
ChannelException {
         ChannelException x = null;
         for (int i=0; i<senders.length; i++ ) {
@@ -204,8 +205,8 @@
         }
         if ( x != null ) throw x;
     }
-    
-    
+
+
     private NioSender[] setupForSend(Member[] destination) throws 
ChannelException {
         ChannelException cx = null;
         NioSender[] result = new NioSender[destination.length];
@@ -222,6 +223,7 @@
                     sender.reset();
                     sender.setDestination(destination[i]);
                     sender.setSelector(selector);
+                    sender.setUdpBased(isUdpBased());
                     result[i] = sender;
                 }
             }catch ( UnknownHostException x ) {
@@ -232,13 +234,13 @@
         if ( cx != null ) throw cx;
         else return result;
     }
-    
+
     public void connect() {
         //do nothing, we connect on demand
         setConnected(true);
     }
-    
-    
+
+
     private synchronized void close() throws ChannelException  {
         ChannelException x = null;
         Object[] members = nioSenders.keySet().toArray();
@@ -255,24 +257,24 @@
         }
         if ( x != null ) throw x;
     }
-    
+
     public void add(Member member) {
-        
+
     }
-    
+
     public void remove(Member member) {
         //disconnect senders
         NioSender sender = (NioSender)nioSenders.remove(member);
         if ( sender != null ) sender.disconnect();
     }
 
-    
+
     public synchronized void disconnect() {
         setConnected(false);
         try {close(); }catch (Exception x){}
-        
+
     }
-    
+
     public void finalize() {
         try {disconnect(); }catch ( Exception ignore){}
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to