Author: fhanik Date: Mon Feb 18 16:57:54 2008 New Revision: 628940 URL: http://svn.apache.org/viewvc?rev=628940&view=rev Log: more UDP code
Modified: tomcat/trunk/java/org/apache/catalina/tribes/Member.java tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/Member.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/Member.java?rev=628940&r1=628939&r2=628940&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/Member.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/Member.java Mon Feb 18 16:57:54 2008 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,7 +24,7 @@ * The host is what interface the member is listening to, to receive data<br> * The port is what port the member is listening to, to receive data<br> * The uniqueId defines the session id for the member. This is an important feature - * since a member that has crashed and the starts up again on the same port/host is + * since a member that has crashed and the starts up again on the same port/host is * not guaranteed to be the same member, so no state transfers will ever be confused * @author Filip Hanik * @version $Revision$, $Date$ @@ -32,18 +32,18 @@ public interface Member { - + /** * When a member leaves the cluster, the payload of the memberDisappeared member * will be the following bytes. This indicates a soft shutdown, and not a crash */ public static final byte[] SHUTDOWN_PAYLOAD = new byte[] {66, 65, 66, 89, 45, 65, 76, 69, 88}; - + /** * Returns the name of this node, should be unique within the group. */ public String getName(); - + /** * Returns the listen host for the ChannelReceiver implementation * @return IPv4 or IPv6 representation of the host address this member listens to incoming data @@ -57,7 +57,7 @@ * @see ChannelReceiver */ public int getPort(); - + /** * Returns the secure listen port for the ChannelReceiver implementation. * Returns -1 if its not listening to a secure port. @@ -65,7 +65,13 @@ * @see ChannelReceiver */ public int getSecurePort(); - + + /** + * Returns the UDP port that this member is listening to for UDP messages. + * @return the listen UDP port for this member, -1 if its not listening on a UDP port + */ + public int getUdpPort(); + /** * Contains information on how long this member has been online. @@ -74,7 +80,7 @@ * @return nr of milliseconds since this member started. */ public long getMemberAliveTime(); - + /** * The current state of the member * @return boolean - true if the member is functioning correctly @@ -85,32 +91,32 @@ * @return boolean - true if the member is suspect, but the crash has not been confirmed */ public boolean isSuspect(); - + /** - * - * @return boolean - true if the member has been confirmed to malfunction + * + * @return boolean - true if the member has been confirmed to malfunction */ public boolean isFailing(); - + /** * returns a UUID unique for this member over all sessions. * If the member crashes and restarts, the uniqueId will be different. * @return byte[] */ public byte[] getUniqueId(); - + /** * returns the payload associated with this member * @return byte[] */ public byte[] getPayload(); - + /** * returns the command associated with this member * @return byte[] */ public byte[] getCommand(); - + /** * Domain for this cluster * @return byte[] Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=628940&r1=628939&r2=628940&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java Mon Feb 18 16:57:54 2008 @@ -116,7 +116,8 @@ */ public boolean keepalive() { boolean disconnect = false; - if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) disconnect = true; + if (isUdpBased()) disconnect = true; //always disconnect UDP, TODO optimize the keepalive handling + else if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) disconnect = true; else if ( keepAliveTime >= 0 && (System.currentTimeMillis()-connectTime)>keepAliveTime ) disconnect = true; if ( disconnect ) disconnect(); return disconnect; @@ -299,6 +300,7 @@ this.destination = destination; this.address = InetAddress.getByAddress(destination.getHost()); this.port = destination.getPort(); + this.udpPort = destination.getUdpPort(); } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=628940&r1=628939&r2=628940&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Mon Feb 18 16:57:54 2008 @@ -17,6 +17,7 @@ package org.apache.catalina.tribes.transport; import java.io.IOException; +import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -220,6 +221,38 @@ } return retries; } + + /** + * Same as bind() except it does it for the UDP port + * @param socket + * @param portstart + * @param retries + * @return + * @throws IOException + */ + protected int bindUdp(DatagramSocket socket, int portstart, int retries) throws IOException { + InetSocketAddress addr = null; + while ( retries > 0 ) { + try { + addr = new InetSocketAddress(getBind(), portstart); + socket.bind(addr); + setUdpPort(portstart); + log.info("UDP Receiver Server Socket bound to:"+addr); + return 0; + }catch ( IOException x) { + retries--; + if ( retries <= 0 ) { + log.info("Unable to bind UDP socket to:"+addr+" throwing error."); + throw x; + } + portstart++; + try {Thread.sleep(25);}catch( InterruptedException ti){Thread.currentThread().interrupted();} + retries = bindUdp(socket,portstart,retries); + } + } + return retries; + } + public void messageDataReceived(ChannelMessage data) { if ( this.listener != null ) { Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=628940&r1=628939&r2=628940&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Mon Feb 18 16:57:54 2008 @@ -144,10 +144,7 @@ datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); //bind to the address to avoid security checks - InetSocketAddress daddr = new InetSocketAddress(getBind(),getUdpPort()); - //TODO should we auto increment the UDP port to avoid collisions? - //we could auto increment with the offset from the tcp listen port - datagramChannel.connect(daddr); + bindUdp(datagramChannel.socket(),getUdpPort(),getAutoBind()); } @@ -188,7 +185,10 @@ } key.cancel(); key.attach(null); - try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } + if (key.channel() instanceof SocketChannel) + try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } + if (key.channel() instanceof DatagramChannel) + try { ((DatagramChannel)key.channel()).socket().close(); } catch (Exception e) { if (log.isDebugEnabled()) log.debug("", e); } try { key.channel().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); } } @@ -249,7 +249,7 @@ setListen(true); if (selector!=null && datagramChannel!=null) { - ObjectReader oreader = new ObjectReader(1024*65); + ObjectReader oreader = new ObjectReader(65535); //max size for a datagram packet registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=628940&r1=628939&r2=628940&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Mon Feb 18 16:57:54 2008 @@ -17,6 +17,7 @@ package org.apache.catalina.tribes.transport.nio; import java.io.IOException; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.ReadableByteChannel; @@ -147,19 +148,33 @@ reader.setLastAccess(System.currentTimeMillis()); reader.access(); ReadableByteChannel channel = (ReadableByteChannel) key.channel(); - int count; + int count=-1; buffer.clear(); // make buffer empty + SocketAddress saddr = null; - // loop while data available, channel is non-blocking - while ((count = channel.read (buffer)) > 0) { - buffer.flip(); // make buffer readable + if (channel instanceof SocketChannel) { + // loop while data available, channel is non-blocking + while ((count = channel.read (buffer)) > 0) { + buffer.flip(); // make buffer readable + if ( buffer.hasArray() ) + reader.append(buffer.array(),0,count,false); + else + reader.append(buffer,count,false); + buffer.clear(); // make buffer empty + //do we have at least one package? + if ( reader.hasPackage() ) break; + } + } else if (channel instanceof DatagramChannel) { + DatagramChannel dchannel = (DatagramChannel)channel; + saddr = dchannel.receive(buffer); + buffer.flip(); // make buffer readable if ( buffer.hasArray() ) - reader.append(buffer.array(),0,count,false); + reader.append(buffer.array(),0,buffer.limit()-buffer.position(),false); else - reader.append(buffer,count,false); - buffer.clear(); // make buffer empty - //do we have at least one package? - if ( reader.hasPackage() ) break; + reader.append(buffer,buffer.limit()-buffer.position(),false); + buffer.clear(); // make buffer empty + //did we get a package + count = reader.hasPackage()?1:-1; } int pkgcnt = reader.count(); @@ -180,7 +195,7 @@ * server before completing the request * This is considered an asynchronized request */ - if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND); + if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND,saddr); try { if ( Logs.MESSAGES.isTraceEnabled() ) { try { @@ -194,13 +209,13 @@ * server before sending the ack to the remote server * This is considered a synchronized request */ - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.ACK_COMMAND,saddr); }catch ( RemoteProcessException e ) { if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e); - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND,saddr); }catch ( Exception e ) { log.error("Processing of cluster message failed.",e); - if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND); + if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,(WritableByteChannel)channel,Constants.FAIL_ACK_COMMAND,saddr); } if ( getUseBufferPool() ) { BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); @@ -275,17 +290,25 @@ /** - * send a reply-acknowledgement (6,2,3) + * send a reply-acknowledgement (6,2,3), sends it doing a busy write, the ACK is so small + * that it should always go to the buffer * @param key * @param channel */ - protected void sendAck(SelectionKey key, WritableByteChannel channel, byte[] command) { - + protected void sendAck(SelectionKey key, WritableByteChannel channel, byte[] command, SocketAddress udpaddr) { try { + ByteBuffer buf = ByteBuffer.wrap(command); int total = 0; - while ( total < command.length ) { - total += channel.write(buf); + if (channel instanceof DatagramChannel) { + DatagramChannel dchannel = (DatagramChannel)channel; + while ( total < command.length ) { + total += dchannel.send(buf, udpaddr); + } + } else { + while ( total < command.length ) { + total += channel.write(buf); + } } if (log.isTraceEnabled()) { log.trace("ACK sent to " + Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=628940&r1=628939&r2=628940&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java Mon Feb 18 16:57:54 2008 @@ -154,7 +154,7 @@ protected boolean read(SelectionKey key) throws IOException { //if there is no message here, we are done if ( current == null ) return true; - int read = socketChannel.read(readbuf); + int read = isUdpBased()?dataChannel.read(readbuf) : socketChannel.read(readbuf); //end of stream if ( read == -1 ) throw new IOException("Unable to receive an ack message. EOF on socket channel has been reached."); //no data read @@ -175,14 +175,14 @@ protected boolean write(SelectionKey key) throws IOException { - if ( (!isConnected()) || (this.socketChannel==null)) { + if ( (!isConnected()) || (this.socketChannel==null && this.dataChannel==null)) { throw new IOException("NioSender is not connected, this should not occur."); } if ( current != null ) { if ( remaining > 0 ) { //weve written everything, or we are starting a new package //protect against buffer overwrite - int byteswritten = socketChannel.write(writebuf); + int byteswritten = isUdpBased()?dataChannel.write(writebuf) : socketChannel.write(writebuf); if (byteswritten == -1 ) throw new EOFException(); remaining -= byteswritten; //if the entire message was written from the buffer @@ -204,7 +204,7 @@ * @todo Implement this org.apache.catalina.tribes.transport.IDataSender method */ public synchronized void connect() throws IOException { - if ( connecting ) return; + if ( connecting || isConnected()) return; connecting = true; if ( isConnected() ) throw new IOException("NioSender is already in connected state."); if ( readbuf == null ) { @@ -218,15 +218,23 @@ writebuf.clear(); } - InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); - if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress."); - socketChannel = SocketChannel.open(); - socketChannel.configureBlocking(false); - if ( socketChannel.connect(addr) ) { - completeConnect(); - socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); + if (isUdpBased()) { + InetSocketAddress daddr = new InetSocketAddress(getAddress(),getUdpPort()); + if ( dataChannel != null ) throw new IOException("Datagram channel has already been established. Connection might be in progress."); + dataChannel = DatagramChannel.open(); + dataChannel.configureBlocking(false); + dataChannel.connect(daddr); } else { - socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, this); + InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); + if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress."); + socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + if ( socketChannel.connect(addr) ) { + completeConnect(); + socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); + } else { + socketChannel.register(getSelector(), SelectionKey.OP_CONNECT, this); + } } } @@ -252,6 +260,18 @@ socketChannel = null; } } + if ( dataChannel != null ) { + try { + try {dataChannel.socket().close();}catch ( Exception x){} + //error free close, all the way + //try {socket.shutdownOutput();}catch ( Exception x){} + //try {socket.shutdownInput();}catch ( Exception x){} + //try {socket.close();}catch ( Exception x){} + try {dataChannel.close();}catch ( Exception x){} + }finally { + dataChannel = null; + } + } } catch ( Exception x ) { log.error("Unable to disconnect NioSender. msg="+x.getMessage()); if ( log.isDebugEnabled() ) log.debug("Unable to disconnect NioSender. msg="+x.getMessage(),x); @@ -273,6 +293,7 @@ setAttempt(0); setRequestCount(0); setConnectTime(-1); + setUdpBased(false); } private ByteBuffer getReadBuffer() { @@ -312,7 +333,10 @@ //writebuf.limit(length); writebuf.flip(); if (isConnected()) { - socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); + if (isUdpBased()) + dataChannel.register(getSelector(), SelectionKey.OP_WRITE, this); + else + socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); } } } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=628940&r1=628939&r2=628940&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Mon Feb 18 16:57:54 2008 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -48,7 +48,7 @@ * @version 1.0 */ public class ParallelNioSender extends AbstractSender implements MultiPointSender { - + protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ParallelNioSender.class); protected long selectTimeout = 5000; //default 5 seconds, same as send timeout protected Selector selector; @@ -58,15 +58,16 @@ selector = Selector.open(); setConnected(true); } - - + + public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException { long start = System.currentTimeMillis(); + this.setUdpBased((msg.getOptions()&Channel.SEND_OPTIONS_UDP) == Channel.SEND_OPTIONS_UDP); byte[] data = XByteBuffer.createDataPackage((ChannelData)msg); NioSender[] senders = setupForSend(destination); connect(senders); setData(senders,data); - + int remaining = senders.length; ChannelException cx = null; try { @@ -108,17 +109,17 @@ if ( x instanceof ChannelException ) throw (ChannelException)x; else throw new ChannelException(x); } - + } - + private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) throws IOException, ChannelException { int completed = 0; int selectedKeys = selector.select(selectTimeOut); - + if (selectedKeys == 0) { return 0; } - + Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = (SelectionKey) it.next(); @@ -140,16 +141,16 @@ int attempt = sender.getAttempt()+1; boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts>0); synchronized (state) { - + //sk.cancel(); if (state.isSuspect()) state.setFailing(); if (state.isReady()) { state.setSuspect(); - if ( retry ) + if ( retry ) log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect and retrying."); - else + else log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect.", x); - } + } } if ( !isConnected() ) { log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected."); @@ -157,11 +158,11 @@ cx.addFaultyMember(sender.getDestination(),x); throw cx; } - + byte[] data = sender.getMessage(); if ( retry ) { - try { - sender.disconnect(); + try { + sender.disconnect(); sender.connect(); sender.setAttempt(attempt); sender.setMessage(data); @@ -178,12 +179,12 @@ return completed; } - + private void connect(NioSender[] senders) throws ChannelException { ChannelException x = null; for (int i=0; i<senders.length; i++ ) { try { - if (!senders[i].isConnected()) senders[i].connect(); + senders[i].connect(); }catch ( IOException io ) { if ( x==null ) x = new ChannelException(io); x.addFaultyMember(senders[i].getDestination(),io); @@ -191,7 +192,7 @@ } if ( x != null ) throw x; } - + private void setData(NioSender[] senders, byte[] data) throws ChannelException { ChannelException x = null; for (int i=0; i<senders.length; i++ ) { @@ -204,8 +205,8 @@ } if ( x != null ) throw x; } - - + + private NioSender[] setupForSend(Member[] destination) throws ChannelException { ChannelException cx = null; NioSender[] result = new NioSender[destination.length]; @@ -222,6 +223,7 @@ sender.reset(); sender.setDestination(destination[i]); sender.setSelector(selector); + sender.setUdpBased(isUdpBased()); result[i] = sender; } }catch ( UnknownHostException x ) { @@ -232,13 +234,13 @@ if ( cx != null ) throw cx; else return result; } - + public void connect() { //do nothing, we connect on demand setConnected(true); } - - + + private synchronized void close() throws ChannelException { ChannelException x = null; Object[] members = nioSenders.keySet().toArray(); @@ -255,24 +257,24 @@ } if ( x != null ) throw x; } - + public void add(Member member) { - + } - + public void remove(Member member) { //disconnect senders NioSender sender = (NioSender)nioSenders.remove(member); if ( sender != null ) sender.disconnect(); } - + public synchronized void disconnect() { setConnected(false); try {close(); }catch (Exception x){} - + } - + public void finalize() { try {disconnect(); }catch ( Exception ignore){} } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]