This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 10.1.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/10.1.x by this push:
     new d48c941249 Refactor to reduce pinning in HTTP/2 code when using 
virtual threads
d48c941249 is described below

commit d48c941249d51dd486735321b84a734b5de06af0
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Wed Jul 26 12:36:15 2023 +0100

    Refactor to reduce pinning in HTTP/2 code when using virtual threads
---
 java/org/apache/coyote/http2/Stream.java | 244 +++++++++++++++++--------------
 1 file changed, 136 insertions(+), 108 deletions(-)

diff --git a/java/org/apache/coyote/http2/Stream.java 
b/java/org/apache/coyote/http2/Stream.java
index 25b10d3391..88056547fa 100644
--- a/java/org/apache/coyote/http2/Stream.java
+++ b/java/org/apache/coyote/http2/Stream.java
@@ -28,6 +28,8 @@ import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 
 import org.apache.coyote.ActionCode;
@@ -913,6 +915,7 @@ class Stream extends AbstractNonZeroStream implements 
HeaderEmitter {
 
     class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink {
 
+        private final Lock writeLock = new ReentrantLock();
         private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
         private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024);
         // Flag that indicates that data was left over on a previous
@@ -931,125 +934,145 @@ class Stream extends AbstractNonZeroStream implements 
HeaderEmitter {
          */
 
         @Override
-        public final synchronized int doWrite(ByteBuffer chunk) throws 
IOException {
-            if (closed) {
-                throw new IOException(sm.getString("stream.closed", 
getConnectionId(), getIdAsString()));
-            }
-            // chunk is always fully written
-            int result = chunk.remaining();
-            if (writeBuffer.isEmpty()) {
-                int chunkLimit = chunk.limit();
-                while (chunk.remaining() > 0) {
-                    int thisTime = Math.min(buffer.remaining(), 
chunk.remaining());
-                    chunk.limit(chunk.position() + thisTime);
-                    buffer.put(chunk);
-                    chunk.limit(chunkLimit);
-                    if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
-                        // Only flush if we have more data to write and the 
buffer
-                        // is full
-                        if (flush(true, coyoteResponse.getWriteListener() == 
null)) {
-                            writeBuffer.add(chunk);
-                            dataLeft = true;
-                            break;
+        public final int doWrite(ByteBuffer chunk) throws IOException {
+            writeLock.lock();
+            try {
+                if (closed) {
+                    throw new IOException(sm.getString("stream.closed", 
getConnectionId(), getIdAsString()));
+                }
+                // chunk is always fully written
+                int result = chunk.remaining();
+                if (writeBuffer.isEmpty()) {
+                    int chunkLimit = chunk.limit();
+                    while (chunk.remaining() > 0) {
+                        int thisTime = Math.min(buffer.remaining(), 
chunk.remaining());
+                        chunk.limit(chunk.position() + thisTime);
+                        buffer.put(chunk);
+                        chunk.limit(chunkLimit);
+                        if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
+                            // Only flush if we have more data to write and 
the buffer
+                            // is full
+                            if (flush(true, coyoteResponse.getWriteListener() 
== null)) {
+                                writeBuffer.add(chunk);
+                                dataLeft = true;
+                                break;
+                            }
                         }
                     }
+                } else {
+                    writeBuffer.add(chunk);
                 }
-            } else {
-                writeBuffer.add(chunk);
+                written += result;
+                return result;
+            } finally {
+                writeLock.unlock();
             }
-            written += result;
-            return result;
         }
 
-        final synchronized boolean flush(boolean block) throws IOException {
-            /*
-             * Need to ensure that there is exactly one call to flush even 
when there is no data to write. Too few calls
-             * (i.e. zero) and the end of stream message is not sent for a 
completed asynchronous write. Too many calls
-             * and the end of stream message is sent too soon and trailer 
headers are not sent.
-             */
-            boolean dataInBuffer = buffer.position() > 0;
-            boolean flushed = false;
-
-            if (dataInBuffer) {
-                dataInBuffer = flush(false, block);
-                flushed = true;
-            }
+        final boolean flush(boolean block) throws IOException {
+            writeLock.lock();
+            try {
+                /*
+                 * Need to ensure that there is exactly one call to flush even 
when there is no data to write. Too few calls
+                 * (i.e. zero) and the end of stream message is not sent for a 
completed asynchronous write. Too many calls
+                 * and the end of stream message is sent too soon and trailer 
headers are not sent.
+                 */
+                boolean dataInBuffer = buffer.position() > 0;
+                boolean flushed = false;
+
+                if (dataInBuffer) {
+                    dataInBuffer = flush(false, block);
+                    flushed = true;
+                }
 
-            if (dataInBuffer) {
-                dataLeft = true;
-            } else {
-                if (writeBuffer.isEmpty()) {
-                    // Both buffer and writeBuffer are empty.
-                    if (flushed) {
-                        dataLeft = false;
+                if (dataInBuffer) {
+                    dataLeft = true;
+                } else {
+                    if (writeBuffer.isEmpty()) {
+                        // Both buffer and writeBuffer are empty.
+                        if (flushed) {
+                            dataLeft = false;
+                        } else {
+                            dataLeft = flush(false, block);
+                        }
                     } else {
-                        dataLeft = flush(false, block);
+                        dataLeft = writeBuffer.write(this, block);
                     }
-                } else {
-                    dataLeft = writeBuffer.write(this, block);
                 }
-            }
 
-            return dataLeft;
+                return dataLeft;
+            } finally {
+                writeLock.unlock();
+            }
         }
 
-        private synchronized boolean flush(boolean writeInProgress, boolean 
block) throws IOException {
-            if (log.isDebugEnabled()) {
-                log.debug(sm.getString("stream.outputBuffer.flush.debug", 
getConnectionId(), getIdAsString(),
-                        Integer.toString(buffer.position()), 
Boolean.toString(writeInProgress),
-                        Boolean.toString(closed)));
-            }
-            if (buffer.position() == 0) {
-                if (closed && !endOfStreamSent) {
-                    // Handling this special case here is simpler than trying
-                    // to modify the following code to handle it.
-                    handler.writeBody(Stream.this, buffer, 0, 
coyoteResponse.getTrailerFields() == null);
+        private boolean flush(boolean writeInProgress, boolean block) throws 
IOException {
+            writeLock.lock();
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString("stream.outputBuffer.flush.debug", 
getConnectionId(), getIdAsString(),
+                            Integer.toString(buffer.position()), 
Boolean.toString(writeInProgress),
+                            Boolean.toString(closed)));
                 }
-                // Buffer is empty. Nothing to do.
-                return false;
-            }
-            buffer.flip();
-            int left = buffer.remaining();
-            while (left > 0) {
-                if (streamReservation == 0) {
-                    streamReservation = reserveWindowSize(left, block);
-                    if (streamReservation == 0) {
-                        // Must be non-blocking.
-                        // Note: Can't add to the writeBuffer here as the write
-                        // may originate from the writeBuffer.
-                        buffer.compact();
-                        return true;
+                if (buffer.position() == 0) {
+                    if (closed && !endOfStreamSent) {
+                        // Handling this special case here is simpler than 
trying
+                        // to modify the following code to handle it.
+                        handler.writeBody(Stream.this, buffer, 0, 
coyoteResponse.getTrailerFields() == null);
                     }
+                    // Buffer is empty. Nothing to do.
+                    return false;
                 }
-                while (streamReservation > 0) {
-                    int connectionReservation = 
handler.reserveWindowSize(Stream.this, streamReservation, block);
-                    if (connectionReservation == 0) {
-                        // Must be non-blocking.
-                        // Note: Can't add to the writeBuffer here as the write
-                        // may originate from the writeBuffer.
-                        buffer.compact();
-                        return true;
+                buffer.flip();
+                int left = buffer.remaining();
+                while (left > 0) {
+                    if (streamReservation == 0) {
+                        streamReservation = reserveWindowSize(left, block);
+                        if (streamReservation == 0) {
+                            // Must be non-blocking.
+                            // Note: Can't add to the writeBuffer here as the 
write
+                            // may originate from the writeBuffer.
+                            buffer.compact();
+                            return true;
+                        }
+                    }
+                    while (streamReservation > 0) {
+                        int connectionReservation = 
handler.reserveWindowSize(Stream.this, streamReservation, block);
+                        if (connectionReservation == 0) {
+                            // Must be non-blocking.
+                            // Note: Can't add to the writeBuffer here as the 
write
+                            // may originate from the writeBuffer.
+                            buffer.compact();
+                            return true;
+                        }
+                        // Do the write
+                        handler.writeBody(Stream.this, buffer, 
connectionReservation, !writeInProgress && closed &&
+                                left == connectionReservation && 
coyoteResponse.getTrailerFields() == null);
+                        streamReservation -= connectionReservation;
+                        left -= connectionReservation;
                     }
-                    // Do the write
-                    handler.writeBody(Stream.this, buffer, 
connectionReservation, !writeInProgress && closed &&
-                            left == connectionReservation && 
coyoteResponse.getTrailerFields() == null);
-                    streamReservation -= connectionReservation;
-                    left -= connectionReservation;
                 }
+                buffer.clear();
+                return false;
+            } finally {
+                writeLock.unlock();
             }
-            buffer.clear();
-            return false;
         }
 
-        final synchronized boolean isReady() {
-            // Bug 63682
-            // Only want to return false if the window size is zero AND we are
-            // already waiting for an allocation.
-            if (getWindowSize() > 0 && allocationManager.isWaitingForStream() 
||
-                    handler.getWindowSize() > 0 && 
allocationManager.isWaitingForConnection() || dataLeft) {
-                return false;
-            } else {
-                return true;
+        final boolean isReady() {
+            writeLock.lock();
+            try {
+                // Bug 63682
+                // Only want to return false if the window size is zero AND we 
are
+                // already waiting for an allocation.
+                if (getWindowSize() > 0 && 
allocationManager.isWaitingForStream() ||
+                        handler.getWindowSize() > 0 && 
allocationManager.isWaitingForConnection() || dataLeft) {
+                    return false;
+                } else {
+                    return true;
+                }
+            } finally {
+                writeLock.unlock();
             }
         }
 
@@ -1089,18 +1112,23 @@ class Stream extends AbstractNonZeroStream implements 
HeaderEmitter {
         }
 
         @Override
-        public synchronized boolean writeFromBuffer(ByteBuffer src, boolean 
blocking) throws IOException {
-            int chunkLimit = src.limit();
-            while (src.remaining() > 0) {
-                int thisTime = Math.min(buffer.remaining(), src.remaining());
-                src.limit(src.position() + thisTime);
-                buffer.put(src);
-                src.limit(chunkLimit);
-                if (flush(false, blocking)) {
-                    return true;
+        public boolean writeFromBuffer(ByteBuffer src, boolean blocking) 
throws IOException {
+            writeLock.lock();
+            try {
+                int chunkLimit = src.limit();
+                while (src.remaining() > 0) {
+                    int thisTime = Math.min(buffer.remaining(), 
src.remaining());
+                    src.limit(src.position() + thisTime);
+                    buffer.put(src);
+                    src.limit(chunkLimit);
+                    if (flush(false, blocking)) {
+                        return true;
+                    }
                 }
+                return false;
+            } finally {
+                writeLock.unlock();
             }
-            return false;
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to