This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/main by this push: new 36e8fd9033 Refactor to reduce pinning in HTTP/2 code when using virtual threads 36e8fd9033 is described below commit 36e8fd9033ccc26e63667e6a75a6b213df9cfe9c Author: Mark Thomas <ma...@apache.org> AuthorDate: Wed Jul 26 12:36:15 2023 +0100 Refactor to reduce pinning in HTTP/2 code when using virtual threads --- java/org/apache/coyote/http2/Stream.java | 244 +++++++++++++++++-------------- 1 file changed, 136 insertions(+), 108 deletions(-) diff --git a/java/org/apache/coyote/http2/Stream.java b/java/org/apache/coyote/http2/Stream.java index ffd8e47a8d..d3fc2a7b4c 100644 --- a/java/org/apache/coyote/http2/Stream.java +++ b/java/org/apache/coyote/http2/Stream.java @@ -25,6 +25,8 @@ import java.util.HashSet; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.apache.coyote.ActionCode; @@ -870,6 +872,7 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter { class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink { + private final Lock writeLock = new ReentrantLock(); private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024); // Flag that indicates that data was left over on a previous @@ -888,125 +891,145 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter { */ @Override - public final synchronized int doWrite(ByteBuffer chunk) throws IOException { - if (closed) { - throw new IOException(sm.getString("stream.closed", getConnectionId(), getIdAsString())); - } - // chunk is always fully written - int result = chunk.remaining(); - if (writeBuffer.isEmpty()) { - int chunkLimit = chunk.limit(); - while (chunk.remaining() > 0) { - int thisTime = Math.min(buffer.remaining(), chunk.remaining()); - chunk.limit(chunk.position() + 
thisTime); - buffer.put(chunk); - chunk.limit(chunkLimit); - if (chunk.remaining() > 0 && !buffer.hasRemaining()) { - // Only flush if we have more data to write and the buffer - // is full - if (flush(true, coyoteResponse.getWriteListener() == null)) { - writeBuffer.add(chunk); - dataLeft = true; - break; + public final int doWrite(ByteBuffer chunk) throws IOException { + writeLock.lock(); + try { + if (closed) { + throw new IOException(sm.getString("stream.closed", getConnectionId(), getIdAsString())); + } + // chunk is always fully written + int result = chunk.remaining(); + if (writeBuffer.isEmpty()) { + int chunkLimit = chunk.limit(); + while (chunk.remaining() > 0) { + int thisTime = Math.min(buffer.remaining(), chunk.remaining()); + chunk.limit(chunk.position() + thisTime); + buffer.put(chunk); + chunk.limit(chunkLimit); + if (chunk.remaining() > 0 && !buffer.hasRemaining()) { + // Only flush if we have more data to write and the buffer + // is full + if (flush(true, coyoteResponse.getWriteListener() == null)) { + writeBuffer.add(chunk); + dataLeft = true; + break; + } } } + } else { + writeBuffer.add(chunk); } - } else { - writeBuffer.add(chunk); + written += result; + return result; + } finally { + writeLock.unlock(); } - written += result; - return result; } - final synchronized boolean flush(boolean block) throws IOException { - /* - * Need to ensure that there is exactly one call to flush even when there is no data to write. Too few calls - * (i.e. zero) and the end of stream message is not sent for a completed asynchronous write. Too many calls - * and the end of stream message is sent too soon and trailer headers are not sent. 
- */ - boolean dataInBuffer = buffer.position() > 0; - boolean flushed = false; - - if (dataInBuffer) { - dataInBuffer = flush(false, block); - flushed = true; - } + final boolean flush(boolean block) throws IOException { + writeLock.lock(); + try { + /* + * Need to ensure that there is exactly one call to flush even when there is no data to write. Too few calls + * (i.e. zero) and the end of stream message is not sent for a completed asynchronous write. Too many calls + * and the end of stream message is sent too soon and trailer headers are not sent. + */ + boolean dataInBuffer = buffer.position() > 0; + boolean flushed = false; + + if (dataInBuffer) { + dataInBuffer = flush(false, block); + flushed = true; + } - if (dataInBuffer) { - dataLeft = true; - } else { - if (writeBuffer.isEmpty()) { - // Both buffer and writeBuffer are empty. - if (flushed) { - dataLeft = false; + if (dataInBuffer) { + dataLeft = true; + } else { + if (writeBuffer.isEmpty()) { + // Both buffer and writeBuffer are empty. + if (flushed) { + dataLeft = false; + } else { + dataLeft = flush(false, block); + } } else { - dataLeft = flush(false, block); + dataLeft = writeBuffer.write(this, block); } - } else { - dataLeft = writeBuffer.write(this, block); } - } - return dataLeft; + return dataLeft; + } finally { + writeLock.unlock(); + } } - private synchronized boolean flush(boolean writeInProgress, boolean block) throws IOException { - if (log.isDebugEnabled()) { - log.debug(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(), getIdAsString(), - Integer.toString(buffer.position()), Boolean.toString(writeInProgress), - Boolean.toString(closed))); - } - if (buffer.position() == 0) { - if (closed && !endOfStreamSent) { - // Handling this special case here is simpler than trying - // to modify the following code to handle it. 
- handler.writeBody(Stream.this, buffer, 0, coyoteResponse.getTrailerFields() == null); + private boolean flush(boolean writeInProgress, boolean block) throws IOException { + writeLock.lock(); + try { + if (log.isDebugEnabled()) { + log.debug(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(), getIdAsString(), + Integer.toString(buffer.position()), Boolean.toString(writeInProgress), + Boolean.toString(closed))); } - // Buffer is empty. Nothing to do. - return false; - } - buffer.flip(); - int left = buffer.remaining(); - while (left > 0) { - if (streamReservation == 0) { - streamReservation = reserveWindowSize(left, block); - if (streamReservation == 0) { - // Must be non-blocking. - // Note: Can't add to the writeBuffer here as the write - // may originate from the writeBuffer. - buffer.compact(); - return true; + if (buffer.position() == 0) { + if (closed && !endOfStreamSent) { + // Handling this special case here is simpler than trying + // to modify the following code to handle it. + handler.writeBody(Stream.this, buffer, 0, coyoteResponse.getTrailerFields() == null); } + // Buffer is empty. Nothing to do. + return false; } - while (streamReservation > 0) { - int connectionReservation = handler.reserveWindowSize(Stream.this, streamReservation, block); - if (connectionReservation == 0) { - // Must be non-blocking. - // Note: Can't add to the writeBuffer here as the write - // may originate from the writeBuffer. - buffer.compact(); - return true; + buffer.flip(); + int left = buffer.remaining(); + while (left > 0) { + if (streamReservation == 0) { + streamReservation = reserveWindowSize(left, block); + if (streamReservation == 0) { + // Must be non-blocking. + // Note: Can't add to the writeBuffer here as the write + // may originate from the writeBuffer. 
+ buffer.compact(); + return true; + } + } + while (streamReservation > 0) { + int connectionReservation = handler.reserveWindowSize(Stream.this, streamReservation, block); + if (connectionReservation == 0) { + // Must be non-blocking. + // Note: Can't add to the writeBuffer here as the write + // may originate from the writeBuffer. + buffer.compact(); + return true; + } + // Do the write + handler.writeBody(Stream.this, buffer, connectionReservation, !writeInProgress && closed && + left == connectionReservation && coyoteResponse.getTrailerFields() == null); + streamReservation -= connectionReservation; + left -= connectionReservation; } - // Do the write - handler.writeBody(Stream.this, buffer, connectionReservation, !writeInProgress && closed && - left == connectionReservation && coyoteResponse.getTrailerFields() == null); - streamReservation -= connectionReservation; - left -= connectionReservation; } + buffer.clear(); + return false; + } finally { + writeLock.unlock(); } - buffer.clear(); - return false; } - final synchronized boolean isReady() { - // Bug 63682 - // Only want to return false if the window size is zero AND we are - // already waiting for an allocation. - if (getWindowSize() > 0 && allocationManager.isWaitingForStream() || - handler.getWindowSize() > 0 && allocationManager.isWaitingForConnection() || dataLeft) { - return false; - } else { - return true; + final boolean isReady() { + writeLock.lock(); + try { + // Bug 63682 + // Only want to return false if the window size is zero AND we are + // already waiting for an allocation. 
+ if (getWindowSize() > 0 && allocationManager.isWaitingForStream() || + handler.getWindowSize() > 0 && allocationManager.isWaitingForConnection() || dataLeft) { + return false; + } else { + return true; + } + } finally { + writeLock.unlock(); } } @@ -1046,18 +1069,23 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter { } @Override - public synchronized boolean writeFromBuffer(ByteBuffer src, boolean blocking) throws IOException { - int chunkLimit = src.limit(); - while (src.remaining() > 0) { - int thisTime = Math.min(buffer.remaining(), src.remaining()); - src.limit(src.position() + thisTime); - buffer.put(src); - src.limit(chunkLimit); - if (flush(false, blocking)) { - return true; + public boolean writeFromBuffer(ByteBuffer src, boolean blocking) throws IOException { + writeLock.lock(); + try { + int chunkLimit = src.limit(); + while (src.remaining() > 0) { + int thisTime = Math.min(buffer.remaining(), src.remaining()); + src.limit(src.position() + thisTime); + buffer.put(src); + src.limit(chunkLimit); + if (flush(false, blocking)) { + return true; + } } + return false; + } finally { + writeLock.unlock(); } - return false; } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org