Author: markt
Date: Thu Nov 20 06:30:08 2014
New Revision: 1640688
URL: http://svn.apache.org/r1640688
Log:
Fix various problems identified with flushing batched messages:
- Flush triggered by disabling batching failed to flip buffer before writing
and also failed to clear the buffer after writing was complete. This resulted
in duplicated and/or corrupted messages.
- The flush triggered by session close was too late since no writes are
permitted once the close process starts. This resulted in an exception being
thrown.
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Thu
Nov 20 06:30:08 2014
@@ -70,7 +70,7 @@ wsRemoteEndpoint.closedOutputStream=This
wsRemoteEndpoint.closedWriter=This method may not be called as the Writer has
been closed
wsRemoteEndpoint.changeType=When sending a fragmented message, all fragments
must be of the same type
wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently
even when using the asynchronous send messages. The client must wait for the
previous message to complete before sending the next.
-wsRemoteEndpoint.flushOnCloseFailed=Flushing batched messages before closing
the session failed
+wsRemoteEndpoint.flushOnCloseFailed=Batched messages still enabled after
session has been closed. Unable to flush remaining batched message.
wsRemoteEndpoint.invalidEncoder=The specified encoder of type [{0}] could not
be instantiated
wsRemoteEndpoint.noEncoder=No encoder specified for object of class [{0}]
wsRemoteEndpoint.wrongState=The remote endpoint was in state [{0}] which is an
invalid state for called method
@@ -88,6 +88,7 @@ wsSession.duplicateHandlerBinary=A binar
wsSession.duplicateHandlerPong=A pong message handler has already been
configured
wsSession.duplicateHandlerText=A text message handler has already been
configured
wsSession.invalidHandlerTypePong=A pong message handler must implement
MessageHandler.Basic
+wsSession.flushFailOnClose=Failed to flush batched messages on session close
wsSession.messageFailed=Unable to write the complete message as the WebSocket
connection has been closed
wsSession.sendCloseFail=Failed to send close message to remote endpoint
wsSession.removeHandlerFailed=Unable to remove the handler [{0}] as it was not
registered with this session
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
Thu Nov 20 06:30:08 2014
@@ -302,13 +302,10 @@ public abstract class WsRemoteEndpointIm
boolean doWrite = false;
synchronized (messagePartLock) {
- if (Constants.OPCODE_CLOSE == mp.getOpCode()) {
- try {
- setBatchingAllowed(false);
- } catch (IOException e) {
- log.warn(sm.getString(
- "wsRemoteEndpoint.flushOnCloseFailed"), e);
- }
+ if (Constants.OPCODE_CLOSE == mp.getOpCode() &&
getBatchingAllowed()) {
+ // Should not happen. Too late to send batched messages now
since
+ // the session has been closed. Complain loudly.
+ log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed"));
}
if (messagePartInProgress) {
// When a control message is sent while another message is
being
@@ -382,7 +379,10 @@ public abstract class WsRemoteEndpointIm
if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
nextFragmented = fragmented;
nextText = text;
- doWrite(mp.getEndHandler(), outputBuffer);
+ outputBuffer.flip();
+ SendHandler flushHandler = new OutputBufferFlushSendHandler(
+ outputBuffer, mp.getEndHandler());
+ doWrite(flushHandler, outputBuffer);
return;
}
@@ -836,6 +836,30 @@ public abstract class WsRemoteEndpointIm
}
}
+
+ /**
+ * Ensures that the output buffer is cleared after it has been flushed.
+ */
+ private static class OutputBufferFlushSendHandler implements SendHandler {
+
+ private final ByteBuffer outputBuffer;
+ private final SendHandler handler;
+
+ public OutputBufferFlushSendHandler(ByteBuffer outputBuffer,
SendHandler handler) {
+ this.outputBuffer = outputBuffer;
+ this.handler = handler;
+ }
+
+ @Override
+ public void onResult(SendResult result) {
+ if (result.isOK()) {
+ outputBuffer.clear();
+ }
+ handler.onResult(result);
+ }
+ }
+
+
private class WsOutputStream extends OutputStream {
private final WsRemoteEndpointImplBase endpoint;
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Nov 20
06:30:08 2014
@@ -459,6 +459,13 @@ public class WsSession implements Sessio
return;
}
+ try {
+ wsRemoteEndpoint.setBatchingAllowed(false);
+ } catch (IOException e) {
+ log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+ fireEndpointOnError(e);
+ }
+
state = State.CLOSING;
sendCloseMessage(closeReasonMessage);
@@ -487,6 +494,12 @@ public class WsSession implements Sessio
synchronized (stateLock) {
if (state == State.OPEN) {
+ try {
+ wsRemoteEndpoint.setBatchingAllowed(false);
+ } catch (IOException e) {
+ log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+ fireEndpointOnError(e);
+ }
sendCloseMessage(closeReason);
fireEndpointOnClose(closeReason);
state = State.CLOSED;
@@ -497,7 +510,6 @@ public class WsSession implements Sessio
}
}
-
private void fireEndpointOnClose(CloseReason closeReason) {
// Fire the onClose event
@@ -515,6 +527,21 @@ public class WsSession implements Sessio
}
+
+ private void fireEndpointOnError(Throwable throwable) {
+
+ // Fire the onError event
+ Thread t = Thread.currentThread();
+ ClassLoader cl = t.getContextClassLoader();
+ t.setContextClassLoader(applicationClassLoader);
+ try {
+ localEndpoint.onError(this, throwable);
+ } finally {
+ t.setContextClassLoader(cl);
+ }
+ }
+
+
private void sendCloseMessage(CloseReason closeReason) {
// 125 is maximum size for the payload of a control message
ByteBuffer msg = ByteBuffer.allocate(125);
Modified:
tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
(original)
+++ tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java Thu
Nov 20 06:30:08 2014
@@ -119,10 +119,14 @@ public class TesterFirehoseServer {
for (int i = 0; i < MESSAGE_COUNT; i++) {
remote.sendText(MESSAGE);
+ if (i % (MESSAGE_COUNT * 0.4) == 0) {
+ remote.setBatchingAllowed(false);
+ remote.setBatchingAllowed(true);
+ }
}
- // Ensure remaining messages are flushed
- remote.setBatchingAllowed(false);
+ // Flushing should happen automatically on session close
+ session.close();
}
@OnError
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]