Author: kpreisser Date: Tue Oct 22 23:32:26 2013 New Revision: 1534848 URL: http://svn.apache.org/r1534848 Log: Merged revision(s) 1534846 from tomcat/trunk: - Prevent recursive invocation of Runnables by Room.invokeAndWait() to prevent errors like ConcurrentModificationException when Room.broadcastRoomMessage() iterates over an ArrayList and then calls Room.invokeAndWait() recursively, iterating again over the array. - Add comment about blocking Session.close() method.
Modified: tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java Modified: tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java?rev=1534848&r1=1534847&r2=1534848&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java (original) +++ tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Client.java Tue Oct 22 23:32:26 2013 @@ -105,6 +105,17 @@ public class Client { CloseCodes.VIOLATED_POLICY, "Send Buffer exceeded"); try { + // TODO: close() may block if the remote endpoint doesn't read the data + // (eventually there will be a TimeoutException). However, this method + // (sendMessage) is intended to run asynchronous code and shouldn't + // block. Otherwise it would temporarily stop processing of messages + // from other clients. + // Maybe call this method on another thread. + // Note that when this method is called, the RemoteEndpoint.Async + // is still in the process of sending data, so there probably should + // be another way to abort the Websocket connection. + // Ideally, there should be some abort() method that cancels the + // connection immediately... session.close(cr); } catch (IOException e) { // Ignore @@ -188,6 +199,21 @@ public class Client { private final SendHandler sendHandler = new SendHandler() { @Override public void onResult(SendResult result) { + if (!result.isOK()) { + // Message could not be sent. 
In this case, we don't + // set isSendingMessage to false because we must assume the connection + // broke (and onClose will be called), so we don't try to send + // other messages. + // As a precaution, we close the session (e.g. if a send timeout occurred). + // TODO: session.close() blocks, while this handler shouldn't block. + // Ideally, there should be some abort() method that cancels the + // connection immediately... + try { + session.close(); + } catch (IOException ex) { + // Ignore + } + } synchronized (messagesToSend) { if (!messagesToSend.isEmpty()) { Modified: tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java?rev=1534848&r1=1534847&r2=1534848&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java (original) +++ tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/DrawboardEndpoint.java Tue Oct 22 23:32:26 2013 @@ -123,6 +123,11 @@ public final class DrawboardEndpoint ext if (player != null) { // Remove this player from the room. player.removeFromRoom(); + + // Set player to null to prevent NPEs when onMessage events + // are processed (from other threads) after onClose has been + // called from a different thread which closed the Websocket session. 
+ player = null; } } catch (RuntimeException ex) { log.error("Unexpected exception: " + ex.toString(), ex); Modified: tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java?rev=1534848&r1=1534847&r2=1534848&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java (original) +++ tomcat/tc7.0.x/trunk/webapps/examples/WEB-INF/classes/websocket/drawboard/Room.java Tue Oct 22 23:32:26 2013 @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.locks.ReentrantLock; import javax.imageio.ImageIO; @@ -85,9 +86,9 @@ public final class Room { /** - * An object used to synchronize access to this Room. + * The lock used to synchronize access to this Room. */ - private final Object syncObj = new Object(); + private final ReentrantLock roomLock = new ReentrantLock(); /** * Indicates if this room has already been shutdown. @@ -196,7 +197,8 @@ public final class Room { * @param p */ private void internalRemovePlayer(Player p) { - players.remove(p); + boolean removed = players.remove(p); + assert removed; // Broadcast that one player is removed. broadcastRoomMessage(MessageType.PLAYER_CHANGED, "-"); @@ -292,19 +294,63 @@ public final class Room { } } + /** + * A list of cached {@link Runnable}s to prevent recursive invocation of Runnables + * by one thread. This variable is only used by one thread at a time and then + * set to <code>null</code>. + */ + private List<Runnable> cachedRunnables = null; /** * Submits the given Runnable to the Room Executor and waits until it * has been executed. Currently, this simply means that the Runnable - * will be run directly inside of a synchronized() block. 
+ * will be run directly inside of a synchronized() block.<br> + * Note that if a runnable recursively calls invokeAndWait() with another + * runnable on this Room, it will not be executed recursively, but instead + * cached until the original runnable is finished, to keep the behavior of + * using an Executor. * @param task */ public void invokeAndWait(Runnable task) { - synchronized (syncObj) { - if (!closed) { - task.run(); + + // Check if the current thread already holds a lock on this room. + // If yes, then we must not directly execute the Runnable but instead + // cache it until the original invokeAndWait() has finished. + if (roomLock.isHeldByCurrentThread()) { + + if (cachedRunnables == null) { + cachedRunnables = new ArrayList<Runnable>(); + } + cachedRunnables.add(task); + + } else { + + roomLock.lock(); + try { + // Explicitly overwrite value to ensure data consistency in + // current thread + cachedRunnables = null; + + if (!closed) { + task.run(); + } + + // Run the cached runnables. + if (cachedRunnables != null) { + for (int i = 0; i < cachedRunnables.size(); i++) { + if (!closed) { + cachedRunnables.get(i).run(); + } + } + cachedRunnables = null; + } + + } finally { + roomLock.unlock(); } + } + } /** --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org