Author: fhanik
Date: Mon Oct 23 10:39:28 2006
New Revision: 467044
URL: http://svn.apache.org/viewvc?view=rev&rev=467044
Log:
Added in blocking logic to the NIO connector. This logic ensures that if there
is a slow client, we will not be wasting CPU cycles doing endless spinning.
Ideas for this implementation can be credited to the Tribes implementation
where we have a pool of selectors, so that each sending thread uses its own
selector and from Jeanfrancois Arcand's blog about wrapping a NIO channel in a
blocking inputstream.
Added:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
Mon Oct 23 10:39:28 2006
@@ -31,6 +31,7 @@
import org.apache.catalina.tribes.transport.AbstractSender;
import org.apache.catalina.tribes.transport.DataSender;
import org.apache.catalina.tribes.RemoteProcessException;
+import java.io.EOFException;
/**
* This class is NOT thread safe and should never be used with more than one
thread at a time
@@ -177,6 +178,7 @@
//weve written everything, or we are starting a new package
//protect against buffer overwrite
int byteswritten = socketChannel.write(writebuf);
+ if (byteswritten == -1 ) throw new EOFException();
remaining -= byteswritten;
//if the entire message was written from the buffer
//reset the position counter
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
Mon Oct 23 10:39:28 2006
@@ -100,7 +100,7 @@
response = new Response();
response.setHook(this);
- outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize);
+ outputBuffer = new InternalNioOutputBuffer(response,
headerBufferSize,readTimeout);
response.setOutputBuffer(outputBuffer);
request.setResponse(response);
@@ -806,6 +806,8 @@
this.socket = socket;
inputBuffer.setSocket(socket);
outputBuffer.setSocket(socket);
+ inputBuffer.setSelectorPool(endpoint.getSelectorPool());
+ outputBuffer.setSelectorPool(endpoint.getSelectorPool());
// Error flag
error = false;
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
Mon Oct 23 10:39:28 2006
@@ -33,6 +33,8 @@
import org.apache.tomcat.util.net.NioEndpoint.Poller;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioSelectorPool;
+import java.nio.channels.Selector;
/**
* Implementation of InputBuffer which provides HTTP request header parsing as
@@ -157,7 +159,12 @@
* Underlying socket.
*/
protected NioChannel socket;
-
+
+ /**
+ * Selector pool, for blocking reads and blocking writes
+ */
+ protected NioSelectorPool pool;
+
/**
* Underlying input buffer.
@@ -199,8 +206,7 @@
public void setSocket(NioChannel socket) {
this.socket = socket;
}
-
-
+
/**
* Get the underlying socket input stream.
*/
@@ -208,6 +214,15 @@
return socket;
}
+ public void setSelectorPool(NioSelectorPool pool) {
+ this.pool = pool;
+ }
+
+ public NioSelectorPool getSelectorPool() {
+ return pool;
+ }
+
+
/**
* Add an input filter to the filter library.
*/
@@ -549,47 +564,34 @@
*/
private boolean readSocket(boolean timeout, boolean block) throws
IOException {
int nRead = 0;
- long start = System.currentTimeMillis();
- boolean timedOut = false;
- do {
-
- socket.getBufHandler().getReadBuffer().clear();
+ long rto = timeout?this.readTimeout:-1;
+ socket.getBufHandler().getReadBuffer().clear();
+ if ( block ) {
+ Selector selector = null;
+ try { selector = getSelectorPool().get(); }catch ( IOException x )
{}
+ try {
+ nRead =
getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket.getIOChannel(),selector,rto);
+ } catch ( EOFException eof ) {
+ nRead = -1;
+ } finally {
+ if ( selector != null ) getSelectorPool().put(selector);
+ }
+ } else {
nRead = socket.read(socket.getBufHandler().getReadBuffer());
- if (nRead > 0) {
- socket.getBufHandler().getReadBuffer().flip();
- socket.getBufHandler().getReadBuffer().limit(nRead);
- expand(nRead + pos);
- socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
- lastValid = pos + nRead;
- return true;
- } else if (nRead == -1) {
- //return false;
- throw new EOFException(sm.getString("iib.eof.error"));
- } else if ( !block ) {
- return false;
- } else {
- timedOut = timeout && (readTimeout != -1) &&
((System.currentTimeMillis()-start)>readTimeout);
- if ( !timedOut && nRead == 0 ) {
- try {
- final SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- final KeyAttachment att =
(KeyAttachment)key.attachment();
- //to do, add in a check, we might have just timed out
on the wait,
- //so there is no need to register us again.
- boolean addToQueue = false;
- try { addToQueue =
((att.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch (
CancelledKeyException ckx ){ throw new IOException("Socket key cancelled.");}
- if ( addToQueue ) {
- synchronized (att.getMutex()) {
- addToReadQueue(key, att);
- att.getMutex().wait(readTimeout);
- }
- }//end if
- }catch ( Exception x ) {}
- }
- }
- }while ( nRead == 0 && (!timedOut) );
- //else throw new IOException(sm.getString("iib.failedread"));
- //return false; //timeout
- throw new IOException("read timed out.");
+ }
+ if (nRead > 0) {
+ socket.getBufHandler().getReadBuffer().flip();
+ socket.getBufHandler().getReadBuffer().limit(nRead);
+ expand(nRead + pos);
+ socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
+ lastValid = pos + nRead;
+ return true;
+ } else if (nRead == -1) {
+ //return false;
+ throw new EOFException(sm.getString("iib.eof.error"));
+ } else {
+ return false;
+ }
}
private void addToReadQueue(final SelectionKey key, final KeyAttachment
att) {
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Mon Oct 23 10:39:28 2006
@@ -17,9 +17,11 @@
package org.apache.coyote.http11;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import org.apache.coyote.ActionCode;
import org.apache.coyote.OutputBuffer;
@@ -31,6 +33,7 @@
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioSelectorPool;
import org.apache.tomcat.util.res.StringManager;
/**
@@ -54,14 +57,14 @@
* Default constructor.
*/
public InternalNioOutputBuffer(Response response) {
- this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
+ this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
}
/**
* Alternate constructor.
*/
- public InternalNioOutputBuffer(Response response, int headerBufferSize) {
+ public InternalNioOutputBuffer(Response response, int headerBufferSize,
long writeTimeout) {
this.response = response;
headers = response.getMimeHeaders();
@@ -83,6 +86,8 @@
committed = false;
finished = false;
+
+ this.writeTimeout = writeTimeout;
// Cause loading of HttpMessages
HttpMessages.getMessage(200);
@@ -143,6 +148,12 @@
* Underlying socket.
*/
protected NioChannel socket;
+
+ /**
+ * Selector pool, for blocking reads and blocking writes
+ */
+ protected NioSelectorPool pool;
+
/**
@@ -168,7 +179,11 @@
* Index of the last active filter.
*/
protected int lastActiveFilter;
-
+
+ /**
+ * Write time out in milliseconds
+ */
+ protected long writeTimeout = -1;
// ------------------------------------------------------------- Properties
@@ -181,12 +196,28 @@
this.socket = socket;
}
+ public void setWriteTimeout(long writeTimeout) {
+ this.writeTimeout = writeTimeout;
+ }
+
/**
* Get the underlying socket input stream.
*/
public NioChannel getSocket() {
return socket;
}
+
+ public long getWriteTimeout() {
+ return writeTimeout;
+ }
+
+ public void setSelectorPool(NioSelectorPool pool) {
+ this.pool = pool;
+ }
+
+ public NioSelectorPool getSelectorPool() {
+ return pool;
+ }
/**
* Set the socket buffer size.
*/
@@ -392,14 +423,22 @@
private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean
flip) throws IOException {
//int limit = bytebuffer.position();
if ( flip ) bytebuffer.flip();
- while ( bytebuffer.hasRemaining() ) {
- int written = socket.write(bytebuffer);
+ int written = 0;
+ Selector selector = null;
+ try {
+ selector = getSelectorPool().get();
+ } catch ( IOException x ) {
+ //ignore
+ }
+ try {
+ written = getSelectorPool().write(bytebuffer,
socket.getIOChannel(), selector, writeTimeout);
+ //make sure we are flushed
+ do {
+ if (socket.flush(selector)) break;
+ }while ( true );
+ }finally {
+ if ( selector != null ) getSelectorPool().put(selector);
}
- //make sure we are flushed
- do {
- if (socket.flush()) break;
- }while ( true );
-
socket.getBufHandler().getWriteBuffer().clear();
this.total = 0;
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Mon
Oct 23 10:39:28 2006
@@ -1,19 +1,21 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
+
package org.apache.tomcat.util.net;
import java.io.IOException;
@@ -23,6 +25,7 @@
import org.apache.tomcat.util.net.NioEndpoint.Poller;
import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
+import java.nio.channels.Selector;
/**
*
@@ -34,20 +37,20 @@
* @version 1.0
*/
public class NioChannel implements ByteChannel{
-
+
protected static ByteBuffer emptyBuf = ByteBuffer.allocate(0);
protected SocketChannel sc = null;
protected ApplicationBufferHandler bufHandler;
-
+
protected Poller poller;
public NioChannel(SocketChannel channel, ApplicationBufferHandler
bufHandler) throws IOException {
this.sc = channel;
this.bufHandler = bufHandler;
}
-
+
public void reset() throws IOException {
bufHandler.getReadBuffer().clear();
bufHandler.getWriteBuffer().clear();
@@ -58,7 +61,7 @@
* been flushed out and is empty
* @return boolean
*/
- public boolean flush() throws IOException {
+ public boolean flush(Selector s) throws IOException {
return true; //no network buffer in the regular channel
}
@@ -154,7 +157,7 @@
public boolean isInitHandshakeComplete() {
return true;
}
-
+
public int handshake(boolean read, boolean write) throws IOException {
return 0;
}
@@ -171,4 +174,4 @@
return super.toString()+":"+this.sc.toString();
}
-}
\ No newline at end of file
+}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon
Oct 23 10:39:28 2006
@@ -35,7 +35,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -143,6 +142,8 @@
protected int readBufSize = 8192;
protected int writeBufSize = 8192;
+ protected NioSelectorPool selectorPool = new NioSelectorPool();;
+
/**
* Server socket "pointer".
*/
@@ -418,6 +419,10 @@
this.readBufSize = readBufSize;
}
+ public void setSelectorPool(NioSelectorPool selectorPool) {
+ this.selectorPool = selectorPool;
+ }
+
protected SSLContext sslContext = null;
public SSLContext getSSLContext() { return sslContext;}
public void setSSLContext(SSLContext c) { sslContext = c;}
@@ -548,6 +553,9 @@
running = true;
paused = false;
+ selectorPool.setMaxSelectors(maxThreads);
+ selectorPool.setMaxSpareSelectors(-1);
+ selectorPool.open();
// Create worker collection
if (executor == null) {
@@ -611,6 +619,7 @@
}
pollers = null;
}
+ try {selectorPool.close();}catch (IOException x){}
nioChannels.clear();
}
@@ -650,8 +659,12 @@
return readBufSize;
}
+ public NioSelectorPool getSelectorPool() {
+ return selectorPool;
+ }
+
/**
- * Unlock the server socket accept using a bugus connection.
+ * Unlock the server socket accept using a bogus connection.
*/
protected void unlockAccept() {
java.net.Socket s = null;
@@ -709,7 +722,7 @@
int appbufsize =
engine.getSession().getApplicationBufferSize();
int bufsize = Math.max(Math.max(getReadBufSize(),
getWriteBufSize()), appbufsize);
NioBufferHandler bufhandler = new
NioBufferHandler(bufsize, bufsize);
- channel = new SecureNioChannel(socket, engine, bufhandler);
+ channel = new SecureNioChannel(socket, engine, bufhandler,
selectorPool);
} else {
NioBufferHandler bufhandler = new
NioBufferHandler(getReadBufSize(), getWriteBufSize());
channel = new NioChannel(socket, bufhandler);
Added: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java?view=auto&rev=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
(added)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
Mon Oct 23 10:39:28 2006
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.nio.channels.Selector;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.io.EOFException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ *
+ * Thread safe non blocking selector pool
+ * @author Filip Hanik
+ * @version 1.0
+ * @since 6.0
+ */
+
+public class NioSelectorPool {
+ protected int maxSelectors = 100;
+ protected int maxSpareSelectors = -1;
+ protected boolean enabled = true;
+ protected AtomicInteger active = new AtomicInteger(0);
+ protected AtomicInteger spare = new AtomicInteger(0);
+ //protected LinkedList<Selector> selectors = new LinkedList<Selector>();
+ protected ConcurrentLinkedQueue<Selector> selectors = new
ConcurrentLinkedQueue<Selector>();
+
+ public Selector get() throws IOException{
+ if ( (!enabled) || active.incrementAndGet() >= maxSelectors ) {
+ active.decrementAndGet();
+ return null;
+ }
+ Selector s = null;
+ try {
+ s = selectors.size()>0?selectors.poll():null;
+ if (s == null) s = Selector.open();
+ else spare.decrementAndGet();
+
+ }catch (NoSuchElementException x ) {
+ try {s = Selector.open();}catch (IOException iox){}
+ } finally {
+ if ( s == null ) active.decrementAndGet();//we were unable to find
a selector
+ }
+ return s;
+ }
+
+
+
+ public void put(Selector s) throws IOException {
+ active.decrementAndGet();
+ if ( enabled && (maxSpareSelectors==-1 || spare.get() <
Math.min(maxSpareSelectors,maxSelectors)) ) {
+ spare.incrementAndGet();
+ selectors.offer(s);
+ }
+ else s.close();
+ }
+
+ public void close() throws IOException {
+ enabled = false;
+ Selector s;
+ while ( (s = selectors.poll()) != null ) s.close();
+ spare.set(0);
+ }
+
+ public void open(){
+ enabled = true;
+ }
+
+ /**
+ * Performs a blocking write using the bytebuffer for data to be written
and a selector to block.
+ * If the <code>selector</code> parameter is null, then it will perform a
busy write that could
+ * take up a lot of CPU cycles.
+ * @param buf ByteBuffer - the buffer containing the data, we will write
as long as <code>(buf.hasRemaining()==true)</code>
+ * @param socket SocketChannel - the socket to write data to
+ * @param selector Selector - the selector to use for blocking, if null
then a busy write will be initiated
+ * @param writeTimeout long - the timeout for this write operation in
milliseconds, -1 means no timeout
+ * @return int - returns the number of bytes written
+ * @throws EOFException if write returns -1
+ * @throws SocketTimeoutException if the write times out
+ * @throws IOException if an IO Exception occurs in the underlying socket
logic
+ */
+ public int write(ByteBuffer buf, SocketChannel socket, Selector selector,
long writeTimeout) throws IOException {
+ SelectionKey key = null;
+ int written = 0;
+ boolean timedout = false;
+ int keycount = 1; //assume we can write
+ long time = System.currentTimeMillis(); //start the timeout timer
+ try {
+ while ( (!timedout) && buf.hasRemaining() ) {
+ if ( keycount > 0 ) { //only write if we were registered for a
write
+ int cnt = socket.write(buf); //write the data
+ if (cnt == -1) throw new EOFException();
+ written += cnt;
+ if (cnt > 0) {
+ time = System.currentTimeMillis(); //reset our timeout
timer
+ continue; //we successfully wrote, try again without a
selector
+ }
+ }
+ if ( selector != null ) {
+ //register OP_WRITE to the selector
+ if (key==null) key = socket.register(selector,
SelectionKey.OP_WRITE);
+ else key.interestOps(SelectionKey.OP_WRITE);
+ keycount = selector.select(writeTimeout);
+ }
+ if (writeTimeout > 0 && (selector == null || keycount == 0) )
timedout = (System.currentTimeMillis()-time)>=writeTimeout;
+ }//while
+ if ( timedout ) throw new SocketTimeoutException();
+ } finally {
+ if (key != null) key.cancel();
+ if (selector != null) selector.selectNow();
+ }
+ return written;
+ }
+
+ /**
+ * Performs a blocking read using the bytebuffer for data to be read and a
selector to block.
+ * If the <code>selector</code> parameter is null, then it will perform a
busy read that could
+ * take up a lot of CPU cycles.
+ * @param buf ByteBuffer - the buffer containing the data, we will read as
until we have read at least one byte or we timed out
+ * @param socket SocketChannel - the socket to write data to
+ * @param selector Selector - the selector to use for blocking, if null
then a busy read will be initiated
+ * @param readTimeout long - the timeout for this read operation in
milliseconds, -1 means no timeout
+ * @return int - returns the number of bytes read
+ * @throws EOFException if read returns -1
+ * @throws SocketTimeoutException if the read times out
+ * @throws IOException if an IO Exception occurs in the underlying socket
logic
+ */
+ public int read(ByteBuffer buf, SocketChannel socket, Selector selector,
long readTimeout) throws IOException {
+ SelectionKey key = null;
+ int read = 0;
+ boolean timedout = false;
+ int keycount = 1; //assume we can write
+ long time = System.currentTimeMillis(); //start the timeout timer
+ try {
+ while ( (!timedout) && read == 0 ) {
+ if ( keycount > 0 ) { //only read if we were registered for a
read
+ int cnt = socket.read(buf);
+ if (cnt == -1) throw new EOFException();
+ read += cnt;
+ if (cnt > 0) break;
+ }
+ if ( selector != null ) {
+ //register OP_WRITE to the selector
+ if (key==null) key = socket.register(selector,
SelectionKey.OP_READ);
+ else key.interestOps(SelectionKey.OP_READ);
+ keycount = selector.select(readTimeout);
+ }
+ if (readTimeout > 0 && (selector == null || keycount == 0) )
timedout = (System.currentTimeMillis()-time)>=readTimeout;
+ }//while
+ if ( timedout ) throw new SocketTimeoutException();
+ } finally {
+ if (key != null) key.cancel();
+ if (selector != null) selector.selectNow();
+ }
+ return read;
+ }
+
+ public void setMaxSelectors(int maxSelectors) {
+ this.maxSelectors = maxSelectors;
+ }
+
+ public void setMaxSpareSelectors(int maxSpareSelectors) {
+ this.maxSpareSelectors = maxSpareSelectors;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public int getMaxSelectors() {
+ return maxSelectors;
+ }
+
+ public int getMaxSpareSelectors() {
+ return maxSpareSelectors;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+}
\ No newline at end of file
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
Mon Oct 23 10:39:28 2006
@@ -8,6 +8,7 @@
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
+import java.nio.channels.Selector;
/**
*
@@ -29,7 +30,10 @@
protected boolean closed = false;
protected boolean closing = false;
- public SecureNioChannel(SocketChannel channel, SSLEngine engine,
ApplicationBufferHandler bufHandler) throws IOException {
+ protected NioSelectorPool pool;
+
+ public SecureNioChannel(SocketChannel channel, SSLEngine engine,
+ ApplicationBufferHandler bufHandler,
NioSelectorPool pool) throws IOException {
super(channel,bufHandler);
this.sslEngine = engine;
int appBufSize = sslEngine.getSession().getApplicationBufferSize();
@@ -37,7 +41,10 @@
//allocate network buffers - TODO, add in optional direct non-direct
buffers
if ( netInBuffer == null ) netInBuffer =
ByteBuffer.allocateDirect(netBufSize);
if ( netOutBuffer == null ) netOutBuffer =
ByteBuffer.allocateDirect(netBufSize);
-
+
+ //selector pool for blocking operations
+ this.pool = pool;
+
//ensure that the application has a large enough read/write buffers
//by doing this, we should not encounter any buffer overflow errors
bufHandler.expand(bufHandler.getReadBuffer(), appBufSize);
@@ -72,12 +79,13 @@
* been flushed out and is empty
* @return boolean
*/
- public boolean flush() throws IOException {
- return flush(netOutBuffer);
+ public boolean flush(Selector s, long timeout) throws IOException {
+ pool.write(netOutBuffer,sc,s,timeout);
+ return !netOutBuffer.hasRemaining();
}
/**
- * Flushes the buffer to the network
+ * Flushes the buffer to the network, non blocking
* @param buf ByteBuffer
* @return boolean true if the buffer has been emptied out, false otherwise
* @throws IOException
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]