Author: markt Date: Mon Jan 14 10:45:24 2013 New Revision: 1432867 URL: http://svn.apache.org/viewvc?rev=1432867&view=rev Log: Fix various issues highlighted when running the Autobahn test suite on Linux. - Don't register the socket for a read when a write event completes (may lead to thread starvation) - Use a dedicated lock rather than the ServletOutputStream to protect the writing of a WebSocket message - Add a lock to protect the write buffer from concurrent calls to writeInternal() - Add a lock to ensure a consistent view of buffer and fireListener
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1432867&r1=1432866&r2=1432867&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Mon Jan 14 10:45:24 2013 @@ -663,7 +663,14 @@ public abstract class AbstractProtocol i } else if (state == SocketState.UPGRADED) { // Need to keep the connection associated with the processor connections.put(socket, processor); - longPoll(wrapper, processor); + // Don't add sockets back to the poller if this was a + // non-blocking write otherwise the poller may trigger + // multiple read events which may lead to thread starvation + // in the connector. The write() method will add this + // socket to the poller if necessary. + if (status != SocketStatus.OPEN_WRITE) { + longPoll(wrapper, processor); + } } else { // Connection closed. OK to recycle the processor. Upgrade // processors are not recycled. 
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java?rev=1432867&r1=1432866&r2=1432867&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java Mon Jan 14 10:45:24 2013 @@ -28,6 +28,8 @@ public abstract class AbstractServletOut protected static final StringManager sm = StringManager.getManager(Constants.Package); + private final Object fireListenerLock = new Object(); + private final Object nioWriteLock = new Object(); // Start in blocking-mode private volatile WriteListener listener = null; @@ -41,9 +43,13 @@ public abstract class AbstractServletOut sm.getString("upgrade.sos.canWrite.is")); } - boolean result = (buffer == null); - fireListener = !result; - return result; + // Make sure canWrite() and onWritePossible() have a consistent view of + // buffer and fireListener when determining if the listener should fire + synchronized (fireListenerLock) { + boolean result = (buffer == null); + fireListener = !result; + return result; + } } @Override @@ -90,14 +96,21 @@ public abstract class AbstractServletOut doWrite(true, b, off, len); } else { // Non-blocking IO - int written = doWrite(false, b, off, len); - if (written < len) { - // TODO: - Reuse the buffer - // - Only reallocate if it gets too big (>8k?) - buffer = new byte[len - written]; - System.arraycopy(b, off + written, buffer, 0, len - written); - } else { - buffer = null; + // If the non-blocking write does not complete, doWrite() will add + // the socket back into the poller. The poller may trigger a new + // write event before this method has finished updating buffer. 
This + // sync makes sure that buffer is updated before the next write + // executes. + synchronized (nioWriteLock) { + int written = doWrite(false, b, off, len); + if (written < len) { + // TODO: - Reuse the buffer + // - Only reallocate if it gets too big (>8k?) + buffer = new byte[len - written]; + System.arraycopy(b, off + written, buffer, 0, len - written); + } else { + buffer = null; + } } } } @@ -109,9 +122,13 @@ public abstract class AbstractServletOut } catch (IOException ioe) { throw new RuntimeException(ioe); } - if (buffer == null && fireListener) { - fireListener = false; - listener.onWritePossible(); + // Make sure canWrite() and onWritePossible() have a consistent view of + // buffer and fireListener when determining if the listener should fire + synchronized (fireListenerLock) { + if (buffer == null && fireListener) { + fireListener = false; + listener.onWritePossible(); + } } } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java?rev=1432867&r1=1432866&r2=1432867&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Mon Jan 14 10:45:24 2013 @@ -36,6 +36,8 @@ import javax.websocket.SendResult; public class WsRemoteEndpoint implements RemoteEndpoint { + private final Object messageWriteLock = new Object(); + private final ServletOutputStream sos; private final WsSession wsSession; // Max length for outgoing WebSocket frame header is 10 bytes @@ -248,7 +250,9 @@ public class WsRemoteEndpoint implements } header.flip(); - synchronized (sos) { + // Could sync on sos but don't as other (user or container) code may + // sync on this creating the potential for deadlocks. 
+ synchronized (messageWriteLock) { doBlockingWrite(header); doBlockingWrite(message); try { --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org