Author: markt
Date: Wed Feb 4 23:18:45 2015
New Revision: 1657442
URL: http://svn.apache.org/r1657442
Log:
Re-work multiple-write registration fix.
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java?rev=1657442&r1=1657441&r2=1657442&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java Wed Feb 4 23:18:45 2015
@@ -99,6 +99,7 @@ public class UpgradeProcessor implements
public final SocketState upgradeDispatch(SocketStatus status) throws
IOException {
if (status == SocketStatus.OPEN_READ) {
upgradeServletInputStream.onDataAvailable();
+ upgradeServletOutputStream.checkWriteDispatch();
} else if (status == SocketStatus.OPEN_WRITE) {
upgradeServletOutputStream.onWritePossible();
} else if (status == SocketStatus.STOP) {
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1657442&r1=1657441&r2=1657442&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Wed Feb 4 23:18:45 2015
@@ -34,9 +34,8 @@ public class UpgradeServletOutputStream
protected final SocketWrapperBase<?> socketWrapper;
// Used to ensure that isReady() and onWritePossible() have a consistent
- // view of buffer and fireListener when determining if the listener should
- // fire.
- private final Object fireListenerLock = new Object();
+ // view of buffer and registered.
+ private final Object registeredLock = new Object();
// Used to ensure that only one thread writes to the socket at a time and
// that buffer is consistently updated with any unwritten data after the
@@ -52,8 +51,13 @@ public class UpgradeServletOutputStream
// Start in blocking-mode
private volatile WriteListener listener = null;
- // Guarded by fireListenerLock
- private volatile boolean fireListener = false;
+ // Guarded by registeredLock
+ private volatile boolean registered = false;
+
+ // Use to track if a dispatch needs to be arranged to trigger the first call
+ // to onWritePossible. If the socket gets registered for write while this is
+ // set then this will be ignored.
+ private volatile boolean writeDispatchRequired = false;
private volatile ClassLoader applicationLoader = null;
@@ -72,20 +76,19 @@ public class UpgradeServletOutputStream
// Make sure isReady() and onWritePossible() have a consistent view of
// fireListener when determining if the listener should fire
- synchronized (fireListenerLock) {
+ synchronized (registeredLock) {
if (flushing) {
// Since flushing is true the socket must already be registered
// for write and multiple registrations will cause problems.
- fireListener = true;
+ registered = true;
return false;
- } else if (fireListener){
- // If the listener is configured to fire then the socket must
- // already be registered for write and multiple registrations
- // will cause problems.
+ } else if (registered){
+ // The socket is already registered for write and multiple
+ // registrations will cause problems.
return false;
} else {
boolean result = socketWrapper.isReadyForWrite();
- fireListener = !result;
+ registered = !result;
return result;
}
}
@@ -104,10 +107,8 @@ public class UpgradeServletOutputStream
}
// Container is responsible for first call to onWritePossible() but only
// need to do this if setting the listener for the first time.
- synchronized (fireListenerLock) {
- fireListener = true;
- }
- socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE);
+ writeDispatchRequired = true;
+
this.listener = listener;
this.applicationLoader =
Thread.currentThread().getContextClassLoader();
}
@@ -199,6 +200,9 @@ public class UpgradeServletOutputStream
return;
}
} else {
+ // This may fill the write buffer in which case the
+ // isReadyForWrite() call below will re-register the socket for
+ // write
flushInternal(false, false);
}
@@ -206,10 +210,12 @@ public class UpgradeServletOutputStream
// of buffer and fireListener when determining if the listener
// should fire
boolean fire = false;
- synchronized (fireListenerLock) {
- if (fireListener && socketWrapper.isReadyForWrite()) {
- fireListener = false;
+ synchronized (registeredLock) {
+ if (socketWrapper.isReadyForWrite()) {
+ registered = false;
fire = true;
+ } else {
+ registered = true;
}
}
@@ -239,4 +245,16 @@ public class UpgradeServletOutputStream
thread.setContextClassLoader(originalClassLoader);
}
}
+
+
+ void checkWriteDispatch() {
+ synchronized (registeredLock) {
+ if (writeDispatchRequired) {
+ writeDispatchRequired = false;
+ if (!registered) {
+ socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE);
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]