This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 8.5.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/8.5.x by this push: new 1fb11eb135 Refactor to remove syncs on SocketWrapper to support Loom experiments 1fb11eb135 is described below commit 1fb11eb1353bce578562d583eff6fa51b2e42dcf Author: Mark Thomas <ma...@apache.org> AuthorDate: Mon May 23 17:27:24 2022 +0100 Refactor to remove syncs on SocketWrapper to support Loom experiments --- java/org/apache/coyote/AbstractProcessor.java | 7 ++- .../apache/coyote/http2/Http2UpgradeHandler.java | 73 +++++++++++++++------- java/org/apache/tomcat/util/net/AprEndpoint.java | 6 +- .../tomcat/util/net/SocketProcessorBase.java | 7 ++- .../apache/tomcat/util/net/SocketWrapperBase.java | 7 +++ webapps/docs/changelog.xml | 5 ++ 6 files changed, 81 insertions(+), 24 deletions(-) diff --git a/java/org/apache/coyote/AbstractProcessor.java b/java/org/apache/coyote/AbstractProcessor.java index ec3b067024..2a28d683ed 100644 --- a/java/org/apache/coyote/AbstractProcessor.java +++ b/java/org/apache/coyote/AbstractProcessor.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import javax.servlet.RequestDispatcher; @@ -856,7 +857,9 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement SocketWrapperBase<?> socketWrapper = getSocketWrapper(); Iterator<DispatchType> dispatches = getIteratorAndClearDispatches(); if (socketWrapper != null) { - synchronized (socketWrapper) { + Lock lock = socketWrapper.getLock(); + lock.lock(); + try { /* * This method is called when non-blocking IO is initiated by defining a read and/or write listener in a * non-container thread. It is called once the non-container thread completes so that the first calls to @@ -874,6 +877,8 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement DispatchType dispatchType = dispatches.next(); socketWrapper.processSocket(dispatchType.getSocketStatus(), false); } + } finally { + lock.unlock(); } } } diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java index 17c53583c8..f82e47a388 100644 --- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java +++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java @@ -326,12 +326,15 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH try { switch (status) { case OPEN_READ: - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { if (socketWrapper.canWrite()) { // Only send a ping if there is no other data waiting to be sent. // Ping manager will ensure they aren't sent too frequently. pingManager.sendPing(false); } + } finally { + socketWrapper.getLock().unlock(); } try { // Disable the connection timeout while frames are processed @@ -578,7 +581,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // may see out of order RST frames which may hard to follow if // the client is unaware the RST frames may be received out of // order. - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { if (state != null) { boolean active = state.isActive(); state.sendReset(); @@ -588,6 +592,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH } socketWrapper.write(true, rstFrame, 0, rstFrame.length); socketWrapper.flush(true); + } finally { + socketWrapper.getLock().unlock(); } } @@ -670,7 +676,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH byte[] payloadLength = new byte[3]; ByteUtil.setThreeBytes(payloadLength, 0, len); - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { socketWrapper.write(true, payloadLength, 0, payloadLength.length); socketWrapper.write(true, GOAWAY, 0, GOAWAY.length); socketWrapper.write(true, fixedPayload, 0, 8); @@ -678,14 +685,19 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH socketWrapper.write(true, debugMsg, 0, debugMsg.length); } socketWrapper.flush(true); + } finally { + socketWrapper.getLock().unlock(); } } void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream, int payloadSize) throws IOException { // This ensures the Stream processing thread has control of the socket. - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, payloadSize); + } finally { + socketWrapper.getLock().unlock(); } stream.sentHeaders(); if (endOfStream) { @@ -799,17 +811,18 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH } if (writable) { ByteUtil.set31Bits(header, 5, stream.getIdAsInt()); - synchronized (socketWrapper) { - try { - socketWrapper.write(true, header, 0, header.length); - int orgLimit = data.limit(); - data.limit(data.position() + len); - socketWrapper.write(true, data); - data.limit(orgLimit); - socketWrapper.flush(true); - } catch (IOException ioe) { - handleAppInitiatedIOException(ioe); - } + socketWrapper.getLock().lock(); + try { + socketWrapper.write(true, header, 0, header.length); + int orgLimit = data.limit(); + data.limit(data.position() + len); + socketWrapper.write(true, data); + data.limit(orgLimit); + socketWrapper.flush(true); + } catch (IOException ioe) { + handleAppInitiatedIOException(ioe); + } finally { + socketWrapper.getLock().unlock(); } } } @@ -845,7 +858,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH log.debug(sm.getString("upgradeHandler.windowUpdateConnection", getConnectionId(), Integer.valueOf(increment))); } - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { // Build window update frame for stream 0 byte[] frame = new byte[13]; ByteUtil.setThreeBytes(frame, 0, 4); @@ -880,12 +894,15 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH if (needFlush) { socketWrapper.flush(true); } + } finally { + socketWrapper.getLock().unlock(); } } private void processWrites() throws IOException { - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { if (socketWrapper.flush(false)) { socketWrapper.registerWriteInterest(); } else { @@ -893,6 +910,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Ping manager will ensure they aren't sent too frequently. pingManager.sendPing(false); } + } finally { + socketWrapper.getLock().unlock(); } } @@ -1397,10 +1416,13 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Synchronized since PUSH_PROMISE frames have to be sent in order. Once // the stream has been created we need to ensure that the PUSH_PROMISE // is sent before the next stream is created for a PUSH_PROMISE. - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { pushStream = createLocalStream(request); writeHeaders(associatedStream, pushStream.getIdAsInt(), request.getMimeHeaders(), false, Constants.DEFAULT_HEADERS_FRAME_SIZE); + } finally { + socketWrapper.getLock().unlock(); } pushStream.sentPushPromise(); @@ -1905,9 +1927,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH log.warn(sm.getString("upgradeHandler.unexpectedAck", connectionId, getIdAsString())); } } else { - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { socketWrapper.write(true, SETTINGS_ACK, 0, SETTINGS_ACK.length); socketWrapper.flush(true); + } finally { + socketWrapper.getLock().unlock(); } } } @@ -2023,7 +2048,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH if (force || now - lastPingNanoTime > pingIntervalNano) { lastPingNanoTime = now; byte[] payload = new byte[8]; - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { int sentSequence = ++sequence; PingRecord pingRecord = new PingRecord(sentSequence, now); inflightPings.add(pingRecord); @@ -2031,6 +2057,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH socketWrapper.write(true, PING, 0, PING.length); socketWrapper.write(true, payload, 0, payload.length); socketWrapper.flush(true); + } finally { + socketWrapper.getLock().unlock(); } } } @@ -2060,10 +2088,13 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH } else { // Client originated ping. Echo it back. - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { socketWrapper.write(true, PING_ACK, 0, PING_ACK.length); socketWrapper.write(true, payload, 0, payload.length); socketWrapper.flush(true); + } finally { + socketWrapper.getLock().unlock(); } } } diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java index 588cefdc4c..1d61bee3db 100644 --- a/java/org/apache/tomcat/util/net/AprEndpoint.java +++ b/java/org/apache/tomcat/util/net/AprEndpoint.java @@ -2039,7 +2039,9 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB @Override public void run() { - synchronized (socket) { + Lock lock = socket.getLock(); + lock.lock(); + try { if (!deferAccept) { if (setSocketOptions(socket)) { getPoller().add(socket.getSocket().longValue(), @@ -2067,6 +2069,8 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB socket = null; } } + } finally { + lock.unlock(); } } } diff --git a/java/org/apache/tomcat/util/net/SocketProcessorBase.java b/java/org/apache/tomcat/util/net/SocketProcessorBase.java index 1207ab0e20..138da2e7b1 100644 --- a/java/org/apache/tomcat/util/net/SocketProcessorBase.java +++ b/java/org/apache/tomcat/util/net/SocketProcessorBase.java @@ -17,6 +17,7 @@ package org.apache.tomcat.util.net; import java.util.Objects; +import java.util.concurrent.locks.Lock; public abstract class SocketProcessorBase<S> implements Runnable { @@ -37,7 +38,9 @@ public abstract class SocketProcessorBase<S> implements Runnable { @Override public final void run() { - synchronized (socketWrapper) { + Lock lock = socketWrapper.getLock(); + lock.lock(); + try { // It is possible that processing may be triggered for read and // write at the same time. The sync above makes sure that processing // does not occur in parallel. The test below ensures that if the @@ -47,6 +50,8 @@ public abstract class SocketProcessorBase<S> implements Runnable { return; } doRun(); + } finally { + lock.unlock(); } } diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java index 743ffd3fc7..c8f503185c 100644 --- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java +++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java @@ -30,6 +30,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -44,6 +46,7 @@ public abstract class SocketWrapperBase<E> { private E socket; private final AbstractEndpoint<E,?> endpoint; + private final Lock lock = new ReentrantLock(); protected final AtomicBoolean closed = new AtomicBoolean(false); @@ -138,6 +141,10 @@ public abstract class SocketWrapperBase<E> { return endpoint; } + public Lock getLock() { + return lock; + } + public Object getCurrentProcessor() { return currentProcessor.get(); } diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index cac423a1ef..041e3d7e55 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -195,6 +195,11 @@ <code>ERR_HTTP2_SERVER_REFUSED_STREAM</code> for some connections. (markt) </fix> + <scode> + Refactor synchronization blocks locking on <code>SocketWrapper</code> to + use <code>ReentrantLock</code> to support users wishing to experiment + with project Loom. (markt) + </scode> </changelog> </subsection> <subsection name="Jasper"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org