This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push: new 0a94801588 Refactor to remove syncs on SocketWrapper to support Loom experiments 0a94801588 is described below commit 0a9480158874ea910a4d629d24f31d69d6cc5f96 Author: Mark Thomas <ma...@apache.org> AuthorDate: Mon May 23 17:27:24 2022 +0100 Refactor to remove syncs on SocketWrapper to support Loom experiments --- java/org/apache/coyote/AbstractProcessor.java | 7 ++- .../apache/coyote/http2/Http2UpgradeHandler.java | 73 +++++++++++++++------- .../tomcat/util/net/SocketProcessorBase.java | 7 ++- .../apache/tomcat/util/net/SocketWrapperBase.java | 7 +++ webapps/docs/changelog.xml | 9 +++ 5 files changed, 80 insertions(+), 23 deletions(-) diff --git a/java/org/apache/coyote/AbstractProcessor.java b/java/org/apache/coyote/AbstractProcessor.java index 699a935eb6..50743ffece 100644 --- a/java/org/apache/coyote/AbstractProcessor.java +++ b/java/org/apache/coyote/AbstractProcessor.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import jakarta.servlet.RequestDispatcher; import jakarta.servlet.ServletConnection; @@ -865,7 +866,9 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement SocketWrapperBase<?> socketWrapper = getSocketWrapper(); Iterator<DispatchType> dispatches = getIteratorAndClearDispatches(); if (socketWrapper != null) { - synchronized (socketWrapper) { + Lock lock = socketWrapper.getLock(); + lock.lock(); + try { /* * This method is called when non-blocking IO is initiated by defining * a read and/or write listener in a non-container thread. It is called @@ -888,6 +891,8 @@ public abstract class AbstractProcessor extends AbstractProcessorLight implement DispatchType dispatchType = dispatches.next(); socketWrapper.processSocket(dispatchType.getSocketStatus(), false); } + } finally { + lock.unlock(); } } } diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java b/java/org/apache/coyote/http2/Http2UpgradeHandler.java index 64ad129b40..c143149c3c 100644 --- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java +++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java @@ -335,12 +335,15 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH try { switch(status) { case OPEN_READ: - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { if (!socketWrapper.canWrite()) { // Only send a ping if there is no other data waiting to be sent. // Ping manager will ensure they aren't sent too frequently. pingManager.sendPing(false); } + } finally { + socketWrapper.getLock().unlock(); } try { // There is data to read so use the read timeout while @@ -567,12 +570,15 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // may see out of order RST frames which may hard to follow if // the client is unaware the RST frames may be received out of // order. - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { if (state != null) { state.sendReset(); } socketWrapper.write(true, rstFrame, 0, rstFrame.length); socketWrapper.flush(true); + } finally { + socketWrapper.getLock().unlock(); } } @@ -658,7 +664,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH byte[] payloadLength = new byte[3]; ByteUtil.setThreeBytes(payloadLength, 0, len); - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { socketWrapper.write(true, payloadLength, 0, payloadLength.length); socketWrapper.write(true, GOAWAY, 0, GOAWAY.length); socketWrapper.write(true, fixedPayload, 0, 8); @@ -666,14 +673,19 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH socketWrapper.write(true, debugMsg, 0, debugMsg.length); } socketWrapper.flush(true); + } finally { + socketWrapper.getLock().unlock(); } } void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream, int payloadSize) throws IOException { // This ensures the Stream processing thread has control of the socket. - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, payloadSize); + } finally { + socketWrapper.getLock().unlock(); } stream.sentHeaders(); if (endOfStream) { @@ -790,17 +802,18 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH } if (writable) { ByteUtil.set31Bits(header, 5, stream.getIdAsInt()); - synchronized (socketWrapper) { - try { - socketWrapper.write(true, header, 0, header.length); - int orgLimit = data.limit(); - data.limit(data.position() + len); - socketWrapper.write(true, data); - data.limit(orgLimit); - socketWrapper.flush(true); - } catch (IOException ioe) { - handleAppInitiatedIOException(ioe); - } + socketWrapper.getLock().lock(); + try { + socketWrapper.write(true, header, 0, header.length); + int orgLimit = data.limit(); + data.limit(data.position() + len); + socketWrapper.write(true, data); + data.limit(orgLimit); + socketWrapper.flush(true); + } catch (IOException ioe) { + handleAppInitiatedIOException(ioe); + } finally { + socketWrapper.getLock().unlock(); } } } @@ -832,7 +845,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH log.debug(sm.getString("upgradeHandler.windowUpdateConnection", getConnectionId(), Integer.valueOf(increment))); } - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { // Build window update frame for stream 0 byte[] frame = new byte[13]; ByteUtil.setThreeBytes(frame, 0, 4); @@ -867,12 +881,15 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH if (needFlush) { socketWrapper.flush(true); } + } finally { + socketWrapper.getLock().unlock(); } } protected void processWrites() throws IOException { - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { if (socketWrapper.flush(false)) { socketWrapper.registerWriteInterest(); } else { @@ -880,6 +897,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Ping manager will ensure they aren't sent too frequently. pingManager.sendPing(false); } + } finally { + socketWrapper.getLock().unlock(); } } @@ -1400,10 +1419,13 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH // Synchronized since PUSH_PROMISE frames have to be sent in order. Once // the stream has been created we need to ensure that the PUSH_PROMISE // is sent before the next stream is created for a PUSH_PROMISE. - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { pushStream = createLocalStream(request); writeHeaders(associatedStream, pushStream.getIdAsInt(), request.getMimeHeaders(), false, Constants.DEFAULT_HEADERS_FRAME_SIZE); + } finally { + socketWrapper.getLock().unlock(); } pushStream.sentPushPromise(); @@ -1784,9 +1806,12 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH "upgradeHandler.unexpectedAck", connectionId, getIdAsString())); } } else { - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { socketWrapper.write(true, SETTINGS_ACK, 0, SETTINGS_ACK.length); socketWrapper.flush(true); + } finally { + socketWrapper.getLock().unlock(); } } } @@ -1911,7 +1936,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH if (force || now - lastPingNanoTime > pingIntervalNano) { lastPingNanoTime = now; byte[] payload = new byte[8]; - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { int sentSequence = ++sequence; PingRecord pingRecord = new PingRecord(sentSequence, now); inflightPings.add(pingRecord); @@ -1919,6 +1945,8 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH socketWrapper.write(true, PING, 0, PING.length); socketWrapper.write(true, payload, 0, payload.length); socketWrapper.flush(true); + } finally { + socketWrapper.getLock().lock(); } } } @@ -1949,10 +1977,13 @@ class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeH } else { // Client originated ping. Echo it back. - synchronized (socketWrapper) { + socketWrapper.getLock().lock(); + try { socketWrapper.write(true, PING_ACK, 0, PING_ACK.length); socketWrapper.write(true, payload, 0, payload.length); socketWrapper.flush(true); + } finally { + socketWrapper.getLock().lock(); } } } diff --git a/java/org/apache/tomcat/util/net/SocketProcessorBase.java b/java/org/apache/tomcat/util/net/SocketProcessorBase.java index 1207ab0e20..138da2e7b1 100644 --- a/java/org/apache/tomcat/util/net/SocketProcessorBase.java +++ b/java/org/apache/tomcat/util/net/SocketProcessorBase.java @@ -17,6 +17,7 @@ package org.apache.tomcat.util.net; import java.util.Objects; +import java.util.concurrent.locks.Lock; public abstract class SocketProcessorBase<S> implements Runnable { @@ -37,7 +38,9 @@ public abstract class SocketProcessorBase<S> implements Runnable { @Override public final void run() { - synchronized (socketWrapper) { + Lock lock = socketWrapper.getLock(); + lock.lock(); + try { // It is possible that processing may be triggered for read and // write at the same time. The sync above makes sure that processing // does not occur in parallel. The test below ensures that if the @@ -47,6 +50,8 @@ public abstract class SocketProcessorBase<S> implements Runnable { return; } doRun(); + } finally { + lock.unlock(); } } diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java index e32525ba0d..e671f632f8 100644 --- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java +++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java @@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import jakarta.servlet.ServletConnection; @@ -59,6 +61,7 @@ public abstract class SocketWrapperBase<E> { private E socket; private final AbstractEndpoint<E,?> endpoint; + private final Lock lock = new ReentrantLock(); protected final AtomicBoolean closed = new AtomicBoolean(false); @@ -155,6 +158,10 @@ public abstract class SocketWrapperBase<E> { return endpoint; } + public Lock getLock() { + return lock; + } + public Object getCurrentProcessor() { return currentProcessor.get(); } diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 747b252b3c..fd61fd1f1d 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -126,6 +126,15 @@ </fix> </changelog> </subsection> + <subsection name="Coyote"> + <changelog> + <scode> + Refactor synchronization blocks locking on <code>SocketWrapper</code> to + use <code>ReentrantLock</code> to support users wishing to experiment + with project Loom. (markt) + </scode> + </changelog> + </subsection> <subsection name="Jasper"> <changelog> <fix> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org