This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a94801588 Refactor to remove syncs on SocketWrapper to support Loom 
experiments
0a94801588 is described below

commit 0a9480158874ea910a4d629d24f31d69d6cc5f96
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Mon May 23 17:27:24 2022 +0100

    Refactor to remove syncs on SocketWrapper to support Loom experiments
---
 java/org/apache/coyote/AbstractProcessor.java      |  7 ++-
 .../apache/coyote/http2/Http2UpgradeHandler.java   | 73 +++++++++++++++-------
 .../tomcat/util/net/SocketProcessorBase.java       |  7 ++-
 .../apache/tomcat/util/net/SocketWrapperBase.java  |  7 +++
 webapps/docs/changelog.xml                         |  9 +++
 5 files changed, 80 insertions(+), 23 deletions(-)

diff --git a/java/org/apache/coyote/AbstractProcessor.java 
b/java/org/apache/coyote/AbstractProcessor.java
index 699a935eb6..50743ffece 100644
--- a/java/org/apache/coyote/AbstractProcessor.java
+++ b/java/org/apache/coyote/AbstractProcessor.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 
 import jakarta.servlet.RequestDispatcher;
 import jakarta.servlet.ServletConnection;
@@ -865,7 +866,9 @@ public abstract class AbstractProcessor extends 
AbstractProcessorLight implement
         SocketWrapperBase<?> socketWrapper = getSocketWrapper();
         Iterator<DispatchType> dispatches = getIteratorAndClearDispatches();
         if (socketWrapper != null) {
-            synchronized (socketWrapper) {
+            Lock lock = socketWrapper.getLock();
+            lock.lock();
+            try {
                 /*
                  * This method is called when non-blocking IO is initiated by 
defining
                  * a read and/or write listener in a non-container thread. It 
is called
@@ -888,6 +891,8 @@ public abstract class AbstractProcessor extends 
AbstractProcessorLight implement
                     DispatchType dispatchType = dispatches.next();
                     
socketWrapper.processSocket(dispatchType.getSocketStatus(), false);
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java 
b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 64ad129b40..c143149c3c 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -335,12 +335,15 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         try {
             switch(status) {
             case OPEN_READ:
-                synchronized (socketWrapper) {
+                socketWrapper.getLock().lock();
+                try {
                     if (!socketWrapper.canWrite()) {
                         // Only send a ping if there is no other data waiting 
to be sent.
                         // Ping manager will ensure they aren't sent too 
frequently.
                         pingManager.sendPing(false);
                     }
+                } finally {
+                    socketWrapper.getLock().unlock();
                 }
                 try {
                     // There is data to read so use the read timeout while
@@ -567,12 +570,15 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         // may see out of order RST frames which may hard to follow if
         // the client is unaware the RST frames may be received out of
         // order.
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             if (state != null) {
                 state.sendReset();
             }
             socketWrapper.write(true, rstFrame, 0, rstFrame.length);
             socketWrapper.flush(true);
+        } finally {
+            socketWrapper.getLock().unlock();
         }
     }
 
@@ -658,7 +664,8 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
         byte[] payloadLength = new byte[3];
         ByteUtil.setThreeBytes(payloadLength, 0, len);
 
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             socketWrapper.write(true, payloadLength, 0, payloadLength.length);
             socketWrapper.write(true, GOAWAY, 0, GOAWAY.length);
             socketWrapper.write(true, fixedPayload, 0, 8);
@@ -666,14 +673,19 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
                 socketWrapper.write(true, debugMsg, 0, debugMsg.length);
             }
             socketWrapper.flush(true);
+        } finally {
+            socketWrapper.getLock().unlock();
         }
     }
 
     void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders 
mimeHeaders,
             boolean endOfStream, int payloadSize) throws IOException {
         // This ensures the Stream processing thread has control of the socket.
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, 
payloadSize);
+        } finally {
+            socketWrapper.getLock().unlock();
         }
         stream.sentHeaders();
         if (endOfStream) {
@@ -790,17 +802,18 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         }
         if (writable) {
             ByteUtil.set31Bits(header, 5, stream.getIdAsInt());
-            synchronized (socketWrapper) {
-                try {
-                    socketWrapper.write(true, header, 0, header.length);
-                    int orgLimit = data.limit();
-                    data.limit(data.position() + len);
-                    socketWrapper.write(true, data);
-                    data.limit(orgLimit);
-                    socketWrapper.flush(true);
-                } catch (IOException ioe) {
-                    handleAppInitiatedIOException(ioe);
-                }
+            socketWrapper.getLock().lock();
+            try {
+                socketWrapper.write(true, header, 0, header.length);
+                int orgLimit = data.limit();
+                data.limit(data.position() + len);
+                socketWrapper.write(true, data);
+                data.limit(orgLimit);
+                socketWrapper.flush(true);
+            } catch (IOException ioe) {
+                handleAppInitiatedIOException(ioe);
+            } finally {
+                socketWrapper.getLock().unlock();
             }
         }
     }
@@ -832,7 +845,8 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
             log.debug(sm.getString("upgradeHandler.windowUpdateConnection",
                     getConnectionId(), Integer.valueOf(increment)));
         }
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             // Build window update frame for stream 0
             byte[] frame = new byte[13];
             ByteUtil.setThreeBytes(frame, 0,  4);
@@ -867,12 +881,15 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             if (needFlush) {
                 socketWrapper.flush(true);
             }
+        } finally {
+            socketWrapper.getLock().unlock();
         }
     }
 
 
     protected void processWrites() throws IOException {
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             if (socketWrapper.flush(false)) {
                 socketWrapper.registerWriteInterest();
             } else {
@@ -880,6 +897,8 @@ class Http2UpgradeHandler extends AbstractStream implements 
InternalHttpUpgradeH
                 // Ping manager will ensure they aren't sent too frequently.
                 pingManager.sendPing(false);
             }
+        } finally {
+            socketWrapper.getLock().unlock();
         }
     }
 
@@ -1400,10 +1419,13 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
         // Synchronized since PUSH_PROMISE frames have to be sent in order. 
Once
         // the stream has been created we need to ensure that the PUSH_PROMISE
         // is sent before the next stream is created for a PUSH_PROMISE.
-        synchronized (socketWrapper) {
+        socketWrapper.getLock().lock();
+        try {
             pushStream = createLocalStream(request);
             writeHeaders(associatedStream, pushStream.getIdAsInt(), 
request.getMimeHeaders(),
                     false, Constants.DEFAULT_HEADERS_FRAME_SIZE);
+        } finally {
+            socketWrapper.getLock().unlock();
         }
 
         pushStream.sentPushPromise();
@@ -1784,9 +1806,12 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
                         "upgradeHandler.unexpectedAck", connectionId, 
getIdAsString()));
             }
         } else {
-            synchronized (socketWrapper) {
+            socketWrapper.getLock().lock();
+            try {
                 socketWrapper.write(true, SETTINGS_ACK, 0, 
SETTINGS_ACK.length);
                 socketWrapper.flush(true);
+            } finally {
+                socketWrapper.getLock().unlock();
             }
         }
     }
@@ -1911,7 +1936,8 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
             if (force || now - lastPingNanoTime > pingIntervalNano) {
                 lastPingNanoTime = now;
                 byte[] payload = new byte[8];
-                synchronized (socketWrapper) {
+                socketWrapper.getLock().lock();
+                try {
                     int sentSequence = ++sequence;
                     PingRecord pingRecord = new PingRecord(sentSequence, now);
                     inflightPings.add(pingRecord);
@@ -1919,6 +1945,8 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
                     socketWrapper.write(true, PING, 0, PING.length);
                     socketWrapper.write(true, payload, 0, payload.length);
                     socketWrapper.flush(true);
+                } finally {
+                    socketWrapper.getLock().lock();
                 }
             }
         }
@@ -1949,10 +1977,13 @@ class Http2UpgradeHandler extends AbstractStream 
implements InternalHttpUpgradeH
 
             } else {
                 // Client originated ping. Echo it back.
-                synchronized (socketWrapper) {
+                socketWrapper.getLock().lock();
+                try {
                     socketWrapper.write(true, PING_ACK, 0, PING_ACK.length);
                     socketWrapper.write(true, payload, 0, payload.length);
                     socketWrapper.flush(true);
+                } finally {
+                    socketWrapper.getLock().lock();
                 }
             }
         }
diff --git a/java/org/apache/tomcat/util/net/SocketProcessorBase.java 
b/java/org/apache/tomcat/util/net/SocketProcessorBase.java
index 1207ab0e20..138da2e7b1 100644
--- a/java/org/apache/tomcat/util/net/SocketProcessorBase.java
+++ b/java/org/apache/tomcat/util/net/SocketProcessorBase.java
@@ -17,6 +17,7 @@
 package org.apache.tomcat.util.net;
 
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
 
 public abstract class SocketProcessorBase<S> implements Runnable {
 
@@ -37,7 +38,9 @@ public abstract class SocketProcessorBase<S> implements 
Runnable {
 
     @Override
     public final void run() {
-        synchronized (socketWrapper) {
+        Lock lock = socketWrapper.getLock();
+        lock.lock();
+        try {
             // It is possible that processing may be triggered for read and
             // write at the same time. The sync above makes sure that 
processing
             // does not occur in parallel. The test below ensures that if the
@@ -47,6 +50,8 @@ public abstract class SocketProcessorBase<S> implements 
Runnable {
                 return;
             }
             doRun();
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java 
b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index e32525ba0d..e671f632f8 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import jakarta.servlet.ServletConnection;
 
@@ -59,6 +61,7 @@ public abstract class SocketWrapperBase<E> {
 
     private E socket;
     private final AbstractEndpoint<E,?> endpoint;
+    private final Lock lock = new ReentrantLock();
 
     protected final AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -155,6 +158,10 @@ public abstract class SocketWrapperBase<E> {
         return endpoint;
     }
 
+    public Lock getLock() {
+        return lock;
+    }
+
     public Object getCurrentProcessor() {
         return currentProcessor.get();
     }
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 747b252b3c..fd61fd1f1d 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -126,6 +126,15 @@
       </fix>
     </changelog>
   </subsection>
+  <subsection name="Coyote">
+    <changelog>
+      <scode>
+        Refactor synchronization blocks locking on <code>SocketWrapper</code> 
to
+        use <code>ReentrantLock</code> to support users wishing to experiment
+        with project Loom. (markt)
+      </scode>
+    </changelog>
+  </subsection>
   <subsection name="Jasper">
     <changelog>
       <fix>


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to