Author: fhanik
Date: Wed Aug 9 10:12:37 2006
New Revision: 430097
URL: http://svn.apache.org/viewvc?rev=430097&view=rev
Log:
Added in a cache for byte buffers
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?rev=430097&r1=430096&r2=430097&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Wed Aug 9 10:12:37 2006
@@ -46,6 +46,11 @@
this.sc = channel;
this.bufHandler = bufHandler;
}
+
+ public void reset() throws IOException {
+ bufHandler.getReadBuffer().clear();
+ bufHandler.getWriteBuffer().clear();
+ }
/**
* returns true if the network buffer has
@@ -119,7 +124,6 @@
public Poller getPoller() {
return poller;
}
-
/**
* getIOChannel
*
@@ -156,6 +160,14 @@
public void setPoller(Poller poller) {
this.poller = poller;
+ }
+
+ public void setIOChannel(SocketChannel IOChannel) {
+ this.sc = IOChannel;
+ }
+
+ public String toString() {
+ return super.toString()+":"+this.sc.toString();
}
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=430097&r1=430096&r2=430097&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed Aug 9 10:12:37 2006
@@ -45,6 +45,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
+import java.net.Socket;
/**
* NIO tailored thread pool, providing the following services:
@@ -149,11 +150,13 @@
protected ServerSocketChannel serverSock = null;
- /**
- * APR memory pool for the server socket.
- */
- protected long serverSockPool = 0;
-
+ protected ConcurrentLinkedQueue<NioChannel> nioChannels = new ConcurrentLinkedQueue<NioChannel>() {
+ public boolean offer(NioChannel o) {
+ //avoid over growing our cache or add after we have stopped
+ if ( running && (size() < curThreads) ) return super.offer(o);
+ else return false;
+ }
+ };
@@ -581,6 +584,7 @@
}
pollers = null;
}
+ nioChannels.clear();
}
@@ -597,6 +601,7 @@
serverSock = null;
sslContext = null;
initialized = false;
+ nioChannels.clear();
}
@@ -658,33 +663,36 @@
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
-
+ Socket sock = socket.socket();
// 1: Set socket options: timeout, linger, etc
if (soLinger >= 0)
- socket.socket().setSoLinger(true,soLinger);
+ sock.setSoLinger(true,soLinger);
if (tcpNoDelay)
- socket.socket().setTcpNoDelay(true);
+ sock.setTcpNoDelay(true);
if (soTimeout > 0)
- socket.socket().setSoTimeout(soTimeout);
+ sock.setSoTimeout(soTimeout);
- NioChannel channel = null;
- // 2: SSL setup
- step = 2;
- if (sslContext != null) {
- SSLEngine engine = sslContext.createSSLEngine();
- engine.setNeedClientAuth(getClientAuth());
- engine.setUseClientMode(false);
- int appbufsize =
engine.getSession().getApplicationBufferSize();
- int bufsize =
Math.max(Math.max(getReadBufSize(),getWriteBufSize()),appbufsize);
- NioBufferHandler bufhandler = new
NioBufferHandler(bufsize,bufsize);
- channel = new SecureNioChannel(socket,engine,bufhandler);
-
+ NioChannel channel = nioChannels.poll();
+ if ( channel == null ) {
+ // 2: SSL setup
+ step = 2;
+
+ if (sslContext != null) {
+ SSLEngine engine = sslContext.createSSLEngine();
+ engine.setNeedClientAuth(getClientAuth());
+ engine.setUseClientMode(false);
+ int appbufsize =
engine.getSession().getApplicationBufferSize();
+ int bufsize = Math.max(Math.max(getReadBufSize(),
getWriteBufSize()), appbufsize);
+ NioBufferHandler bufhandler = new
NioBufferHandler(bufsize, bufsize);
+ channel = new SecureNioChannel(socket, engine, bufhandler);
+ } else {
+ NioBufferHandler bufhandler = new
NioBufferHandler(getReadBufSize(), getWriteBufSize());
+ channel = new NioChannel(socket, bufhandler);
+ }
} else {
- NioBufferHandler bufhandler = new
NioBufferHandler(getReadBufSize(),getWriteBufSize());
- channel = new NioChannel(socket,bufhandler);
+ channel.setIOChannel(socket);
+ channel.reset();
}
-
-
getPoller0().register(channel);
} catch (Throwable t) {
@@ -779,6 +787,21 @@
}
+ protected boolean processSocket(SocketChannel socket) {
+ try {
+ if (executor == null) {
+ getWorkerThread().assign(socket);
+ } else {
+ executor.execute(new SocketOptionsProcessor(socket));
+ }
+ } catch (Throwable t) {
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
/**
* Process given socket.
*/
@@ -849,13 +872,14 @@
try {
// Accept the next incoming connection from the server socket
SocketChannel socket = serverSock.accept();
+ processSocket(socket);
// Hand this socket off to an appropriate processor
- if(!setSocketOptions(socket))
- {
- // Close socket right away
- socket.socket().close();
- socket.close();
- }
+// if(!setSocketOptions(socket))
+// {
+// // Close socket right away
+// socket.socket().close();
+// socket.close();
+// }
} catch (Throwable t) {
log.error(sm.getString("endpoint.accept.fail"), t);
}
@@ -1187,7 +1211,7 @@
protected Thread thread = null;
protected boolean available = false;
- protected NioChannel socket = null;
+ protected Object socket = null;
protected boolean event = false;
protected boolean error = false;
@@ -1201,7 +1225,7 @@
*
* @param socket TCP socket to process
*/
- protected synchronized void assign(NioChannel socket) {
+ protected synchronized void assign(Object socket) {
// Wait for the Processor to get the previous Socket
while (available) {
@@ -1210,7 +1234,6 @@
} catch (InterruptedException e) {
}
}
-
// Store the newly available Socket and notify our thread
this.socket = socket;
event = false;
@@ -1221,7 +1244,7 @@
}
- protected synchronized void assign(NioChannel socket, boolean error) {
+ protected synchronized void assign(Object socket, boolean error) {
// Wait for the Processor to get the previous Socket
while (available) {
@@ -1244,7 +1267,7 @@
* Await a newly assigned Socket from our Connector, or <code>null</code>
* if we are supposed to shut down.
*/
- protected synchronized NioChannel await() {
+ protected synchronized Object await() {
// Wait for the Connector to provide a new Socket
while (!available) {
@@ -1255,7 +1278,7 @@
}
// Notify the Connector that we have received this Socket
- NioChannel socket = this.socket;
+ Object socket = this.socket;
available = false;
notifyAll();
@@ -1272,72 +1295,99 @@
// Process requests until we receive a shutdown signal
while (running) {
- // Wait for the next socket to be assigned
- NioChannel socket = await();
- if (socket == null)
- continue;
- SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- int handshake = -1;
try {
- handshake = socket.handshake(key.isReadable(),
key.isWritable());
- }catch ( IOException x ) {
- handshake = -1;
- log.error("Error during SSL handshake",x);
- }catch ( CancelledKeyException ckx ) {
- handshake = -1;
- }
- if ( handshake == 0 ) {
- // Process the request from this socket
- if ((event) && (handler.event(socket, error) ==
Handler.SocketState.CLOSED)) {
- // Close socket and pool
- try {
- try {socket.close();}catch (Exception ignore){}
- if ( socket.isOpen() ) socket.close(true);
- }catch ( Exception x ) {
- log.error("",x);
+ // Wait for the next socket to be assigned
+ Object channel = await();
+ if (channel == null)
+ continue;
+
+ if ( channel instanceof SocketChannel) {
+ SocketChannel sc = (SocketChannel)channel;
+ if ( !setSocketOptions(sc) ) {
+ try {
+ sc.socket().close();
+ sc.close();
+ }catch ( IOException ix ) {
+ if ( log.isDebugEnabled() ) log.debug("",ix);
+ }
+ } else {
+ //now we have it registered, remove it from the cache
+
}
- } else if ((!event) && (handler.process(socket) ==
Handler.SocketState.CLOSED)) {
- // Close socket and pool
+ } else {
+
+ NioChannel socket = (NioChannel)channel;
+
+ SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ int handshake = -1;
try {
- try {socket.close();}catch (Exception ignore){}
- if ( socket.isOpen() ) socket.close(true);
- }catch ( Exception x ) {
- log.error("",x);
+ handshake = socket.handshake(key.isReadable(),
key.isWritable());
+ }catch ( IOException x ) {
+ handshake = -1;
+ log.error("Error during SSL handshake",x);
+ }catch ( CancelledKeyException ckx ) {
+ handshake = -1;
}
- }
- } else if (handshake == -1 ) {
- if ( key.isValid() ) key.cancel();
- try {socket.close(true);}catch (IOException ignore){}
- } else {
- final SelectionKey fk = key;
- final int intops = handshake;
- final KeyAttachment ka = (KeyAttachment)fk.attachment();
- //register for handshake ops
- Runnable r = new Runnable() {
- public void run() {
- try {
- fk.interestOps(intops);
- ka.interestOps(intops);
- } catch (CancelledKeyException ckx) {
+ if ( handshake == 0 ) {
+ // Process the request from this socket
+ if ((event) && (handler.event(socket, error) ==
Handler.SocketState.CLOSED)) {
+ // Close socket and pool
try {
- if ( fk != null && fk.attachment() != null
) {
-
- ka.setError(true); //set to collect this socket immediately
- try
{ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){}
- try
{ka.getChannel().close();}catch(Exception ignore){}
- ka.setWakeUp(false);
- }
- } catch (Exception ignore) {}
+
+ try {socket.close();}catch (Exception
ignore){}
+ if ( socket.isOpen() ) socket.close(true);
+ nioChannels.offer(socket);
+ }catch ( Exception x ) {
+ log.error("",x);
+ }
+ } else if ((!event) && (handler.process(socket) ==
Handler.SocketState.CLOSED)) {
+ // Close socket and pool
+ try {
+
+ try {socket.close();}catch (Exception
ignore){}
+ if ( socket.isOpen() ) socket.close(true);
+ nioChannels.offer(socket);
+ }catch ( Exception x ) {
+ log.error("",x);
+ }
}
+ } else if (handshake == -1 ) {
+ if ( key.isValid() ) key.cancel();
+ try {socket.close(true);}catch (IOException
ignore){}
+ nioChannels.offer(socket);
+ } else {
+ final SelectionKey fk = key;
+ final int intops = handshake;
+ final KeyAttachment ka =
(KeyAttachment)fk.attachment();
+ //register for handshake ops
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ fk.interestOps(intops);
+ ka.interestOps(intops);
+ } catch (CancelledKeyException ckx) {
+ try {
+ if ( fk != null && fk.attachment()
!= null ) {
+
+ ka.setError(true); //set to collect this socket immediately
+ try
{ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){}
+ try
{ka.getChannel().close();}catch(Exception ignore){}
+ ka.setWakeUp(false);
+ }
+ } catch (Exception ignore) {}
+ }
+ }
+ };
+ ka.getPoller().addEvent(r);
}
- };
- ka.getPoller().addEvent(r);
+ }
+ } finally {
+ //dereference socket to let GC do its job
+ socket = null;
+ // Finish up this request
+ recycleWorkerThread(this);
}
- //dereference socket to let GC do its job
- socket = null;
- // Finish up this request
- recycleWorkerThread(this);
}
}
@@ -1446,6 +1496,32 @@
}
+ // ---------------------------------------------- SocketOptionsProcessor Inner Class
+
+
+ /**
+ * This class is the equivalent of the Worker, but will simply use in an
+ * external Executor thread pool.
+ */
+ protected class SocketOptionsProcessor implements Runnable {
+
+ protected SocketChannel sc = null;
+
+ public SocketOptionsProcessor(SocketChannel socket) {
+ this.sc = socket;
+ }
+
+ public void run() {
+ if ( !setSocketOptions(sc) ) {
+ try {
+ sc.socket().close();
+ sc.close();
+ }catch ( IOException ix ) {
+ if ( log.isDebugEnabled() ) log.debug("",ix);
+ }
+ }
+ }
+ }
// ---------------------------------------------- SocketProcessor Inner Class
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=430097&r1=430096&r2=430097&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Wed Aug 9 10:12:37 2006
@@ -31,28 +31,31 @@
public SecureNioChannel(SocketChannel channel, SSLEngine engine,
ApplicationBufferHandler bufHandler) throws IOException {
super(channel,bufHandler);
-
this.sslEngine = engine;
-
-
int appBufSize = sslEngine.getSession().getApplicationBufferSize();
int netBufSize = sslEngine.getSession().getPacketBufferSize();
-
+ //allocate network buffers - TODO, add in optional direct non-direct buffers
+ if ( netInBuffer == null ) netInBuffer =
ByteBuffer.allocateDirect(netBufSize);
+ if ( netOutBuffer == null ) netOutBuffer =
ByteBuffer.allocateDirect(netBufSize);
+
//ensure that the application has a large enough read/write buffers
//by doing this, we should not encounter any buffer overflow errors
bufHandler.expand(bufHandler.getReadBuffer(), appBufSize);
bufHandler.expand(bufHandler.getWriteBuffer(), appBufSize);
- //allocate network buffers - TODO, add in optional direct buffers
- this.netInBuffer = ByteBuffer.allocate(netBufSize);
- this.netOutBuffer = ByteBuffer.allocate(netBufSize);
- this.netOutBuffer.position(0);
- this.netOutBuffer.limit(0);
- this.netInBuffer.position(0);
- this.netInBuffer.limit(0);
+ reset();
+ }
+
+ public void reset() throws IOException {
+ super.reset();
+ netOutBuffer.position(0);
+ netOutBuffer.limit(0);
+ netInBuffer.position(0);
+ netInBuffer.limit(0);
//initiate handshake
sslEngine.beginHandshake();
initHandshakeStatus = sslEngine.getHandshakeStatus();
+
}
//===========================================================================================
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]