Added: 
tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioBlockingSelector.java
URL: 
http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioBlockingSelector.java?rev=640855&view=auto
==============================================================================
--- 
tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioBlockingSelector.java
 (added)
+++ 
tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioBlockingSelector.java
 Tue Mar 25 08:23:12 2008
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.MutableInteger;
+import org.apache.tomcat.util.net.NioEndpoint.KeyAttachment;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NioBlockingSelector {
+    
+    protected static Log log = LogFactory.getLog(NioBlockingSelector.class);
+    
+    private static int threadCounter = 0;
+    
+    protected Selector sharedSelector;
+    
+    protected BlockPoller poller;
+    public NioBlockingSelector() {
+        
+    }
+    
+    public void open(Selector selector) {
+        sharedSelector = selector;
+        poller = new BlockPoller();
+        poller.selector = sharedSelector;
+        poller.setDaemon(true);
+        poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter));
+        poller.start();
+    }
+    
+    public void close() {
+        if (poller!=null) {
+            poller.disable();
+            poller.interrupt();
+            poller = null;
+        }
+    }
+
+    /**
+     * Performs a blocking write using the bytebuffer for data to be written
+     * If the <code>selector</code> parameter is null, then it will perform a 
busy write that could
+     * take up a lot of CPU cycles.
+     * @param buf ByteBuffer - the buffer containing the data, we will write 
as long as <code>(buf.hasRemaining()==true)</code>
+     * @param socket SocketChannel - the socket to write data to
+     * @param writeTimeout long - the timeout for this write operation in 
milliseconds, -1 means no timeout
+     * @return int - returns the number of bytes written
+     * @throws EOFException if write returns -1
+     * @throws SocketTimeoutException if the write times out
+     * @throws IOException if an IO Exception occurs in the underlying socket 
logic
+     */
+    public int write(ByteBuffer buf, NioChannel socket, long 
writeTimeout,MutableInteger lastWrite) throws IOException {
+        SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+        if ( key == null ) throw new IOException("Key no longer registered");
+        KeyAttachment att = (KeyAttachment) key.attachment();
+        int written = 0;
+        boolean timedout = false;
+        int keycount = 1; //assume we can write
+        long time = System.currentTimeMillis(); //start the timeout timer
+        try {
+            while ( (!timedout) && buf.hasRemaining()) {
+                if (keycount > 0) { //only write if we were registered for a 
write
+                    int cnt = socket.write(buf); //write the data
+                    lastWrite.set(cnt);
+                    if (cnt == -1)
+                        throw new EOFException();
+                    written += cnt;
+                    if (cnt > 0) {
+                        time = System.currentTimeMillis(); //reset our timeout 
timer
+                        continue; //we successfully wrote, try again without a 
selector
+                    }
+                }
+                try {
+                    if ( att.getWriteLatch()==null || 
att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
+                    poller.add(att,SelectionKey.OP_WRITE);
+                    att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
+                }catch (InterruptedException ignore) {
+                    Thread.interrupted();
+                }
+                if ( att.getWriteLatch()!=null && 
att.getWriteLatch().getCount()> 0) {
+                    //we got interrupted, but we haven't received notification 
from the poller.
+                    keycount = 0;
+                }else {
+                    //latch countdown has happened
+                    keycount = 1;
+                    att.resetWriteLatch();
+                }
+
+                if (writeTimeout > 0 && (keycount == 0))
+                    timedout = (System.currentTimeMillis() - time) >= 
writeTimeout;
+            } //while
+            if (timedout) 
+                throw new SocketTimeoutException();
+        } finally {
+            poller.remove(att,SelectionKey.OP_WRITE);
+            if (timedout && key != null) {
+                poller.cancelKey(socket, key);
+            }
+        }
+        return written;
+    }
+
+    /**
+     * Performs a blocking read using the bytebuffer for data to be read
+     * If the <code>selector</code> parameter is null, then it will perform a 
busy read that could
+     * take up a lot of CPU cycles.
+     * @param buf ByteBuffer - the buffer containing the data, we will read as 
until we have read at least one byte or we timed out
+     * @param socket SocketChannel - the socket to write data to
+     * @param selector Selector - the selector to use for blocking, if null 
then a busy read will be initiated
+     * @param readTimeout long - the timeout for this read operation in 
milliseconds, -1 means no timeout
+     * @return int - returns the number of bytes read
+     * @throws EOFException if read returns -1
+     * @throws SocketTimeoutException if the read times out
+     * @throws IOException if an IO Exception occurs in the underlying socket 
logic
+     */
+    public int read(ByteBuffer buf, NioChannel socket, long readTimeout) 
throws IOException {
+        SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+        if ( key == null ) throw new IOException("Key no longer registered");
+        KeyAttachment att = (KeyAttachment) key.attachment();
+        int read = 0;
+        boolean timedout = false;
+        int keycount = 1; //assume we can write
+        long time = System.currentTimeMillis(); //start the timeout timer
+        try {
+            while ( (!timedout) && read == 0) {
+                if (keycount > 0) { //only read if we were registered for a 
read
+                    int cnt = socket.read(buf);
+                    if (cnt == -1)
+                        throw new EOFException();
+                    read += cnt;
+                    if (cnt > 0)
+                        break;
+                }
+                try {
+                    if ( att.getReadLatch()==null || 
att.getReadLatch().getCount()==0) att.startReadLatch(1);
+                    poller.add(att,SelectionKey.OP_READ);
+                    att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
+                }catch (InterruptedException ignore) {
+                    Thread.interrupted();
+                }
+                if ( att.getReadLatch()!=null && 
att.getReadLatch().getCount()> 0) {
+                    //we got interrupted, but we haven't received notification 
from the poller.
+                    keycount = 0;
+                }else {
+                    //latch countdown has happened
+                    keycount = 1;
+                    att.resetReadLatch();
+                }
+                if (readTimeout > 0 && (keycount == 0))
+                    timedout = (System.currentTimeMillis() - time) >= 
readTimeout;
+            } //while
+            if (timedout)
+                throw new SocketTimeoutException();
+        } finally {
+            poller.remove(att,SelectionKey.OP_READ);
+            if (timedout && key != null) {
+                poller.cancelKey(socket,key);
+            }
+        }
+        return read;
+    }
+
+    
+    protected class BlockPoller extends Thread {
+        protected boolean run = true;
+        protected Selector selector = null;
+        protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();
+        public void disable() { run = false; selector.wakeup();}
+        protected AtomicInteger wakeupCounter = new AtomicInteger(0);
+        public void cancelKey(final NioChannel socket, final SelectionKey key) 
{
+            Runnable r = new Runnable() {
+                public void run() {
+                    key.cancel();
+                }
+            };
+            events.offer(r);
+            wakeup();
+        }
+
+        public void wakeup() {
+            if (wakeupCounter.addAndGet(1)==0) selector.wakeup();
+        }
+
+        public void cancel(SelectionKey sk, KeyAttachment key, int ops){
+            if (sk!=null) {
+                sk.cancel();
+                sk.attach(null);
+                if (SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) 
countDown(key.getWriteLatch());
+                if 
(SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+            }
+        }
+        
+        public void add(final KeyAttachment key, final int ops) {
+            Runnable r = new Runnable() {
+                public void run() {
+                    if ( key == null ) return;
+                    NioChannel nch = key.getChannel();
+                    if ( nch == null ) return;
+                    SocketChannel ch = nch.getIOChannel();
+                    if ( ch == null ) return;
+                    SelectionKey sk = ch.keyFor(selector);
+                    try {
+                        if (sk == null) {
+                            sk = ch.register(selector, ops, key);
+                        } else {
+                            sk.interestOps(sk.interestOps() | ops);
+                        }
+                    }catch (CancelledKeyException cx) {
+                        cancel(sk,key,ops);
+                    }catch (ClosedChannelException cx) {
+                        cancel(sk,key,ops);
+                    }
+                }
+            };
+            events.offer(r);
+            wakeup();
+        }
+        
+        public void remove(final KeyAttachment key, final int ops) {
+            Runnable r = new Runnable() {
+                public void run() {
+                    if ( key == null ) return;
+                    NioChannel nch = key.getChannel();
+                    if ( nch == null ) return;
+                    SocketChannel ch = nch.getIOChannel();
+                    if ( ch == null ) return;
+                    SelectionKey sk = ch.keyFor(selector);
+                    try {
+                        if (sk == null) {
+                            if 
(SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) 
countDown(key.getWriteLatch());
+                            if 
(SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+                        } else {
+                            sk.interestOps(sk.interestOps() & (~ops));
+                            if 
(SelectionKey.OP_WRITE==(ops&SelectionKey.OP_WRITE)) 
countDown(key.getWriteLatch());
+                            if 
(SelectionKey.OP_READ==(ops&SelectionKey.OP_READ))countDown(key.getReadLatch());
+                            if (sk.interestOps()==0) {
+                                sk.cancel();
+                                sk.attach(null);
+                            }
+                        }
+                    }catch (CancelledKeyException cx) {
+                        if (sk!=null) {
+                            sk.cancel();
+                            sk.attach(null);
+                        }
+                    }
+                }
+            };
+            events.offer(r);
+            wakeup();
+        }
+
+
+        public boolean events() {
+            boolean result = false;
+            Runnable r = null;
+            result = (events.size() > 0);
+            while ( (r = (Runnable)events.poll()) != null ) {
+                r.run();
+                result = true;
+            }
+            return result;
+        }
+
+        public void run() {
+            while (run) {
+                try {
+                    events();
+                    int keyCount = 0;
+                    try {
+                        int i = wakeupCounter.get();
+                        if (i>0) 
+                            keyCount = selector.selectNow();
+                        else {
+                            wakeupCounter.set(-1);
+                            keyCount = selector.select(1000);
+                        }
+                        wakeupCounter.set(0);
+                        if (!run) break;
+                    }catch ( NullPointerException x ) {
+                        //sun bug 5076772 on windows JDK 1.5
+                        if (selector==null) throw x;
+                        if ( log.isDebugEnabled() ) log.debug("Possibly 
encountered sun bug 5076772 on windows JDK 1.5",x);
+                        continue;
+                    } catch ( CancelledKeyException x ) {
+                        //sun bug 5076772 on windows JDK 1.5
+                        if ( log.isDebugEnabled() ) log.debug("Possibly 
encountered sun bug 5076772 on windows JDK 1.5",x);
+                        continue;
+                    } catch (Throwable x) {
+                        log.error("",x);
+                        continue;
+                    }
+
+                    Iterator iterator = keyCount > 0 ? 
selector.selectedKeys().iterator() : null;
+
+                    // Walk through the collection of ready keys and dispatch
+                    // any active event.
+                    while (run && iterator != null && iterator.hasNext()) {
+                        SelectionKey sk = (SelectionKey) iterator.next();
+                        KeyAttachment attachment = 
(KeyAttachment)sk.attachment();
+                        try {
+                            attachment.access();
+                            iterator.remove(); ;
+                            sk.interestOps(sk.interestOps() & 
(~sk.readyOps()));
+                            if ( sk.isReadable() ) {
+                                countDown(attachment.getReadLatch());
+                            }
+                            if (sk.isWritable()) {
+                                countDown(attachment.getWriteLatch());
+                            }
+                        }catch (CancelledKeyException ckx) {
+                            if (sk!=null) sk.cancel();
+                            countDown(attachment.getReadLatch());
+                            countDown(attachment.getWriteLatch());
+                        }
+                    }//while
+                }catch ( Throwable t ) {
+                    log.error("",t);
+                }
+            }
+            events.clear();
+            try {
+                selector.selectNow();//cancel all remaining keys
+            }catch( Exception ignore ) {
+                if (log.isDebugEnabled())log.debug("",ignore);
+            }
+        }
+        
+        public void countDown(CountDownLatch latch) {
+            if ( latch == null ) return;
+            latch.countDown();
+        }
+        
+        
+        
+    }
+
+}
\ No newline at end of file

Added: 
tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioChannel.java
URL: 
http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioChannel.java?rev=640855&view=auto
==============================================================================
--- 
tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioChannel.java
 (added)
+++ 
tomcat/sandbox/tomcat-lite/coyote-nio/org/apache/tomcat/util/net/NioChannel.java
 Tue Mar 25 08:23:12 2008
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.tomcat.util.net.NioEndpoint.Poller;
+import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import org.apache.tomcat.util.MutableInteger;
+
+/**
+ * 
+ * Base class for a SocketChannel wrapper used by the endpoint.
+ * This way, logic for a SSL socket channel remains the same as for
+ * a non SSL, making sure we don't need to code for any exception cases.
+ * 
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class NioChannel implements ByteChannel{
+
+    protected static ByteBuffer emptyBuf = ByteBuffer.allocate(0);
+
+    protected SocketChannel sc = null;
+
+    protected ApplicationBufferHandler bufHandler;
+
+    protected Poller poller;
+
+    public NioChannel(SocketChannel channel, ApplicationBufferHandler 
bufHandler) throws IOException {
+        this.sc = channel;
+        this.bufHandler = bufHandler;
+    }
+
+    public void reset() throws IOException {
+        bufHandler.getReadBuffer().clear();
+        bufHandler.getWriteBuffer().clear();
+    }
+    
+    public int getBufferSize() {
+        if ( bufHandler == null ) return 0;
+        int size = 0;
+        size += 
bufHandler.getReadBuffer()!=null?bufHandler.getReadBuffer().capacity():0;
+        size += 
bufHandler.getWriteBuffer()!=null?bufHandler.getWriteBuffer().capacity():0;
+        return size;
+    }
+
+    /**
+     * returns true if the network buffer has 
+     * been flushed out and is empty
+     * @return boolean
+     */
+    public boolean flush(boolean block, Selector s, long 
timeout,MutableInteger lastWrite) throws IOException {
+        if (lastWrite!=null) lastWrite.set(1);
+        return true; //no network buffer in the regular channel
+    }
+
+
+    /**
+     * Closes this channel.
+     *
+     * @throws IOException If an I/O error occurs
+     * @todo Implement this java.nio.channels.Channel method
+     */
+    public void close() throws IOException {
+        getIOChannel().socket().close();
+        getIOChannel().close();
+    }
+
+    public void close(boolean force) throws IOException {
+        if (isOpen() || force ) close();
+    }
+    /**
+     * Tells whether or not this channel is open.
+     *
+     * @return <tt>true</tt> if, and only if, this channel is open
+     * @todo Implement this java.nio.channels.Channel method
+     */
+    public boolean isOpen() {
+        return sc.isOpen();
+    }
+
+    /**
+     * Writes a sequence of bytes to this channel from the given buffer.
+     *
+     * @param src The buffer from which bytes are to be retrieved
+     * @return The number of bytes written, possibly zero
+     * @throws IOException If some other I/O error occurs
+     * @todo Implement this java.nio.channels.WritableByteChannel method
+     */
+    public int write(ByteBuffer src) throws IOException {
+        return sc.write(src);
+    }
+
+    /**
+     * Reads a sequence of bytes from this channel into the given buffer.
+     *
+     * @param dst The buffer into which bytes are to be transferred
+     * @return The number of bytes read, possibly zero, or <tt>-1</tt> if the 
channel has reached end-of-stream
+     * @throws IOException If some other I/O error occurs
+     * @todo Implement this java.nio.channels.ReadableByteChannel method
+     */
+    public int read(ByteBuffer dst) throws IOException {
+        return sc.read(dst);
+    }
+
+    public Object getAttachment(boolean remove) {
+        Poller pol = getPoller();
+        Selector sel = pol!=null?pol.getSelector():null;
+        SelectionKey key = sel!=null?getIOChannel().keyFor(sel):null;
+        Object att = key!=null?key.attachment():null;
+        if (key != null && att != null && remove ) key.attach(null);
+        return att;
+    }
+    /**
+     * getBufHandler
+     *
+     * @return ApplicationBufferHandler
+     * @todo Implement this org.apache.tomcat.util.net.SecureNioChannel method
+     */
+    public ApplicationBufferHandler getBufHandler() {
+        return bufHandler;
+    }
+
+    public Poller getPoller() {
+        return poller;
+    }
+    /**
+     * getIOChannel
+     *
+     * @return SocketChannel
+     * @todo Implement this org.apache.tomcat.util.net.SecureNioChannel method
+     */
+    public SocketChannel getIOChannel() {
+        return sc;
+    }
+
+    /**
+     * isClosing
+     *
+     * @return boolean
+     * @todo Implement this org.apache.tomcat.util.net.SecureNioChannel method
+     */
+    public boolean isClosing() {
+        return false;
+    }
+
+    /**
+     * isInitHandshakeComplete
+     *
+     * @return boolean
+     * @todo Implement this org.apache.tomcat.util.net.SecureNioChannel method
+     */
+    public boolean isInitHandshakeComplete() {
+        return true;
+    }
+
+    public int handshake(boolean read, boolean write) throws IOException {
+        return 0;
+    }
+
+    public void setPoller(Poller poller) {
+        this.poller = poller;
+    }
+
+    public void setIOChannel(SocketChannel IOChannel) {
+        this.sc = IOChannel;
+    }
+
+    public String toString() {
+        return super.toString()+":"+this.sc.toString();
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to