Added: tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioBlockingSelector.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioBlockingSelector.java?rev=640855&view=auto ============================================================================== --- tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioBlockingSelector.java (added) +++ tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioBlockingSelector.java Tue Mar 25 08:23:12 2008 @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.net; + +import java.io.EOFException; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.MutableInteger; +import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment; +import java.util.concurrent.atomic.AtomicInteger; + +public class NioBlockingSelector { + + protected static Log log = LogFactory.getLog(NioBlockingSelector.class); + + private static int threadCounter = 0; + + protected Selector sharedSelector; + + protected BlockPoller poller; + public NioBlockingSelector() { + + } + + public void open(Selector selector) { + sharedSelector = selector; + poller = new BlockPoller(); + poller.selector = sharedSelector; + poller.setDaemon(true); + poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter)); + poller.start(); + } + + public void close() { + if (poller!=null) { + poller.disable(); + poller.interrupt(); + poller = null; + } + } + + /** + * Performs a blocking write using the bytebuffer for data to be written + * If the <code>selector</code> parameter is null, then it will perform a busy write that could + * take up a lot of CPU cycles. + * @param buf ByteBuffer - the buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code> + * @param socket SocketChannel - the socket to write data to + * @param writeTimeout long - the timeout for this write operation in milliseconds, -1 means no timeout + * @return int - returns the number of bytes written + * @throws EOFException if write returns -1 + * @throws SocketTimeoutException if the write times out + * @throws IOException if an IO Exception occurs in the underlying socket logic + */ + public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException { + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + if ( key == null ) throw new IOException("Key no longer registered"); + KeyAttachment att = (KeyAttachment) key.attachment(); + int written = 0; + boolean timedout = false; + int keycount = 1; //assume we can write + long time = System.currentTimeMillis(); //start the timeout timer + try { + while ( (!timedout) && buf.hasRemaining()) { + if (keycount > 0) { //only write if we were registered for a write + int cnt = socket.write(buf); //write the data + lastWrite.set(cnt); + if (cnt == -1) + throw new EOFException(); + written += cnt; + if (cnt > 0) { + time = System.currentTimeMillis(); //reset our timeout timer + continue; //we successfully wrote, try again without a selector + } + } + try { + if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1); + poller.add(att,SelectionKey.OP_WRITE); + att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS); + }catch (InterruptedException ignore) { + Thread.interrupted(); + } + if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) { + //we got interrupted, but we haven't received notification from the poller. + keycount = 0; + }else { + //latch countdown has happened + keycount = 1; + att.resetWriteLatch(); + } + + if (writeTimeout > 0 && (keycount == 0)) + timedout = (System.currentTimeMillis() - time) >= writeTimeout; + } //while + if (timedout) + throw new SocketTimeoutException(); + } finally { + poller.remove(att,SelectionKey.OP_WRITE); + if (timedout && key != null) { + poller.cancelKey(socket, key); + } + } + return written; + } + + /** + * Performs a blocking read using the bytebuffer for data to be read + * If the <code>selector</code> parameter is null, then it will perform a busy read that could + * take up a lot of CPU cycles. + * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out + * @param socket SocketChannel - the socket to write data to + * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated + * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout + * @return int - returns the number of bytes read + * @throws EOFException if read returns -1 + * @throws SocketTimeoutException if the read times out + * @throws IOException if an IO Exception occurs in the underlying socket logic + */ + public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException { + SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); + if ( key == null ) throw new IOException("Key no longer registered"); + KeyAttachment att = (KeyAttachment) key.attachment(); + int read = 0; + boolean timedout = false; + int keycount = 1; //assume we can write + long time = System.currentTimeMillis(); //start the timeout timer + try { + while ( (!timedout) && read == 0) { + if (keycount > 0) { //only read if we were registered for a read + int cnt = socket.read(buf); + if (cnt == -1) + throw new EOFException(); + read += cnt; + if (cnt > 0) + break; + } + try { + if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1); + poller.add(att,SelectionKey.OP_READ); + att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS); + }catch (InterruptedException ignore) { + Thread.interrupted(); + } + if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) { + //we got interrupted, but we haven't received notification from the poller. + keycount = 0; + }else { + //latch countdown has happened + keycount = 1; + att.resetReadLatch(); + } + if (readTimeout > 0 && (keycount == 0)) + timedout = (System.currentTimeMillis() - time) >= readTimeout; + } //while + if (timedout) + throw new SocketTimeoutException(); + } finally { + poller.remove(att,SelectionKey.OP_READ); + if (timedout && key != null) { + poller.cancelKey(socket,key); + } + } + return read; + } + + + protected class BlockPoller extends Thread { + protected boolean run = true; + protected Selector selector = null; + protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue(); + public void disable() { run = false; selector.wakeup();} + protected AtomicInteger wakeupCounter = new AtomicInteger(0); + public void cancelKey(final NioChannel socket, final SelectionKey key) { + Runnable r = new Runnable() { + public void run() { + key.cancel(); + } + }; + events.offer(r); + wakeup(); + } + + public void wakeup() { + if (wakeupCounter.addAndGet(1)==0) selector.wakeup(); + } + + public void cancel(SelectionKey sk, KeyAttachment key, int ops){ + if (sk!=null) { + sk.cancel(); + sk.attach(null); + if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); + if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); + } + } + + public void add(final KeyAttachment key, final int ops) { + Runnable r = new Runnable() { + public void run() { + if ( key == null ) return; + NioChannel nch = key.getChannel(); + if ( nch == null ) return; + SocketChannel ch = nch.getIOChannel(); + if ( ch == null ) return; + SelectionKey sk = ch.keyFor(selector); + try { + if (sk == null) { + sk = ch.register(selector, ops, key); + } else { + sk.interestOps(sk.interestOps() | ops); + } + }catch (CancelledKeyException cx) { + cancel(sk,key,ops); + }catch (ClosedChannelException cx) { + cancel(sk,key,ops); + } + } + }; + events.offer(r); + wakeup(); + } + + public void remove(final KeyAttachment key, final int ops) { + Runnable r = new Runnable() { + public void run() { + if ( key == null ) return; + NioChannel nch = key.getChannel(); + if ( nch == null ) return; + SocketChannel ch = nch.getIOChannel(); + if ( ch == null ) return; + SelectionKey sk = ch.keyFor(selector); + try { + if (sk == null) { + if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); + if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); + } else { + sk.interestOps(sk.interestOps() & (~ops)); + if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) countDown(key.getWriteLatch()); + if (SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch()); + if (sk.interestOps()==0) { + sk.cancel(); + sk.attach(null); + } + } + }catch (CancelledKeyException cx) { + if (sk!=null) { + sk.cancel(); + sk.attach(null); + } + } + } + }; + events.offer(r); + wakeup(); + } + + + public boolean events() { + boolean result = false; + Runnable r = null; + result = (events.size() > 0); + while ( (r = (Runnable)events.poll()) != null ) { + r.run(); + result = true; + } + return result; + } + + public void run() { + while (run) { + try { + events(); + int keyCount = 0; + try { + int i = wakeupCounter.get(); + if (i>0) + keyCount = selector.selectNow(); + else { + wakeupCounter.set(-1); + keyCount = selector.select(1000); + } + wakeupCounter.set(0); + if (!run) break; + }catch ( NullPointerException x ) { + //sun bug 5076772 on windows JDK 1.5 + if (selector==null) throw x; + if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); + continue; + } catch ( CancelledKeyException x ) { + //sun bug 5076772 on windows JDK 1.5 + if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x); + continue; + } catch (Throwable x) { + log.error("",x); + continue; + } + + Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; + + // Walk through the collection of ready keys and dispatch + // any active event. + while (run && iterator != null && iterator.hasNext()) { + SelectionKey sk = (SelectionKey) iterator.next(); + KeyAttachment attachment = (KeyAttachment)sk.attachment(); + try { + attachment.access(); + iterator.remove(); ; + sk.interestOps(sk.interestOps() & (~sk.readyOps())); + if ( sk.isReadable() ) { + countDown(attachment.getReadLatch()); + } + if (sk.isWritable()) { + countDown(attachment.getWriteLatch()); + } + }catch (CancelledKeyException ckx) { + if (sk!=null) sk.cancel(); + countDown(attachment.getReadLatch()); + countDown(attachment.getWriteLatch()); + } + }//while + }catch ( Throwable t ) { + log.error("",t); + } + } + events.clear(); + try { + selector.selectNow();//cancel all remaining keys + }catch( Exception ignore ) { + if (log.isDebugEnabled())log.debug("",ignore); + } + } + + public void countDown(CountDownLatch latch) { + if ( latch == null ) return; + latch.countDown(); + } + + + + } + +} \ No newline at end of file
Added: tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioChannel.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioChannel.java?rev=640855&view=auto ============================================================================== --- tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioChannel.java (added) +++ tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioChannel.java Tue Mar 25 08:23:12 2008 @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.tomcat.util.net; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.SocketChannel; + +import org.apache.tomcat.util.net.NioEndpoint.Poller; +import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler; +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; +import org.apache.tomcat.util.MutableInteger; + +/** + * + * Base class for a SocketChannel wrapper used by the endpoint. + * This way, logic for a SSL socket channel remains the same as for + * a non SSL, making sure we don't need to code for any exception cases. + * + * @author Filip Hanik + * @version 1.0 + */ +public class NioChannel implements ByteChannel{ + + protected static ByteBuffer emptyBuf = ByteBuffer.allocate(0); + + protected SocketChannel sc = null; + + protected ApplicationBufferHandler bufHandler; + + protected Poller poller; + + public NioChannel(SocketChannel channel, ApplicationBufferHandler bufHandler) throws IOException { + this.sc = channel; + this.bufHandler = bufHandler; + } + + public void reset() throws IOException { + bufHandler.getReadBuffer().clear(); + bufHandler.getWriteBuffer().clear(); + } + + public int getBufferSize() { + if ( bufHandler == null ) return 0; + int size = 0; + size += bufHandler.getReadBuffer()!=null?bufHandler.getReadBuffer().capacity():0; + size += bufHandler.getWriteBuffer()!=null?bufHandler.getWriteBuffer().capacity():0; + return size; + } + + /** + * returns true if the network buffer has + * been flushed out and is empty + * @return boolean + */ + public boolean flush(boolean block, Selector s, long timeout,MutableInteger lastWrite) throws IOException { + if (lastWrite!=null) lastWrite.set(1); + return true; //no network buffer in the regular channel + } + + + /** + * Closes this channel. + * + * @throws IOException If an I/O error occurs + * @todo Implement this java.nio.channels.Channel method + */ + public void close() throws IOException { + getIOChannel().socket().close(); + getIOChannel().close(); + } + + public void close(boolean force) throws IOException { + if (isOpen() || force ) close(); + } + /** + * Tells whether or not this channel is open. + * + * @return <tt>true</tt> if, and only if, this channel is open + * @todo Implement this java.nio.channels.Channel method + */ + public boolean isOpen() { + return sc.isOpen(); + } + + /** + * Writes a sequence of bytes to this channel from the given buffer. + * + * @param src The buffer from which bytes are to be retrieved + * @return The number of bytes written, possibly zero + * @throws IOException If some other I/O error occurs + * @todo Implement this java.nio.channels.WritableByteChannel method + */ + public int write(ByteBuffer src) throws IOException { + return sc.write(src); + } + + /** + * Reads a sequence of bytes from this channel into the given buffer. + * + * @param dst The buffer into which bytes are to be transferred + * @return The number of bytes read, possibly zero, or <tt>-1</tt> if the channel has reached end-of-stream + * @throws IOException If some other I/O error occurs + * @todo Implement this java.nio.channels.ReadableByteChannel method + */ + public int read(ByteBuffer dst) throws IOException { + return sc.read(dst); + } + + public Object getAttachment(boolean remove) { + Poller pol = getPoller(); + Selector sel = pol!=null?pol.getSelector():null; + SelectionKey key = sel!=null?getIOChannel().keyFor(sel):null; + Object att = key!=null?key.attachment():null; + if (key != null && att != null && remove ) key.attach(null); + return att; + } + /** + * getBufHandler + * + * @return ApplicationBufferHandler + * @todo Implement this org.apache.tomcat.util.net.SecureNioChannel method + */ + public ApplicationBufferHandler getBufHandler() { + return bufHandler; + } + + public Poller getPoller() { + return poller; + } + /** + * getIOChannel + * + * @return SocketChannel + * @todo Implement this org.apache.tomcat.util.net.SecureNioChannel method + */ + public SocketChannel getIOChannel() { + return sc; + } + + /** + * isClosing + * + * @return boolean + * @todo Implement this org.apache.tomcat.util.net.SecureNioChannel method + */ + public boolean isClosing() { + return false; + } + + /** + * isInitHandshakeComplete + * + * @return boolean + * @todo Implement this org.apache.tomcat.util.net.SecureNioChannel method + */ + public boolean isInitHandshakeComplete() { + return true; + } + + public int handshake(boolean read, boolean write) throws IOException { + return 0; + } + + public void setPoller(Poller poller) { + this.poller = poller; + } + + public void setIOChannel(SocketChannel IOChannel) { + this.sc = IOChannel; + } + + public String toString() { + return super.toString()+":"+this.sc.toString(); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]