This is an automated email from the ASF dual-hosted git repository.
markt pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/9.0.x by this push:
new cf3ffde893 Refactor to remove syncs on SocketWrapper to support Loom
experiments
cf3ffde893 is described below
commit cf3ffde89312ff012ef15d7ae6c785e257115b6f
Author: Mark Thomas <[email protected]>
AuthorDate: Mon May 23 17:27:24 2022 +0100
Refactor to remove syncs on SocketWrapper to support Loom experiments
---
java/org/apache/coyote/AbstractProcessor.java | 7 ++-
.../apache/coyote/http2/Http2UpgradeHandler.java | 73 +++++++++++++++-------
.../tomcat/util/net/SocketProcessorBase.java | 7 ++-
.../apache/tomcat/util/net/SocketWrapperBase.java | 7 +++
webapps/docs/changelog.xml | 5 ++
5 files changed, 76 insertions(+), 23 deletions(-)
diff --git a/java/org/apache/coyote/AbstractProcessor.java
b/java/org/apache/coyote/AbstractProcessor.java
index a1131df94c..a49bc921cf 100644
--- a/java/org/apache/coyote/AbstractProcessor.java
+++ b/java/org/apache/coyote/AbstractProcessor.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
import javax.servlet.RequestDispatcher;
@@ -863,7 +864,9 @@ public abstract class AbstractProcessor extends
AbstractProcessorLight implement
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
Iterator<DispatchType> dispatches = getIteratorAndClearDispatches();
if (socketWrapper != null) {
- synchronized (socketWrapper) {
+ Lock lock = socketWrapper.getLock();
+ lock.lock();
+ try {
/*
* This method is called when non-blocking IO is initiated by
defining a read and/or write listener in a
* non-container thread. It is called once the non-container
thread completes so that the first calls to
@@ -881,6 +884,8 @@ public abstract class AbstractProcessor extends
AbstractProcessorLight implement
DispatchType dispatchType = dispatches.next();
socketWrapper.processSocket(dispatchType.getSocketStatus(), false);
}
+ } finally {
+ lock.unlock();
}
}
}
diff --git a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
index 1a99048395..6aa07c7066 100644
--- a/java/org/apache/coyote/http2/Http2UpgradeHandler.java
+++ b/java/org/apache/coyote/http2/Http2UpgradeHandler.java
@@ -330,12 +330,15 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
try {
switch (status) {
case OPEN_READ:
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
if (socketWrapper.canWrite()) {
// Only send a ping if there is no other data
waiting to be sent.
// Ping manager will ensure they aren't sent too
frequently.
pingManager.sendPing(false);
}
+ } finally {
+ socketWrapper.getLock().unlock();
}
try {
// Disable the connection timeout while frames are
processed
@@ -582,7 +585,8 @@ class Http2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeH
// may see out of order RST frames which may hard to follow if
// the client is unaware the RST frames may be received out of
// order.
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
if (state != null) {
boolean active = state.isActive();
state.sendReset();
@@ -592,6 +596,8 @@ class Http2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeH
}
socketWrapper.write(true, rstFrame, 0, rstFrame.length);
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
@@ -674,7 +680,8 @@ class Http2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeH
byte[] payloadLength = new byte[3];
ByteUtil.setThreeBytes(payloadLength, 0, len);
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
socketWrapper.write(true, payloadLength, 0, payloadLength.length);
socketWrapper.write(true, GOAWAY, 0, GOAWAY.length);
socketWrapper.write(true, fixedPayload, 0, 8);
@@ -682,14 +689,19 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
socketWrapper.write(true, debugMsg, 0, debugMsg.length);
}
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders
mimeHeaders, boolean endOfStream, int payloadSize)
throws IOException {
// This ensures the Stream processing thread has control of the socket.
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream,
payloadSize);
+ } finally {
+ socketWrapper.getLock().unlock();
}
stream.sentHeaders();
if (endOfStream) {
@@ -802,17 +814,18 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
}
if (writable) {
ByteUtil.set31Bits(header, 5, stream.getIdAsInt());
- synchronized (socketWrapper) {
- try {
- socketWrapper.write(true, header, 0, header.length);
- int orgLimit = data.limit();
- data.limit(data.position() + len);
- socketWrapper.write(true, data);
- data.limit(orgLimit);
- socketWrapper.flush(true);
- } catch (IOException ioe) {
- handleAppInitiatedIOException(ioe);
- }
+ socketWrapper.getLock().lock();
+ try {
+ socketWrapper.write(true, header, 0, header.length);
+ int orgLimit = data.limit();
+ data.limit(data.position() + len);
+ socketWrapper.write(true, data);
+ data.limit(orgLimit);
+ socketWrapper.flush(true);
+ } catch (IOException ioe) {
+ handleAppInitiatedIOException(ioe);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
}
@@ -848,7 +861,8 @@ class Http2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeH
log.debug(sm.getString("upgradeHandler.windowUpdateConnection",
getConnectionId(),
Integer.valueOf(increment)));
}
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
// Build window update frame for stream 0
byte[] frame = new byte[13];
ByteUtil.setThreeBytes(frame, 0, 4);
@@ -883,12 +897,15 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
if (needFlush) {
socketWrapper.flush(true);
}
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
protected void processWrites() throws IOException {
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
if (socketWrapper.flush(false)) {
socketWrapper.registerWriteInterest();
} else {
@@ -896,6 +913,8 @@ class Http2UpgradeHandler extends AbstractStream implements
InternalHttpUpgradeH
// Ping manager will ensure they aren't sent too frequently.
pingManager.sendPing(false);
}
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
@@ -1412,10 +1431,13 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
// Synchronized since PUSH_PROMISE frames have to be sent in order.
Once
// the stream has been created we need to ensure that the PUSH_PROMISE
// is sent before the next stream is created for a PUSH_PROMISE.
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
pushStream = createLocalStream(request);
writeHeaders(associatedStream, pushStream.getIdAsInt(),
request.getMimeHeaders(), false,
Constants.DEFAULT_HEADERS_FRAME_SIZE);
+ } finally {
+ socketWrapper.getLock().unlock();
}
pushStream.sentPushPromise();
@@ -1804,9 +1826,12 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
log.warn(sm.getString("upgradeHandler.unexpectedAck",
connectionId, getIdAsString()));
}
} else {
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
socketWrapper.write(true, SETTINGS_ACK, 0,
SETTINGS_ACK.length);
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
}
@@ -1922,7 +1947,8 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
if (force || now - lastPingNanoTime > pingIntervalNano) {
lastPingNanoTime = now;
byte[] payload = new byte[8];
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
int sentSequence = ++sequence;
PingRecord pingRecord = new PingRecord(sentSequence, now);
inflightPings.add(pingRecord);
@@ -1930,6 +1956,8 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
socketWrapper.write(true, PING, 0, PING.length);
socketWrapper.write(true, payload, 0, payload.length);
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
}
@@ -1959,10 +1987,13 @@ class Http2UpgradeHandler extends AbstractStream
implements InternalHttpUpgradeH
} else {
// Client originated ping. Echo it back.
- synchronized (socketWrapper) {
+ socketWrapper.getLock().lock();
+ try {
socketWrapper.write(true, PING_ACK, 0, PING_ACK.length);
socketWrapper.write(true, payload, 0, payload.length);
socketWrapper.flush(true);
+ } finally {
+ socketWrapper.getLock().unlock();
}
}
}
diff --git a/java/org/apache/tomcat/util/net/SocketProcessorBase.java
b/java/org/apache/tomcat/util/net/SocketProcessorBase.java
index 1207ab0e20..138da2e7b1 100644
--- a/java/org/apache/tomcat/util/net/SocketProcessorBase.java
+++ b/java/org/apache/tomcat/util/net/SocketProcessorBase.java
@@ -17,6 +17,7 @@
package org.apache.tomcat.util.net;
import java.util.Objects;
+import java.util.concurrent.locks.Lock;
public abstract class SocketProcessorBase<S> implements Runnable {
@@ -37,7 +38,9 @@ public abstract class SocketProcessorBase<S> implements
Runnable {
@Override
public final void run() {
- synchronized (socketWrapper) {
+ Lock lock = socketWrapper.getLock();
+ lock.lock();
+ try {
// It is possible that processing may be triggered for read and
// write at the same time. The sync above makes sure that
processing
// does not occur in parallel. The test below ensures that if the
@@ -47,6 +50,8 @@ public abstract class SocketProcessorBase<S> implements
Runnable {
return;
}
doRun();
+ } finally {
+ lock.unlock();
}
}
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index fe571d6f9c..5e1491bf24 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -30,6 +30,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
@@ -44,6 +46,7 @@ public abstract class SocketWrapperBase<E> {
private E socket;
private final AbstractEndpoint<E,?> endpoint;
+ private final Lock lock = new ReentrantLock();
protected final AtomicBoolean closed = new AtomicBoolean(false);
@@ -138,6 +141,10 @@ public abstract class SocketWrapperBase<E> {
return endpoint;
}
+ public Lock getLock() {
+ return lock;
+ }
+
public Object getCurrentProcessor() {
return currentProcessor.get();
}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 625a3ceb08..970e81ddd3 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -195,6 +195,11 @@
<code>ERR_HTTP2_SERVER_REFUSED_STREAM</code> for some connections.
(markt)
</fix>
+ <scode>
+ Refactor synchronization blocks locking on <code>SocketWrapper</code>
to
+ use <code>ReentrantLock</code> to support users wishing to experiment
+ with project Loom. (markt)
+ </scode>
</changelog>
</subsection>
<subsection name="Jasper">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]