This is an automated email from the ASF dual-hosted git repository.
remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push:
new e813ae0 Remove poller thread count from NIO connector
e813ae0 is described below
commit e813ae0d9329ebf4b95c02043c39c676edb47d3c
Author: remm <[email protected]>
AuthorDate: Mon May 13 14:40:36 2019 +0200
Remove poller thread count from NIO connector
Simplify code when possible. As the poller is set for the connector,
only the NioChannel and NioSocketWrapper have a dynamic association. I
will close PR163.
---
.../apache/coyote/http11/Http11NioProtocol.java | 3 +-
.../tomcat/util/net/NioBlockingSelector.java | 4 +-
java/org/apache/tomcat/util/net/NioChannel.java | 32 ++---
java/org/apache/tomcat/util/net/NioEndpoint.java | 145 ++++++++-------------
webapps/docs/changelog.xml | 6 +-
webapps/docs/config/http.xml | 12 --
6 files changed, 75 insertions(+), 127 deletions(-)
diff --git a/java/org/apache/coyote/http11/Http11NioProtocol.java
b/java/org/apache/coyote/http11/Http11NioProtocol.java
index e79390b..e27bc89 100644
--- a/java/org/apache/coyote/http11/Http11NioProtocol.java
+++ b/java/org/apache/coyote/http11/Http11NioProtocol.java
@@ -47,11 +47,10 @@ public class Http11NioProtocol extends
AbstractHttp11JsseProtocol<NioChannel> {
// -------------------- Pool setup --------------------
public void setPollerThreadCount(int count) {
- ((NioEndpoint)getEndpoint()).setPollerThreadCount(count);
}
public int getPollerThreadCount() {
- return ((NioEndpoint)getEndpoint()).getPollerThreadCount();
+ return 1;
}
public void setSelectorTimeout(long timeout) {
diff --git a/java/org/apache/tomcat/util/net/NioBlockingSelector.java
b/java/org/apache/tomcat/util/net/NioBlockingSelector.java
index d723c7a..eb8d511 100644
--- a/java/org/apache/tomcat/util/net/NioBlockingSelector.java
+++ b/java/org/apache/tomcat/util/net/NioBlockingSelector.java
@@ -82,7 +82,7 @@ public class NioBlockingSelector {
*/
public int write(ByteBuffer buf, NioChannel socket, long writeTimeout)
throws IOException {
- SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ SelectionKey key =
socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
if (key == null) {
throw new
IOException(sm.getString("nioBlockingSelector.keyNotRegistered"));
}
@@ -158,7 +158,7 @@ public class NioBlockingSelector {
* @throws IOException if an IO Exception occurs in the underlying socket
logic
*/
public int read(ByteBuffer buf, NioChannel socket, long readTimeout)
throws IOException {
- SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ SelectionKey key =
socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
if (key == null) {
throw new
IOException(sm.getString("nioBlockingSelector.keyNotRegistered"));
}
diff --git a/java/org/apache/tomcat/util/net/NioChannel.java
b/java/org/apache/tomcat/util/net/NioChannel.java
index 4bc865c..01222e6 100644
--- a/java/org/apache/tomcat/util/net/NioChannel.java
+++ b/java/org/apache/tomcat/util/net/NioChannel.java
@@ -21,11 +21,10 @@ import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
-import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
-import org.apache.tomcat.util.net.NioEndpoint.Poller;
+import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper;
import org.apache.tomcat.util.res.StringManager;
/**
@@ -42,12 +41,10 @@ public class NioChannel implements ByteChannel,
ScatteringByteChannel, Gathering
protected static final ByteBuffer emptyBuf = ByteBuffer.allocate(0);
protected SocketChannel sc = null;
- protected SocketWrapperBase<NioChannel> socketWrapper = null;
+ protected NioSocketWrapper socketWrapper = null;
protected final SocketBufferHandler bufHandler;
- protected Poller poller;
-
public NioChannel(SocketChannel channel, SocketBufferHandler bufHandler) {
this.sc = channel;
this.bufHandler = bufHandler;
@@ -63,11 +60,18 @@ public class NioChannel implements ByteChannel,
ScatteringByteChannel, Gathering
}
- void setSocketWrapper(SocketWrapperBase<NioChannel> socketWrapper) {
+ void setSocketWrapper(NioSocketWrapper socketWrapper) {
this.socketWrapper = socketWrapper;
}
/**
+ * @return the socketWrapper
+ */
+ NioSocketWrapper getSocketWrapper() {
+ return socketWrapper;
+ }
+
+ /**
* Free the channel memory
*/
public void free() {
@@ -172,22 +176,10 @@ public class NioChannel implements ByteChannel,
ScatteringByteChannel, Gathering
return sc.read(dsts, offset, length);
}
- public Object getAttachment() {
- Poller pol = getPoller();
- Selector sel = pol!=null?pol.getSelector():null;
- SelectionKey key = sel!=null?getIOChannel().keyFor(sel):null;
- Object att = key!=null?key.attachment():null;
- return att;
- }
-
public SocketBufferHandler getBufHandler() {
return bufHandler;
}
- public Poller getPoller() {
- return poller;
- }
-
public SocketChannel getIOChannel() {
return sc;
}
@@ -213,10 +205,6 @@ public class NioChannel implements ByteChannel,
ScatteringByteChannel, Gathering
return 0;
}
- public void setPoller(Poller poller) {
- this.poller = poller;
- }
-
public void setIOChannel(SocketChannel IOChannel) {
this.sc = IOChannel;
}
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java
b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 1dac62f..0831273 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -44,7 +44,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLEngine;
@@ -147,34 +146,32 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
/**
- * Poller thread count.
+ * NO-OP.
+ *
+ * @param pollerThreadCount Unused
+ *
+ * @deprecated Will be removed in Tomcat 10.
+ */
+ @Deprecated
+ public void setPollerThreadCount(int pollerThreadCount) { }
+ /**
+ * Always returns 1.
+ *
+ * @return Always 1.
+ *
+ * @deprecated Will be removed in Tomcat 10.
*/
- private int pollerThreadCount = 1;
- public void setPollerThreadCount(int pollerThreadCount) {
this.pollerThreadCount = pollerThreadCount; }
- public int getPollerThreadCount() { return pollerThreadCount; }
+ @Deprecated
+ public int getPollerThreadCount() { return 1; }
private long selectorTimeout = 1000;
public void setSelectorTimeout(long timeout) { this.selectorTimeout =
timeout;}
public long getSelectorTimeout() { return this.selectorTimeout; }
/**
- * The socket pollers.
+ * The socket poller.
*/
- private Poller[] pollers = null;
- private AtomicInteger pollerRotater = new AtomicInteger(0);
- /**
- * Return an available poller in true round robin fashion.
- *
- * @return The next poller in sequence
- */
- public Poller getPoller0() {
- if (pollerThreadCount == 1) {
- return pollers[0];
- } else {
- int idx = Math.abs(pollerRotater.incrementAndGet()) %
pollers.length;
- return pollers[idx];
- }
- }
+ private Poller poller = null;
public void setSelectorPool(NioSelectorPool selectorPool) {
@@ -200,14 +197,10 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
* for the next request to be received on the socket
*/
public int getKeepAliveCount() {
- if (pollers == null) {
+ if (poller == null) {
return 0;
} else {
- int sum = 0;
- for (int i = 0; i < pollers.length; i++) {
- sum += pollers[i].getKeyCount();
- }
- return sum;
+ return poller.getKeyCount();
}
}
@@ -221,16 +214,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
public void bind() throws Exception {
initServerSocket();
- // Initialize thread count defaults for acceptor, poller
- if (acceptorThreadCount == 0) {
- // FIXME: Doesn't seem to work that well with multiple accept
threads
- acceptorThreadCount = 1;
- }
- if (pollerThreadCount <= 0) {
- //minimum one poller thread
- pollerThreadCount = 1;
- }
- setStopLatch(new CountDownLatch(pollerThreadCount));
+ setStopLatch(new CountDownLatch(1));
// Initialize SSL if needed
initialiseSsl();
@@ -290,15 +274,12 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
initializeConnectionLatch();
- // Start poller threads
- pollers = new Poller[getPollerThreadCount()];
- for (int i = 0; i < pollers.length; i++) {
- pollers[i] = new Poller();
- Thread pollerThread = new Thread(pollers[i], getName() +
"-ClientPoller-" + i);
- pollerThread.setPriority(threadPriority);
- pollerThread.setDaemon(true);
- pollerThread.start();
- }
+ // Start poller thread
+ poller = new Poller();
+ Thread pollerThread = new Thread(poller, getName() +
"-ClientPoller");
+ pollerThread.setPriority(threadPriority);
+ pollerThread.setDaemon(true);
+ pollerThread.start();
startAcceptorThread();
}
@@ -315,12 +296,9 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
}
if (running) {
running = false;
- for (int i = 0; pollers != null && i < pollers.length; i++) {
- if (pollers[i] == null) {
- continue;
- }
- pollers[i].destroy();
- pollers[i] = null;
+ if (poller != null) {
+ poller.destroy();
+ poller = null;
}
try {
if (!getStopLatch().await(selectorTimeout + 100,
TimeUnit.MILLISECONDS)) {
@@ -433,7 +411,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
channel.setIOChannel(socket);
channel.reset();
}
- getPoller0().register(channel);
+ poller.register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
@@ -495,7 +473,8 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
private void close(NioChannel socket, SelectionKey key) {
try {
- if (socket.getPoller().cancelledKey(key) != null) {
+ Poller poller = this.poller;
+ if (poller != null && poller.cancelledKey(key) != null) {
// SocketWrapper (attachment) was removed from the
// key - recycle the key. This can only happen once
// per attempted closure so it is used to determine
@@ -525,33 +504,30 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
private NioChannel socket;
private int interestOps;
- private NioSocketWrapper socketWrapper;
- public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
- reset(ch, w, intOps);
+ public PollerEvent(NioChannel ch, int intOps) {
+ reset(ch, intOps);
}
- public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
+ public void reset(NioChannel ch, int intOps) {
socket = ch;
interestOps = intOps;
- socketWrapper = w;
}
public void reset() {
- reset(null, null, 0);
+ reset(null, 0);
}
@Override
public void run() {
if (interestOps == OP_REGISTER) {
try {
- socket.getIOChannel().register(
- socket.getPoller().getSelector(),
SelectionKey.OP_READ, socketWrapper);
+
socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(),
SelectionKey.OP_READ, socket.getSocketWrapper());
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
- final SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ final SelectionKey key =
socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
try {
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
@@ -571,12 +547,12 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
- socket.getPoller().cancelledKey(key);
+
socket.getSocketWrapper().getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
- socket.getPoller().cancelledKey(key);
+
socket.getSocketWrapper().getPoller().cancelledKey(key);
} catch (Exception ignore) {}
}
}
@@ -584,7 +560,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
@Override
public String toString() {
- return "Poller event: socket [" + socket + "], socketWrapper [" +
socketWrapper +
+ return "Poller event: socket [" + socket + "], socketWrapper [" +
socket.getSocketWrapper() +
"], interestOps [" + interestOps + "]";
}
}
@@ -638,23 +614,22 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
* of time equal to pollTime (in most cases, latency will be much
lower,
* however).
*
- * @param socket to add to the poller
+ * @param socketWrapper to add to the poller
* @param interestOps Operations for which to register this socket with
* the Poller
*/
- public void add(final NioChannel socket, final int interestOps) {
+ public void add(NioSocketWrapper socketWrapper, int interestOps) {
PollerEvent r = null;
if (eventCache != null) {
r = eventCache.pop();
}
if (r == null) {
- r = new PollerEvent(socket, null, interestOps);
+ r = new PollerEvent(socketWrapper.getSocket(), interestOps);
} else {
- r.reset(socket, null, interestOps);
+ r.reset(socketWrapper.getSocket(), interestOps);
}
addEvent(r);
if (close) {
- NioSocketWrapper socketWrapper = (NioSocketWrapper)
socket.getAttachment();
processSocket(socketWrapper, SocketEvent.STOP, false);
}
}
@@ -691,7 +666,6 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
* @param socket The newly created socket
*/
public void register(final NioChannel socket) {
- socket.setPoller(this);
NioSocketWrapper socketWrapper = new NioSocketWrapper(socket,
NioEndpoint.this);
socket.setSocketWrapper(socketWrapper);
socketWrapper.setPoller(this);
@@ -705,9 +679,9 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
r = eventCache.pop();
}
if (r == null) {
- r = new PollerEvent(socket, socketWrapper, OP_REGISTER);
+ r = new PollerEvent(socket, OP_REGISTER);
} else {
- r.reset(socket, socketWrapper, OP_REGISTER);
+ r.reset(socket, OP_REGISTER);
}
addEvent(r);
}
@@ -943,7 +917,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
log.debug("OP_WRITE for sendfile: " + sd.fileName);
}
if (calledByProcessor) {
- add(socketWrapper.getSocket(), SelectionKey.OP_WRITE);
+ add(socketWrapper, SelectionKey.OP_WRITE);
} else {
reg(sk, socketWrapper, SelectionKey.OP_WRITE);
}
@@ -1290,11 +1264,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
// Ignore
}
try {
- NioSocketWrapper socketWrapper = (NioSocketWrapper)
channel.getAttachment();
- if (socketWrapper == null) {
- throw new
IOException(sm.getString("endpoint.nio.keyMustBeCancelled"));
- }
- nRead = pool.read(to, channel, selector,
socketWrapper.getReadTimeout());
+ nRead = pool.read(to, channel, selector, getReadTimeout());
} finally {
if (selector != null) {
pool.put(selector);
@@ -1350,13 +1320,13 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
@Override
public void registerReadInterest() {
- getPoller().add(getSocket(), SelectionKey.OP_READ);
+ getPoller().add(this, SelectionKey.OP_READ);
}
@Override
public void registerWriteInterest() {
- getPoller().add(getSocket(), SelectionKey.OP_WRITE);
+ getPoller().add(this, SelectionKey.OP_WRITE);
}
@@ -1369,10 +1339,9 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
@Override
public SendfileState processSendfile(SendfileDataBase sendfileData) {
setSendfileData((SendfileData) sendfileData);
- SelectionKey key = getSocket().getIOChannel().keyFor(
- getSocket().getPoller().getSelector());
+ SelectionKey key =
getSocket().getIOChannel().keyFor(getPoller().getSelector());
// Might as well do the first write on this thread
- return getSocket().getPoller().processSendfile(key, this, true);
+ return getPoller().processSendfile(key, this, true);
}
@@ -1810,7 +1779,7 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
@Override
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
- SelectionKey key =
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+ SelectionKey key =
socket.getIOChannel().keyFor(socket.getSocketWrapper().getPoller().getSelector());
try {
int handshake = -1;
@@ -1863,12 +1832,12 @@ public class NioEndpoint extends
AbstractJsseEndpoint<NioChannel,SocketChannel>
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
- socket.getPoller().cancelledKey(key);
+ poller.cancelledKey(key);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error(sm.getString("endpoint.processing.fail"), t);
- socket.getPoller().cancelledKey(key);
+ poller.cancelledKey(key);
} finally {
socketWrapper = null;
event = null;
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 1084c95..f81c36e 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -81,8 +81,12 @@
certain microbenchmarks. (remm)
</fix>
<fix>
- Avoid possible NPEs in on connector stop. (remm)
+ Avoid possible NPEs on connector stop. (remm)
</fix>
+ <update>
+ Remove <code>pollerThreadCount</code> Connector attribute for NIO,
+ one poller thread is sufficient. (remm)
+ </update>
</changelog>
</subsection>
<subsection name="Other">
diff --git a/webapps/docs/config/http.xml b/webapps/docs/config/http.xml
index d028861..ce1b6cf 100644
--- a/webapps/docs/config/http.xml
+++ b/webapps/docs/config/http.xml
@@ -725,18 +725,6 @@
<attributes>
- <attribute name="pollerThreadCount" required="false">
- <p>(int)The number of threads to be used to run for the polling events.
- Default value is <code>1</code>.<br/>
- When accepting a socket, the operating system holds a global lock. So
the benefit of
- going above 2 threads diminishes rapidly. Having more than one thread
is for
- system that need to accept connections very rapidly. However usually
just
- increasing <code>acceptCount</code> will solve that problem.
- Increasing this value may also be beneficial when a large amount of
send file
- operations are going on.
- </p>
- </attribute>
-
<attribute name="pollerThreadPriority" required="false">
<p>(int)The priority of the poller threads.
The default value is <code>5</code> (the value of the
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]