Author: kpreisser
Date: Tue Oct 22 23:29:14 2013
New Revision: 1534846
URL: http://svn.apache.org/r1534846
Log:
- Prevent recursive invocation of Runnables by Room.invokeAndWait() to avoid
errors like ConcurrentModificationException when Room.broadcastRoomMessage()
iterates over an ArrayList and then calls Room.invokeAndWait() recursively,
iterating again over the same list.
- Add comment about blocking Session.close() method.
Modified:
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java
Modified:
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java?rev=1534846&r1=1534845&r2=1534846&view=diff
==============================================================================
---
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java
(original)
+++
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java
Tue Oct 22 23:29:14 2013
@@ -105,6 +105,17 @@ public class Client {
CloseCodes.VIOLATED_POLICY,
"Send Buffer exceeded");
try {
+ // TODO: close() may block if the remote endpoint
doesn't read the data
+ // (eventually there will be a TimeoutException).
However, this method
+ // (sendMessage) is intended to run asynchronous
code and shouldn't
+ // block. Otherwise it would temporarily stop
processing of messages
+ // from other clients.
+ // Maybe call this method on another thread.
+ // Note that when this method is called, the
RemoteEndpoint.Async
+ // is still in the process of sending data, so
there probably should
+ // be another way to abort the Websocket
connection.
+ // Ideally, there should be some abort() method
that cancels the
+ // connection immediately...
session.close(cr);
} catch (IOException e) {
// Ignore
@@ -184,6 +195,21 @@ public class Client {
private final SendHandler sendHandler = new SendHandler() {
@Override
public void onResult(SendResult result) {
+ if (!result.isOK()) {
+ // Message could not be sent. In this case, we don't
+ // set isSendingMessage to false because we must assume the
connection
+ // broke (and onClose will be called), so we don't try to send
+ // other messages.
+ // As a precaution, we close the session (e.g. if a send
timeout occured).
+ // TODO: session.close() blocks, while this handler shouldn't
block.
+ // Ideally, there should be some abort() method that cancels
the
+ // connection immediately...
+ try {
+ session.close();
+ } catch (IOException ex) {
+ // Ignore
+ }
+ }
synchronized (messagesToSend) {
if (!messagesToSend.isEmpty()) {
Modified:
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java?rev=1534846&r1=1534845&r2=1534846&view=diff
==============================================================================
---
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java
(original)
+++
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java
Tue Oct 22 23:29:14 2013
@@ -123,6 +123,11 @@ public final class DrawboardEndpoint ext
if (player != null) {
// Remove this player from the room.
player.removeFromRoom();
+
+ // Set player to null to prevent NPEs when
onMessage events
+ // are processed (from other threads) after
onClose has been
+ // called from different thread which closed the
Websocket session.
+ player = null;
}
} catch (RuntimeException ex) {
log.error("Unexpected exception: " + ex.toString(),
ex);
Modified:
tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java?rev=1534846&r1=1534845&r2=1534846&view=diff
==============================================================================
--- tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java
(original)
+++ tomcat/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java
Tue Oct 22 23:29:14 2013
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.locks.ReentrantLock;
import javax.imageio.ImageIO;
@@ -85,9 +86,9 @@ public final class Room {
/**
- * An object used to synchronize access to this Room.
+ * The lock used to synchronize access to this Room.
*/
- private final Object syncObj = new Object();
+ private final ReentrantLock roomLock = new ReentrantLock();
/**
* Indicates if this room has already been shutdown.
@@ -196,7 +197,8 @@ public final class Room {
* @param p
*/
private void internalRemovePlayer(Player p) {
- players.remove(p);
+ boolean removed = players.remove(p);
+ assert removed;
// Broadcast that one player is removed.
broadcastRoomMessage(MessageType.PLAYER_CHANGED, "-");
@@ -292,19 +294,63 @@ public final class Room {
}
}
+ /**
+ * A list of cached {@link Runnable}s to prevent recursive invocation of
Runnables
+ * by one thread. This variable is only used by one thread at a time and
then
+ * set to <code>null</code>.
+ */
+ private List<Runnable> cachedRunnables = null;
/**
* Submits the given Runnable to the Room Executor and waits until it
* has been executed. Currently, this simply means that the Runnable
- * will be run directly inside of a synchronized() block.
+ * will be run directly inside of a synchronized() block.<br>
+ * Note that if a runnable recursively calls invokeAndWait() with another
+ * runnable on this Room, it will not be executed recursively, but instead
+ * cached until the original runnable is finished, to keep the behavior of
+ * using a Executor.
* @param task
*/
public void invokeAndWait(Runnable task) {
- synchronized (syncObj) {
- if (!closed) {
- task.run();
+
+ // Check if the current thread already holds a lock on this room.
+ // If yes, then we must not directly execute the Runnable but instead
+ // cache it until the original invokeAndWait() has finished.
+ if (roomLock.isHeldByCurrentThread()) {
+
+ if (cachedRunnables == null) {
+ cachedRunnables = new ArrayList<>();
+ }
+ cachedRunnables.add(task);
+
+ } else {
+
+ roomLock.lock();
+ try {
+ // Explicitely overwrite value to ensure data consistency in
+ // current thread
+ cachedRunnables = null;
+
+ if (!closed) {
+ task.run();
+ }
+
+ // Run the cached runnables.
+ if (cachedRunnables != null) {
+ for (int i = 0; i < cachedRunnables.size(); i++) {
+ if (!closed) {
+ cachedRunnables.get(i).run();
+ }
+ }
+ cachedRunnables = null;
+ }
+
+ } finally {
+ roomLock.unlock();
}
+
}
+
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]