Author: fhanik
Date: Wed Aug 9 07:44:50 2006
New Revision: 430064
URL: http://svn.apache.org/viewvc?rev=430064&view=rev
Log:
Fixed deadlock issue with thread pool
Fixed error catches for a known JDK bug on Windows (Sun bug #5076772, JDK 1.5)
Added the ability to have more than one poller, although performance
actually gets worse
Next steps: hand off setting socket options etc. to the worker thread for faster
acceptance of new sockets
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
Wed Aug 9 07:44:50 2006
@@ -95,7 +95,6 @@
//readTimeout = -1;
}
inputBuffer = new InternalNioInputBuffer(request,
headerBufferSize,readTimeout);
- inputBuffer.setPoller(endpoint.getPoller());
request.setInputBuffer(inputBuffer);
response = new Response();
@@ -752,7 +751,7 @@
if (request.getAttribute("org.apache.tomcat.comet") == null) {
comet = false;
}
- SelectionKey key =
socket.getIOChannel().keyFor(endpoint.getPoller().getSelector());
+ SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key != null ) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)
key.attachment();
if ( attach!=null ) {
@@ -778,10 +777,10 @@
return SocketState.CLOSED;
} else if (!comet) {
recycle();
- endpoint.getPoller().add(socket);
+ socket.getPoller().add(socket);
return SocketState.OPEN;
} else {
- endpoint.getCometPoller().add(socket);
+ socket.getPoller().add(socket);
return SocketState.LONG;
}
}
@@ -809,7 +808,6 @@
this.socket = socket;
inputBuffer.setSocket(socket);
outputBuffer.setSocket(socket);
- outputBuffer.setSelector(endpoint.getPoller().getSelector());
// Error flag
error = false;
@@ -841,7 +839,7 @@
// and the method should return true
openSocket = true;
// Add the socket to the poller
- endpoint.getPoller().add(socket);
+ socket.getPoller().add(socket);
break;
}
request.setStartTime(System.currentTimeMillis());
@@ -897,7 +895,7 @@
if (request.getAttribute("org.apache.tomcat.comet") !=
null) {
comet = true;
}
- SelectionKey key =
socket.getIOChannel().keyFor(endpoint.getPoller().getSelector());
+ SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if (key != null) {
NioEndpoint.KeyAttachment attach =
(NioEndpoint.KeyAttachment) key.attachment();
if (attach != null) {
@@ -1049,7 +1047,7 @@
comet = false;
cometClose = true;
- SelectionKey key =
socket.getIOChannel().keyFor(endpoint.getPoller().getSelector());
+ SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key != null ) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)
key.attachment();
if ( attach!=null && attach.getComet()) {
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
Wed Aug 9 07:44:50 2006
@@ -223,6 +223,21 @@
// -------------------- Pool setup --------------------
+ public void setPollerThreadCount(int count) {
+ ep.setPollerThreadCount(count);
+ }
+
+ public int getPollerThreadCount() {
+ return ep.getPollerThreadCount();
+ }
+
+ public void setSelectorTimeout(long timeout) {
+ ep.setSelectorTimeout(timeout);
+ }
+
+ public long getSelectorTimeout() {
+ return ep.getSelectorTimeout();
+ }
// *
public Executor getExecutor() {
return ep.getExecutor();
@@ -616,7 +631,7 @@
// processor.
connections.put(socket, processor);
localProcessor.set(null);
- proto.ep.getCometPoller().add(socket);
+ socket.getPoller().add(socket);
}
return state;
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
Wed Aug 9 07:44:50 2006
@@ -182,7 +182,6 @@
* header.
*/
protected long readTimeout;
- private Poller poller;
// ------------------------------------------------------------- Properties
@@ -202,10 +201,6 @@
return socket;
}
- public Poller getPoller() {
- return poller;
- }
-
/**
* Add an input filter to the filter library.
*/
@@ -274,10 +269,6 @@
this.swallowInput = swallowInput;
}
- public void setPoller(Poller poller) {
- this.poller = poller;
- }
-
// --------------------------------------------------------- Public Methods
@@ -564,7 +555,7 @@
timedOut = (readTimeout != -1) &&
((System.currentTimeMillis()-start)>readTimeout);
if ( !timedOut && nRead == 0 ) {
try {
- final SelectionKey key =
socket.getIOChannel().keyFor(poller.getSelector());
+ final SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
final KeyAttachment att =
(KeyAttachment)key.attachment();
//to do, add in a check, we might have just timed out
on the wait,
//so there is no need to register us again.
@@ -587,7 +578,7 @@
private void addToReadQueue(final SelectionKey key, final KeyAttachment
att) {
att.setWakeUp(true);
- poller.addEvent(
+ att.getPoller().addEvent(
new Runnable() {
public void run() {
try {
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Wed Aug 9 07:44:50 2006
@@ -49,8 +49,7 @@
// ----------------------------------------------------------- Constructors
int bbufLimit = 0;
- Selector selector;
-
+
/**
* Default constructor.
*/
@@ -182,10 +181,6 @@
this.socket = socket;
}
- public void setSelector(Selector selector) {
- this.selector = selector;
- }
-
/**
* Get the underlying socket input stream.
*/
@@ -715,7 +710,7 @@
throws IOException {
//prevent timeout for async,
- SelectionKey key = socket.getIOChannel().keyFor(selector);
+ SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if (key != null) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment)
key.attachment();
attach.access();
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Wed
Aug 9 07:44:50 2006
@@ -20,7 +20,9 @@
import java.nio.channels.ByteChannel;
import java.nio.channels.SocketChannel;
+import org.apache.tomcat.util.net.NioEndpoint.Poller;
import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
+
/**
*
* Base class for a SocketChannel wrapper used by the endpoint.
@@ -37,6 +39,8 @@
protected SocketChannel sc = null;
protected ApplicationBufferHandler bufHandler;
+
+ protected Poller poller;
public NioChannel(SocketChannel channel, ApplicationBufferHandler
bufHandler) throws IOException {
this.sc = channel;
@@ -112,6 +116,10 @@
return bufHandler;
}
+ public Poller getPoller() {
+ return poller;
+ }
+
/**
* getIOChannel
*
@@ -146,5 +154,8 @@
return 0;
}
+ public void setPoller(Poller poller) {
+ this.poller = poller;
+ }
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=430064&r1=430063&r2=430064&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed
Aug 9 07:44:50 2006
@@ -42,6 +42,9 @@
import org.apache.tomcat.util.res.StringManager;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* NIO tailored thread pool, providing the following services:
@@ -316,7 +319,7 @@
*/
protected Poller[] pollers = null;
protected int pollerRoundRobin = 0;
- public Poller getPoller() {
+ public Poller getPoller0() {
pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
Poller poller = pollers[pollerRoundRobin];
return poller;
@@ -326,8 +329,8 @@
/**
* The socket poller used for Comet support.
*/
- public Poller getCometPoller() {
- Poller poller = getPoller();
+ public Poller getCometPoller0() {
+ Poller poller = getPoller0();
return poller;
}
@@ -335,13 +338,13 @@
/**
* Dummy maxSpareThreads property.
*/
- public int getMaxSpareThreads() { return 0; }
+ public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
/**
* Dummy minSpareThreads property.
*/
- public int getMinSpareThreads() { return 0; }
+ public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
// -------------------- SSL related properties --------------------
protected String keystoreFile =
System.getProperty("user.home")+"/.keystore";
@@ -470,8 +473,8 @@
// FIXME: Doesn't seem to work that well with multiple accept
threads
acceptorThreadCount = 1;
}
- if (pollerThreadCount != 1) {
- // limit to one poller, no need for others
+ if (pollerThreadCount <= 0) {
+ //minimum one poller thread
pollerThreadCount = 1;
}
@@ -513,10 +516,12 @@
if (!running) {
running = true;
paused = false;
-
+
+
// Create worker collection
if (executor == null) {
workers = new WorkerStack(maxThreads);
+ //executor = new
ThreadPoolExecutor(getMinSpareThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new
LinkedBlockingQueue<Runnable>());
}
// Start acceptor threads
@@ -528,6 +533,7 @@
}
// Start poller threads
+ log.info("Creating poller threads:"+pollerThreadCount);
pollers = new Poller[pollerThreadCount];
for (int i = 0; i < pollerThreadCount; i++) {
pollers[i] = new Poller();
@@ -678,7 +684,8 @@
channel = new NioChannel(socket,bufhandler);
}
- getPoller().register(channel);
+
+ getPoller0().register(channel);
} catch (Throwable t) {
if (log.isDebugEnabled()) {
@@ -746,12 +753,13 @@
while (workerThread == null) {
try {
synchronized (workers) {
- workers.wait();
+ workerThread = createWorkerThread();
+ if ( workerThread == null ) workers.wait();
}
} catch (InterruptedException e) {
// Ignore
}
- workerThread = createWorkerThread();
+ if ( workerThread == null ) workerThread = createWorkerThread();
}
return workerThread;
}
@@ -974,11 +982,13 @@
public void register(final NioChannel socket)
{
+ socket.setPoller(this);
+ final KeyAttachment ka = new KeyAttachment(this);
+ ka.setChannel(socket);
Runnable r = new Runnable() {
public void run() {
try {
- KeyAttachment ka = new KeyAttachment();
- ka.setChannel(socket);
+
socket.getIOChannel().register(selector,
SelectionKey.OP_READ, ka);
} catch (Exception x) {
log.error("", x);
@@ -1027,6 +1037,14 @@
try {
wakeupCounter.set(0);
keyCount = selector.select(selectorTimeout);
+ } catch ( NullPointerException x ) {
+ //sun bug 5076772 on windows JDK 1.5
+ if ( wakeupCounter == null || selector == null ) throw x;
+ continue;
+ } catch ( CancelledKeyException x ) {
+ //sun bug 5076772 on windows JDK 1.5
+ if ( wakeupCounter == null || selector == null ) throw x;
+ continue;
} catch (Throwable x) {
log.error("",x);
continue;
@@ -1045,11 +1063,9 @@
iterator.remove();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
try {
- if ( sk.isValid() ) {
- if(attachment == null) attachment = new
KeyAttachment();
+ if ( sk.isValid() && attachment != null ) {
attachment.access();
sk.attach(attachment);
- int readyOps = sk.readyOps();
sk.interestOps(0);
attachment.interestOps(0);
NioChannel channel = attachment.getChannel();
@@ -1121,7 +1137,12 @@
}
public static class KeyAttachment {
-
+
+ public KeyAttachment(Poller poller) {
+ this.poller = poller;
+ }
+ public Poller getPoller() { return poller;}
+ public void setPoller(Poller poller){this.poller = poller;}
public long getLastAccess() { return lastAccess; }
public void access() { access(System.currentTimeMillis()); }
public void access(long access) { lastAccess = access; }
@@ -1138,6 +1159,7 @@
public void setError(boolean error) { this.error = error; }
public NioChannel getChannel() { return channel;}
public void setChannel(NioChannel channel) { this.channel = channel;}
+ protected Poller poller = null;
protected int interestOps = 0;
public int interestOps() { return interestOps;}
public int interestOps(int ops) { this.interestOps = ops; return ops;
}
@@ -1254,7 +1276,7 @@
NioChannel socket = await();
if (socket == null)
continue;
- SelectionKey key =
socket.getIOChannel().keyFor(getPoller().getSelector());
+ SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
int handshake = -1;
try {
handshake = socket.handshake(key.isReadable(),
key.isWritable());
@@ -1310,7 +1332,7 @@
}
};
- getPoller().addEvent(r);
+ ka.getPoller().addEvent(r);
}
//dereference socket to let GC do its job
socket = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]