Author: costin Date: Wed Apr 23 10:29:58 2008 New Revision: 650945 URL: http://svn.apache.org/viewvc?rev=650945&view=rev Log: Ok, finally - the first part of the new connector, or the last part of tomcat-lite experiment :-) It is obviously quite independent of the rest of tomcat-lite, probably the last to move out of sandbox.
I have an Apr impl as well, but it's broken now, need to fix it again. This is the IO abstraction - the connector goal is to do all I/O in non-blocking mode and allow completely non-blocking adapters ( i.e. a bit more than regular coyote ). Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java (with props) tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java (with props) tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java (with props) Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java?rev=650945&view=auto ============================================================================== --- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java (added) +++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java Wed Apr 23 10:29:58 2008 @@ -0,0 +1,87 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.net; + +import java.io.IOException; +import java.nio.channels.Channel; + +/** + * Notiy user code of events. All methods are called from the selector thread, + * they should not block. The reason is to allow parsing and non-blocking + * processing to be done as fast as possible, and avoid the need for + * SelectorThread to deal with a thread pool. + * + * This class wraps the Channel. For APR we wrap the socket and few other + * fields we need for non-blocking operation in a ByteChannel, the code + * seems cleaner and it's nice to be able to use APR more portably. + * ( older version used long - but non-blocking connect needs a second param ) + */ +public class SelectorCallback { + protected SelectorThread.SelectorData selectorData = new SelectorThread.SelectorData(this); + + public SelectorThread getSelector() { + return selectorData.sel; + } + + /** + * Called when the protocol is connected. + */ + public void connected(SelectorThread selThread) + throws IOException { + } + + /** + * It is possible to write data. + * For both read and write - re-enable interest if you want more data. + */ + public void dataWriteable(SelectorThread selThread) throws IOException { + } + + /** + * Data available for read. + * For both read and write - re-enable interest if you want more data. + */ + public void dataReceived(SelectorThread selThread) throws IOException { + } + + /** + * nextTimeEvent reached. + */ + public void timeEvent(SelectorThread selThread) { + } + + /** + * Close was detected, or an unhandled exception happened while processing + * this callback. + */ + public void channelClosed(SelectorThread selThread, Throwable ex) { + } + + /** + * Called on a callback created with acceptor() or inetdAcceptor() when + * a new connection is accepted. + * + * @return callback to use on the new channel. + * + * TODO: is there any case where something else besides registering read + * interest on the new connection is needed ? Maybe it could read some data ? + */ + public SelectorCallback connectionAccepted(SelectorThread selThread, + Channel sockC) { + return null; + } + +} \ No newline at end of file Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java ------------------------------------------------------------------------------ svn:eol-style = native Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java?rev=650945&view=auto ============================================================================== --- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java (added) +++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java Wed Apr 23 10:29:58 2008 @@ -0,0 +1,172 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.net; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.nio.channels.Channel; + +import org.apache.tomcat.util.buf.ByteChunk; + +/** + * Abstract NIO/APR to avoid some of the complexity and allow more code + * sharing and experiments. + * + * SelectorThread provides non-blocking methods for read/write and generates + * callbacks using SelectorCallback. It has no buffers of its own. + * + * TODO: add SSL support + * + * @author Costin Manolache + */ +public abstract class SelectorThread { + + /* Similar interfaces: + * - Apr/Nio Endpoints in coyote + * - twisted reactor + * - mina IoProcessor - also has an Apr/nio impl, ProtocolCallback->IoSession, + */ + + /** + * This is stored as the attachment in the selector. + */ + static class SelectorData { + SelectorData(SelectorCallback selectorCallback) { + this.callback = selectorCallback; + } + + // APR long is wrapped in a ByteChannel as well - with few other longs. + Channel channelData; + + SelectorThread sel; + Object selKey; + SelectorCallback callback; + + // Current interest, used internally to avoid waking up if no change + // Also used for connect and accept. + int interest; + + // True if the callback wants to be notified of read/write + boolean writeInterest; + boolean readInterest; + + /** + * If != 0 - the callback will be notified closely after this time. + * Used for timeouts. + */ + long nextTimeEvent = 0; + + // Saved to allow debug messages for bad interest/looping + int lastReadResult; + int lastWriteResult; + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("SelData: ") + .append(writeInterest ? "W/" : "") + .append(readInterest ? "R/" : "").append(selKey).append("/") + .append(channelData); + return sb.toString(); + } + } + + // ----------- IO handling ----------- + protected long inactivityTimeout = 5000; + protected Thread selectorThread; + + public boolean isSelectorThread() { + return Thread.currentThread() == selectorThread; + } + + + /** + * Close all resources, stop accepting, stop the thread. + * The actual stop will happen in background. + */ + public void stop() { + } + + /** + * This may be blocking - involves host resolution, connect. + * If the IP address is provided - it shouldn't block. + */ + public void connect(String host, int port, + SelectorCallback sc) throws IOException { + + } + + /** + * Request a timer event. The thread will generate the events at + * a configurable interval - for example no more often than 0.5 sec. + * + * @param sc + * @param nextTimer time to call the timeEvent() callback + */ + public void setTimerEventTime(SelectorCallback sc, long nextTimer) { + sc.selectorData.nextTimeEvent = nextTimer; + } + + public int readNonBlocking(SelectorCallback sc, ByteBuffer bb) + throws IOException { + return 0; + } + + public int writeNonBlocking(SelectorCallback sc, ByteBuffer reqBuf) + throws IOException { + return 0; + } + + /** + * + */ + public int close(SelectorCallback sc) throws IOException { + return 0; + } + + /** + * Create a new server socket, register the callback. + */ + public void acceptor(SelectorCallback sc, + int port, + InetAddress inet, + int backlog, + int serverTimeout) + throws IOException + { + } + + /** + * For use with daemon tools, inetd - the server socket will be created + * externally and passed as a file descriptor. This is needed to run on + * port 80 without root. + */ + public void inetdAcceptor(SelectorCallback sc) throws IOException { + } + + /** + * Change the callback associated with the socket. + */ + public void updateCallback(SelectorCallback old, SelectorCallback sc) { + } + + public void writeInterest(SelectorCallback sc, boolean writeInterest) { + } + + public void readInterest(SelectorCallback sc, boolean readInterest) { + } + +} \ No newline at end of file Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java ------------------------------------------------------------------------------ svn:eol-style = native Added: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java?rev=650945&view=auto ============================================================================== --- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java (added) +++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java Wed Apr 23 10:29:58 2008 @@ -0,0 +1,608 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.net; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.Channel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tomcat.util.modeler.Registry; + +/** + * NIO implementation. + * + * @author Costin Manolache + */ +public class SelectorThreadNio extends SelectorThread implements Runnable { + + static Logger log = Logger.getLogger("SelectorThreadNio"); + + Selector selector; + + ArrayList<SelectorCallback> readInterest = new ArrayList<SelectorCallback>(); + ArrayList<SelectorCallback> writeInterest = new ArrayList<SelectorCallback>(); + ArrayList<SelectorCallback> connectAcceptInterest = + new ArrayList<SelectorCallback>(); + + AtomicInteger opened = new AtomicInteger(); + AtomicInteger closed = new AtomicInteger(); + AtomicInteger loops = new AtomicInteger(); + + // actives are also stored in the Selector. This is only updated in the main + // thread + ArrayList<SelectorCallback> active = new ArrayList<SelectorCallback>(); + + boolean debug = false; + boolean running = true; + + long lastWakeup = System.currentTimeMillis(); // last time we woke + long sleepTime; + long nextWakeup; // next scheduled wakeup + + // Normally select will wait for the next time event - if it's + // too far in future, maxSleep will override it. + private long maxSleep = 1000; + + // Never sleep less than minSleep. This defines the resulution for + // time events. + private long minSleep = 100; + + boolean daemon = true; + + public SelectorThreadNio() { + this(false); + } + + public SelectorThreadNio(boolean daemon) { + try { + selectorThread = new Thread(this); + selector = Selector.open(); + // TODO: start it on-demand, close it when not in use + selectorThread.setDaemon(daemon); + this.daemon = daemon; + selectorThread.start(); + } catch(IOException e) { + throw new RuntimeException(e); + } + } + + public void setDaemon(boolean d) { + this.daemon = d; + + } + + public void setName(String n) { + selectorThread.setName(n); + Registry registry = Registry.getRegistry(null, null); + try { + registry.registerComponent(this, ":name=" + n, "SelectorThread"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * Opened sockets, waiting for something ( close at least ) + */ + public int getOpened() { + return opened.get(); + } + + /** + * Closed - we're done with them. + */ + public int getClosed() { + return closed.get(); + } + + /** + * How many times we looped + */ + public int getLoops() { + return loops.get(); + } + + public long getLastWakeup() { + return lastWakeup; + } + + public long getTimeSinceLastWakeup() { + return System.currentTimeMillis() - lastWakeup; + } + + public void stop() { + running = false; + log.info("Selector thread stop " + this); + selector.wakeup(); + } + + public void run() { + log.info("Selector thread start " + this); + while (running) { + // if we want timeouts - set here. + try { + loops.incrementAndGet(); + // Check if new requests were added + processPending(); + + long now = System.currentTimeMillis(); + if (nextWakeup > 0 && nextWakeup < now) { + // We don't want to iterate on every I/O + updateSleepTimeAndProcessTimeouts(now); + } + + int selected = selector.select(sleepTime); + lastWakeup = System.currentTimeMillis(); + + // handle events for existing req first. + if (selected != 0) { + Set<SelectionKey> sel = selector.selectedKeys(); + + Iterator<SelectionKey> i = sel.iterator(); + while (i.hasNext()) { + SelectionKey sk = i.next(); + // avoid dup - disable the interest + // TODO: is this really needed ? + int readyOps = sk.readyOps(); + sk.interestOps(sk.interestOps() & ~readyOps); + // Find the request receiving the notification + SelectorData sdata = (SelectorData) sk.attachment(); + SelectorCallback cstate = sdata.callback; + if (debug) { + log.info("SelectorThread: selected " + cstate + " " + readyOps); + } + + if (sk.isValid() && sk.isAcceptable()) { + handleAccept(cstate, sk); + continue; + } + + SocketChannel sc = (SocketChannel) sk.channel(); + if (!sk.isValid()) { + if (debug) { + log.info("SelectorThread: !isValid, closed socket " + cstate); + } + close(sk, cstate, sc, null); + continue; + } + + try { + // callbacks + if (sk.isValid() && sk.isConnectable()) { + handleConnect(cstate, sc); + } + + if (sk.isValid() && sk.isWritable()) { + cstate.selectorData.lastWriteResult = 0; + cstate.dataWriteable(this); + if (cstate.selectorData.lastWriteResult > 0 && + cstate.selectorData.writeInterest) { + log.warning("SelectorThread: write interest" + + " after incomplete write"); + } + } + + if (sk.isReadable()) { + cstate.selectorData.lastReadResult = 0; + cstate.dataReceived(this); + if (cstate.selectorData.lastReadResult > 0 && + cstate.selectorData.readInterest) { + log.warning("SelectorThread: read interest" + + " after incomplete read"); + } + } + } catch (Throwable t) { + t.printStackTrace(); + close(sk, cstate, sc, t); + } + + } + // All at once + sel.clear(); + } + + } catch (Throwable e) { + log.log(Level.SEVERE, "SelectorThread: Error in select", e); + } + } // while(running) + } + + private void handleConnect(SelectorCallback cstate, SocketChannel sc) + throws IOException, SocketException { + sc.finishConnect(); + sc.socket().setSoLinger(true, 0); + cstate.connected(this); + readInterest(cstate, true); + } + + private void handleAccept(SelectorCallback cstate, SelectionKey sk) + throws IOException, ClosedChannelException { + SelectableChannel selc = sk.channel(); + ServerSocketChannel ssc=(ServerSocketChannel)selc; + SocketChannel sockC = ssc.accept(); +// if (sockC == null) { +// continue; +// } + sockC.configureBlocking(false); + SelectorCallback acb = cstate.connectionAccepted(this, sockC); + acb.selectorData.selKey = sockC.register(selector, + SelectionKey.OP_READ, + acb.selectorData); + acb.selectorData.channelData = sockC; + acb.selectorData.sel = this; + active.add(acb); + + sk.interestOps(sk.interestOps() | SelectionKey.OP_ACCEPT); + } + + public void updateCallback(SelectorCallback old, + SelectorCallback cstate) { + cstate.selectorData = old.selectorData; + cstate.selectorData.callback = cstate; + // leave old.selectorData around in case some thread is still using it + } + + public void writeInterest(SelectorCallback cstate, boolean b) { + if (b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) != 0) { + return; + } + if (!b && (cstate.selectorData.interest | SelectionKey.OP_WRITE) == 0) { + return; + } + if (Thread.currentThread() == selectorThread) { + cstate.selectorData.writeInterest = b; + if (cstate.selectorData.writeInterest) { + cstate.selectorData.interest = + cstate.selectorData.interest | SelectionKey.OP_WRITE; + } else { + cstate.selectorData.interest = + cstate.selectorData.interest & ~SelectionKey.OP_WRITE; + } + SelectionKey sk = (SelectionKey) cstate.selectorData.selKey; + sk.interestOps(cstate.selectorData.interest); + return; + } + if (!b) { + return; // can't remove interest from regular thread + } + synchronized (writeInterest) { + writeInterest.add(cstate); + } + selector.wakeup(); + } + + public void readInterest(SelectorCallback cstate, boolean b) { + if (Thread.currentThread() == selectorThread) { + cstate.selectorData.readInterest = b; + selThreadUpdateInterest(cstate); + log.info("Registering read interest"); + return; + } + if (b && (cstate.selectorData.interest | SelectionKey.OP_READ) != 0) { + return; + } + if (!b && (cstate.selectorData.interest | SelectionKey.OP_READ) == 0) { + return; + } + // Schedule the interest update. + synchronized (readInterest) { + readInterest.add(cstate); + } + log.info("Registering pending read interest"); + selector.wakeup(); + } + + + private void selThreadUpdateInterest(SelectorCallback cstate) { + SelectionKey sk = (SelectionKey) cstate.selectorData.selKey; + if (sk != null && sk.isValid()) { + if (debug) { + log.info("SelectorThread: process pending: interest " + + cstate.selectorData.interest + " for " + cstate); + } + if (cstate.selectorData.readInterest) { + cstate.selectorData.interest = + cstate.selectorData.interest | SelectionKey.OP_READ; + } else { + cstate.selectorData.interest = + cstate.selectorData.interest & ~SelectionKey.OP_READ; + } + sk.interestOps(cstate.selectorData.interest); + } + } + + private void close(SelectionKey sk, + SelectorCallback cstate, + Channel ch, + Throwable ex) { + try { + sk.cancel(); + if (ch instanceof SocketChannel) { + SocketChannel sc = (SocketChannel) ch; + if (sc.isConnected()) { + int o = opened.decrementAndGet(); + //System.err.println("Close socket, opened=" + o); + try { + sc.socket().shutdownInput(); + } catch(IOException io1) { + } + try { + sc.socket().shutdownOutput(); // TCP end to the other side + } catch(IOException io1) { + } + sc.socket().close(); + } + } + ch.close(); + closed.incrementAndGet(); + cstate.channelClosed(this, ex); + active.remove(cstate); + } catch (IOException ex2) { + log.severe("SelectorThread: Error closing socket " + ex2); + ex2.printStackTrace(); + } + } + + // --------------- Socket op abstractions ------------ + + @Override + public int readNonBlocking(SelectorCallback cstate, ByteBuffer bb) + throws IOException { + int done = ((SocketChannel) cstate.selectorData.channelData).read(bb); + cstate.selectorData.lastReadResult = done; + if (done < 0) { + if (debug) { + log.info("SelectorThread: EOF while reading"); + } + } + return done; + } + + @Override + public int writeNonBlocking(SelectorCallback cstate, ByteBuffer bb) + throws IOException { + int done = ((SocketChannel) cstate.selectorData.channelData).write(bb); + cstate.selectorData.lastWriteResult = done; + return done; + } + + /** + */ + @Override + public void connect(String host, int port, SelectorCallback cstate) + throws IOException { + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + int o = opened.incrementAndGet(); + socketChannel.connect(new InetSocketAddress(host, port)); + cstate.selectorData.channelData = socketChannel; + cstate.selectorData.interest = SelectionKey.OP_CONNECT; + synchronized (connectAcceptInterest) { + connectAcceptInterest.add(cstate); + } + selector.wakeup(); + } + + // TODO + public void configureSocket(ByteChannel ch, + boolean noDelay) throws IOException { + SocketChannel sockC = (SocketChannel) ch; + sockC.socket().setTcpNoDelay(noDelay); + } + + // TODO + public void setSocketOptions(SelectorCallback cstate, + int linger, + boolean tcpNoDelay, + int socketTimeout) + throws IOException { + + SocketChannel socketChannel = + (SocketChannel) cstate.selectorData.channelData; + Socket socket = socketChannel.socket(); + + if(linger >= 0 ) + socket.setSoLinger( true, linger); + if( tcpNoDelay ) + socket.setTcpNoDelay(tcpNoDelay); + if( socketTimeout > 0 ) + socket.setSoTimeout( socketTimeout ); + } + + @Override + public int close(SelectorCallback cstate) throws IOException { + close((SelectionKey) cstate.selectorData.selKey, cstate, + cstate.selectorData.channelData, null); + return 0; + } + + + + public void acceptor(SelectorCallback cstate, + int port, + InetAddress inet, + int backlog, + int serverTimeout) + throws IOException + { + ServerSocketChannel ssc=ServerSocketChannel.open(); + ServerSocket serverSocket = ssc.socket(); + SocketAddress sa = null; + if (inet == null) { + sa = new InetSocketAddress( port ); + } else { + sa = new InetSocketAddress(inet, port); + } + if (backlog > 0) { + serverSocket.bind( sa , backlog); + } + if( serverTimeout >= 0 ) { + serverSocket.setSoTimeout( serverTimeout ); + } + + ssc.configureBlocking(false); + + cstate.selectorData.channelData = ssc; + cstate.selectorData.interest = SelectionKey.OP_ACCEPT; + // cstate must return 'OP_ACCEPT' as interest + synchronized (connectAcceptInterest) { + connectAcceptInterest.add(cstate); + } + selector.wakeup(); + } + + public void inetdAcceptor(SelectorCallback cstate) throws IOException { + SelectorProvider sp=SelectorProvider.provider(); + + Channel ch=sp.inheritedChannel(); + if(ch!=null ) { + //("Inherited: " + ch.getClass().getName()); + // blocking mode + ServerSocketChannel ssc=(ServerSocketChannel)ch; + cstate.selectorData.channelData = ssc; + cstate.selectorData.interest = SelectionKey.OP_ACCEPT; + // cstate must return 'OP_ACCEPT' as interest + synchronized (connectAcceptInterest) { + connectAcceptInterest.add(cstate); + } + selector.wakeup(); + } + } + + // -------------- Housekeeping ------------- + /** + * Same as APR connector - iterate over tasks, get + * smallest timeout + * @throws IOException + */ + void updateSleepTimeAndProcessTimeouts(long now) throws IOException { + long min = Long.MAX_VALUE; + // TODO: test with large sets, maybe sort + Iterator<SelectorCallback> activeIt = active.iterator(); + while(activeIt.hasNext()) { + SelectorCallback cstate = activeIt.next(); + if (! cstate.selectorData.channelData.isOpen()) { + log.info("Closed socket " + cstate.selectorData.channelData); + activeIt.remove(); + close(cstate); // generate callback, increment counters. + } + + long t = cstate.selectorData.nextTimeEvent; + if (t == 0) { + continue; + } + if (t < now) { + // Timeout + cstate.timeEvent(this); + // TODO: make sure this is updated if it was selected + continue; + } + if (t < min) { + min = t; + } + } + long nextSleep = min - now; + if (nextSleep > maxSleep) { + sleepTime = maxSleep; + } else if (nextSleep < minSleep) { + sleepTime = minSleep; + } else { + sleepTime = nextSleep; + } + nextWakeup = now + sleepTime; + } + + private void processPending() throws IOException { + synchronized (connectAcceptInterest) { + Iterator<SelectorCallback> ci = connectAcceptInterest.iterator(); + + while (ci.hasNext()) { + SelectorCallback cstate = ci.next(); + SelectionKey sk = (SelectionKey) cstate.selectorData.selKey; + + // Find host, port - initiate connection + try { + // Accept interest ? + if (cstate.selectorData.channelData instanceof ServerSocketChannel) { + ServerSocketChannel socketChannel = + (ServerSocketChannel) cstate.selectorData.channelData; + cstate.selectorData.sel = this; + cstate.selectorData.selKey = + socketChannel.register(selector, + SelectionKey.OP_ACCEPT, cstate.selectorData); + + cstate.selectorData.channelData = socketChannel; + active.add(cstate); + } else { + SocketChannel socketChannel = + (SocketChannel) cstate.selectorData.channelData; + cstate.selectorData.sel = this; + cstate.selectorData.selKey = + socketChannel.register(selector, + SelectionKey.OP_CONNECT, cstate.selectorData); + active.add(cstate); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + connectAcceptInterest.clear(); + } + + // Update interest + if (readInterest.size() > 0) { + synchronized (readInterest) { + Iterator<SelectorCallback> ci = readInterest.iterator(); + while (ci.hasNext()) { + SelectorCallback cstate = ci.next(); + selThreadUpdateInterest(cstate); + } + readInterest.clear(); + } + } + if (writeInterest.size() > 0) { + synchronized (writeInterest) { + Iterator<SelectorCallback> ci = writeInterest.iterator(); + while (ci.hasNext()) { + SelectorCallback cstate = ci.next(); + // Fake callback - will update as side effect + cstate.dataWriteable(this); + } + readInterest.clear(); + } + } + } +} \ No newline at end of file Propchange: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]