This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/9.0.x by this push:
     new cf3ffde893 Refactor to remove syncs on SocketWrapper to support Loom 
experiments
cf3ffde893 is described below

commit cf3ffde89312ff012ef15d7ae6c785e257115b6f
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Mon May 23 17:27:24 2022 +0100

    Refactor to remove syncs on SocketWrapper to support Loom experiments
---
 java/org/apache/coyote/AbstractProcessor.java      |  7 ++-
 .../apache/coyote/http2/Http2UpgradeHandler.java   | 73 +++++++++++++++-------
 .../tomcat/util/net/SocketProcessorBase.java       |  7 ++-
 .../apache/tomcat/util/net/SocketWrapperBase.java  |  7 +++
 webapps/docs/changelog.xml                         |  5 ++
 5 files changed, 76 insertions(+), 23 deletions(-)

diff --git a/java/org/apache/coyote/AbstractProcessor.java 
b/java/org/apache/coyote/AbstractProcessor.java
index a1131df94c..a49bc921cf 100644
--- a/java/org/apache/coyote/AbstractProcessor.java
+++ b/java/org/apache/coyote/AbstractProcessor.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 
 import javax.servlet.RequestDispatcher;
 
@@ -863,7 +864,9 @@ public abstract class AbstractProcessor extends 
AbstractProcessorLight implement
         SocketWrapperBase<?> socketWrapper = getSocketWrapper();
         Iterator<DispatchType> dispatches = getIteratorAndClearDispatches();
         if (socketWrapper != null) {
-            synchronized (socketWrapper) {
+            Lock lock = socketWrapper.getLock();
+            lock.lock();
+            try {
                 /*
                  * This method is called when non-blocking IO is initiated by 
defining a read and/or write listener in a
                  * non-container thread. It is called once the non-container 
thread completes so that the first calls to
@@ -881,6 +884,8 @@ public abstract class AbstractProcessor extends 
AbstractProcessorLight implement
                     DispatchType dispatchType = dispatches.next();
                     
socketWrapper.processSocket(dispatchType.getSocketStatus(), false);
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java 
b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 1a99048395..6aa07c7066 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -330,12 +330,15 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         try {
             switch (status) {
                 case OPEN_READ:
-                    synchronized (socketWrapper) {
+                    socketWrapper.getLock().lock();
+                    try {
                         if (socketWrapper.canWrite()) {
                             // Only send a ping if there is no other data 
waiting to be sent.
                             // Ping manager will ensure they aren't sent too 
frequently.
                             pingManager.sendPing(false);
                         }
+                    } finally {
+                        socketWrapper.getLock().unlock();
                     }
                     try {
                         // Disable the connection timeout while frames are 
processed
@@ -582,7 +585,8 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
         // may see out of order RST frames which may hard to follow if
         // the client is unaware the RST frames may be received out of
         // order.
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             if (state != null) {
                 boolean active = state.isActive();
                 state.sendReset();
@@ -592,6 +596,8 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
             }
             socketWrapper.write(true, rstFrame, 0, rstFrame.length);
             socketWrapper.flush(true);
+        } finally {
+            socketWrapper.getLock().unlock();
         }
     }
 
@@ -674,7 +680,8 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
         byte[] payloadLength = new byte[3];
         ByteUtil.setThreeBytes(payloadLength, 0, len);
 
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             socketWrapper.write(true, payloadLength, 0, payloadLength.length);
             socketWrapper.write(true, GOAWAY, 0, GOAWAY.length);
             socketWrapper.write(true, fixedPayload, 0, 8);
@@ -682,14 +689,19 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
                 socketWrapper.write(true, debugMsg, 0, debugMsg.length);
             }
             socketWrapper.flush(true);
+        } finally {
+            socketWrapper.getLock().unlock();
         }
     }
 
     void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders 
mimeHeaders, boolean endOfStream, int payloadSize)
             throws IOException {
         // This ensures the Stream processing thread has control of the socket.
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, 
payloadSize);
+        } finally {
+            socketWrapper.getLock().unlock();
         }
         stream.sentHeaders();
         if (endOfStream) {
@@ -802,17 +814,18 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         }
         if (writable) {
             ByteUtil.set31Bits(header, 5, stream.getIdAsInt());
-            synchronized (socketWrapper) {
-                try {
-                    socketWrapper.write(true, header, 0, header.length);
-                    int orgLimit = data.limit();
-                    data.limit(data.position() + len);
-                    socketWrapper.write(true, data);
-                    data.limit(orgLimit);
-                    socketWrapper.flush(true);
-                } catch (IOException ioe) {
-                    handleAppInitiatedIOException(ioe);
-                }
+            socketWrapper.getLock().lock();
+            try {
+                socketWrapper.write(true, header, 0, header.length);
+                int orgLimit = data.limit();
+                data.limit(data.position() + len);
+                socketWrapper.write(true, data);
+                data.limit(orgLimit);
+                socketWrapper.flush(true);
+            } catch (IOException ioe) {
+                handleAppInitiatedIOException(ioe);
+            } finally {
+                socketWrapper.getLock().unlock();
             }
         }
     }
@@ -848,7 +861,8 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
             log.debug(sm.getString("upgradeHandler.windowUpdateConnection", 
getConnectionId(),
                     Integer.valueOf(increment)));
         }
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             // Build window update frame for stream 0
             byte[] frame = new byte[13];
             ByteUtil.setThreeBytes(frame, 0, 4);
@@ -883,12 +897,15 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             if (needFlush) {
                 socketWrapper.flush(true);
             }
+        } finally {
+            socketWrapper.getLock().unlock();
         }
     }
 
 
     protected void processWrites() throws IOException {
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             if (socketWrapper.flush(false)) {
                 socketWrapper.registerWriteInterest();
             } else {
@@ -896,6 +913,8 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
                 // Ping manager will ensure they aren't sent too frequently.
                 pingManager.sendPing(false);
             }
+        } finally {
+            socketWrapper.getLock().unlock();
         }
     }
 
@@ -1412,10 +1431,13 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         // Synchronized since PUSH_PROMISE frames have to be sent in order. 
Once
         // the stream has been created we need to ensure that the PUSH_PROMISE
         // is sent before the next stream is created for a PUSH_PROMISE.
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             pushStream = createLocalStream(request);
             writeHeaders(associatedStream, pushStream.getIdAsInt(), 
request.getMimeHeaders(), false,
                     Constants.DEFAULT_HEADERS_FRAME_SIZE);
+        } finally {
+            socketWrapper.getLock().unlock();
         }
 
         pushStream.sentPushPromise();
@@ -1804,9 +1826,12 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
                 log.warn(sm.getString("upgradeHandler.unexpectedAck", 
connectionId, getIdAsString()));
             }
         } else {
-            synchronized (socketWrapper) {
+            socketWrapper.getLock().lock();
+            try {
                 socketWrapper.write(true, SETTINGS_ACK, 0, 
SETTINGS_ACK.length);
                 socketWrapper.flush(true);
+            } finally {
+                socketWrapper.getLock().unlock();
             }
         }
     }
@@ -1922,7 +1947,8 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             if (force || now - lastPingNanoTime > pingIntervalNano) {
                 lastPingNanoTime = now;
                 byte[] payload = new byte[8];
-                synchronized (socketWrapper) {
+                socketWrapper.getLock().lock();
+                try {
                     int sentSequence = ++sequence;
                     PingRecord pingRecord = new PingRecord(sentSequence, now);
                     inflightPings.add(pingRecord);
@@ -1930,6 +1956,8 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
                     socketWrapper.write(true, PING, 0, PING.length);
                     socketWrapper.write(true, payload, 0, payload.length);
                     socketWrapper.flush(true);
+                } finally {
+                    socketWrapper.getLock().unlock();
                 }
             }
         }
@@ -1959,10 +1987,13 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
 
             } else {
                 // Client originated ping. Echo it back.
-                synchronized (socketWrapper) {
+                socketWrapper.getLock().lock();
+                try {
                     socketWrapper.write(true, PING_ACK, 0, PING_ACK.length);
                     socketWrapper.write(true, payload, 0, payload.length);
                     socketWrapper.flush(true);
+                } finally {
+                    socketWrapper.getLock().unlock();
                 }
             }
         }
diff --git a/java/org/apache/tomcat/util/net/SocketProcessorBase.java 
b/java/org/apache/tomcat/util/net/SocketProcessorBase.java
index 1207ab0e20..138da2e7b1 100644
--- a/java/org/apache/tomcat/util/net/SocketProcessorBase.java
+++ b/java/org/apache/tomcat/util/net/SocketProcessorBase.java
@@ -17,6 +17,7 @@
 package org.apache.tomcat.util.net;
 
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
 
 public abstract class SocketProcessorBase<S> implements Runnable {
 
@@ -37,7 +38,9 @@ public abstract class SocketProcessorBase<S> implements 
Runnable {
 
     @Override
     public final void run() {
-        synchronized (socketWrapper) {
+        Lock lock = socketWrapper.getLock();
+        lock.lock();
+        try {
             // It is possible that processing may be triggered for read and
             // write at the same time. The sync above makes sure that 
processing
             // does not occur in parallel. The test below ensures that if the
@@ -47,6 +50,8 @@ public abstract class SocketProcessorBase<S> implements 
Runnable {
                 return;
             }
             doRun();
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java 
b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index fe571d6f9c..5e1491bf24 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -30,6 +30,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -44,6 +46,7 @@ public abstract class SocketWrapperBase<E> {
 
     private E socket;
     private final AbstractEndpoint<E,?> endpoint;
+    private final Lock lock = new ReentrantLock();
 
     protected final AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -138,6 +141,10 @@ public abstract class SocketWrapperBase<E> {
         return endpoint;
     }
 
+    public Lock getLock() {
+        return lock;
+    }
+
     public Object getCurrentProcessor() {
         return currentProcessor.get();
     }
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 625a3ceb08..970e81ddd3 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -195,6 +195,11 @@
         <code>ERR_HTTP2_SERVER_REFUSED_STREAM</code> for some connections.
         (markt)
       </fix>
+      <scode>
+        Refactor synchronization blocks locking on <code>SocketWrapper</code> 
to
+        use <code>ReentrantLock</code> to support users wishing to experiment
+        with project Loom. (markt)
+      </scode>
     </changelog>
   </subsection>
   <subsection name="Jasper">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to