Repository: camel
Updated Branches:
  refs/heads/master 6f995a7d1 -> 74e75a537


CAMEL-10283: camel-websocket producer should not use blocking send operations. 
This can cause sendToAll to block while attempting to send to many clients.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/83f67061
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/83f67061
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/83f67061

Branch: refs/heads/master
Commit: 83f67061a0cca04e81ee5766ced34bda4e99f443
Parents: 6f995a7
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Sep 4 11:37:00 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Sep 4 11:37:00 2016 +0200

----------------------------------------------------------------------
 .../src/main/docs/websocket-component.adoc      |   3 +-
 .../component/websocket/WebsocketEndpoint.java  |  14 +++
 .../component/websocket/WebsocketProducer.java  |  47 +++++++--
 .../WebsocketProducerRouteExampleTest.java      |   3 +-
 .../websocket/WebsocketProducerTest.java        | 103 ++++++-------------
 5 files changed, 87 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/main/docs/websocket-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/docs/websocket-component.adoc 
b/components/camel-websocket/src/main/docs/websocket-component.adoc
index 9d9cad2..a473bec 100644
--- a/components/camel-websocket/src/main/docs/websocket-component.adoc
+++ b/components/camel-websocket/src/main/docs/websocket-component.adoc
@@ -72,7 +72,7 @@ The Jetty Websocket component supports 12 options which are 
listed below.
 
 
 // endpoint options: START
-The Jetty Websocket component supports 20 endpoint options which are listed 
below:
+The Jetty Websocket component supports 21 endpoint options which are listed 
below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -86,6 +86,7 @@ The Jetty Websocket component supports 20 endpoint options 
which are listed belo
 | sessionSupport | consumer | false | boolean | Whether to enable session 
support which enables HttpSession for each http request.
 | staticResources | consumer |  | String | Set a resource path for static 
resources (such as .html files etc). The resources can be loaded from classpath 
if you prefix with classpath: otherwise the resources is loaded from file 
system or from JAR files. For example to load from root classpath use 
classpath:. or classpath:WEB-INF/static If not configured (eg null) then no 
static resource is in use.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the 
consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler 
is enabled then this options is not in use. By default the consumer will deal 
with exceptions that will be logged at WARN/ERROR level and ignored.
+| sendTimeout | producer | 60000 | Integer | Timeout in millise when sending 
to a websocket channel. The default timeout is 60000 (60 seconds).
 | sendToAll | producer |  | Boolean | To send to all websocket subscribers. 
Can be used to configure on endpoint level instead of having to use the 
WebsocketConstants.SEND_TO_ALL header on the message.
 | bufferSize | advanced | 8192 | Integer | Set the buffer size of the 
websocketServlet which is also the max frame byte size (default 8192)
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default 
exchange pattern when creating an exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
index 11e1eba..0ce9f49 100644
--- 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
+++ 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
@@ -54,6 +54,8 @@ public class WebsocketEndpoint extends DefaultEndpoint {
 
     @UriParam(label = "producer")
     private Boolean sendToAll;
+    @UriParam(label = "producer", defaultValue = "60000")
+    private Integer sendTimeout = 60000;
     @UriParam(label = "monitoring")
     private boolean enableJmx;
     @UriParam(label = "consumer")
@@ -188,6 +190,18 @@ public class WebsocketEndpoint extends DefaultEndpoint {
         this.sendToAll = sendToAll;
     }
 
+    public Integer getSendTimeout() {
+        return sendTimeout;
+    }
+
+    /**
+     * Timeout in millise when sending to a websocket channel.
+     * The default timeout is 60000 (60 seconds).
+     */
+    public void setSendTimeout(Integer sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+
     public String getProtocol() {
         return uri.getScheme();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
index 18753a6..6f50317 100644
--- 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
+++ 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
@@ -19,11 +19,17 @@ package org.apache.camel.component.websocket;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.StopWatch;
 
 public class WebsocketProducer extends DefaultProducer implements 
WebsocketProducerConsumer {
 
@@ -52,9 +58,15 @@ public class WebsocketProducer extends DefaultProducer 
implements WebsocketProdu
             if (connectionKey != null) {
                 DefaultWebsocket websocket = store.get(connectionKey);
                 log.debug("Sending to connection key {} -> {}", connectionKey, 
message);
-                sendMessage(websocket, message);
+                Future<Void> future = sendMessage(websocket, message);
+                if (future != null) {
+                    future.get(endpoint.getSendTimeout(), 
TimeUnit.MILLISECONDS);
+                    if (!future.isDone()) {
+                        throw new ExchangeTimedOutException(exchange, 
endpoint.getSendTimeout(), "Failed to send message to the connection");
+                    }
+                }
             } else {
-                throw new IllegalArgumentException("Failed to send message to 
single connection; connetion key not set.");
+                throw new IllegalArgumentException("Failed to send message to 
single connection; connection key not set.");
             }
         }
     }
@@ -86,31 +98,54 @@ public class WebsocketProducer extends DefaultProducer 
implements WebsocketProdu
         log.debug("Sending to all {}", message);
         Collection<DefaultWebsocket> websockets = store.getAll();
         Exception exception = null;
+
+        List<Future> futures = new CopyOnWriteArrayList<>();
         for (DefaultWebsocket websocket : websockets) {
             try {
-                sendMessage(websocket, message);
+                Future<Void> future = sendMessage(websocket, message);
+                if (future != null) {
+                    futures.add(future);
+                }
             } catch (Exception e) {
                 if (exception == null) {
                     exception = new CamelExchangeException("Failed to deliver 
message to one or more recipients.", exchange, e);
                 }
             }
         }
+
+        // check if they are all done within the timed out period
+        StopWatch watch = new StopWatch();
+        while (!futures.isEmpty() && watch.taken() < 
endpoint.getSendTimeout()) {
+            // remove all that are done/cancelled
+            for (Future future : futures) {
+                if (future.isDone() || future.isCancelled()) {
+                    futures.remove(future);
+                }
+            }
+            // TODO sleep a bit until done to avoid burning cpu cycles
+        }
+        if (!futures.isEmpty()) {
+            exception = new CamelExchangeException("Failed to deliver message 
within " + endpoint.getSendTimeout() + " millis to one or more recipients.", 
exchange);
+        }
+
         if (exception != null) {
             throw exception;
         }
     }
 
-    void sendMessage(DefaultWebsocket websocket, Object message) throws 
IOException {
+    Future<Void> sendMessage(DefaultWebsocket websocket, Object message) 
throws IOException {
+        Future<Void> future = null;
         // in case there is web socket and socket connection is open - send 
message
         if (websocket != null && websocket.getSession().isOpen()) {
             log.trace("Sending to websocket {} -> {}", 
websocket.getConnectionKey(), message);
             if (message instanceof String) {
-                websocket.getSession().getRemote().sendString((String) 
message);
+                future = 
websocket.getSession().getRemote().sendStringByFuture((String) message);
             } else if (message instanceof byte[]) {
                 ByteBuffer buf = ByteBuffer.wrap((byte[]) message);
-                websocket.getSession().getRemote().sendBytes(buf);
+                future = 
websocket.getSession().getRemote().sendBytesByFuture(buf);
             }
         }
+        return future;
     }
 
     //Store is set/unset upon connect/disconnect of the producer

http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
index 5f8026a..2c52cbd 100644
--- 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
+++ 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerRouteExampleTest.java
@@ -61,6 +61,7 @@ public class WebsocketProducerRouteExampleTest extends 
CamelTestSupport {
         WebSocket websocket = c.prepareGet("ws://localhost:" + port + 
"/shop").execute(
             new WebSocketUpgradeHandler.Builder()
                 .addWebSocketListener(new WebSocketTextListener() {
+
                     @Override
                     public void onMessage(String message) {
                         received.add(message);
@@ -68,7 +69,6 @@ public class WebsocketProducerRouteExampleTest extends 
CamelTestSupport {
                         latch.countDown();
                     }
 
-                    
                     @Override
                     public void onOpen(WebSocket websocket) {
                     }
@@ -112,7 +112,6 @@ public class WebsocketProducerRouteExampleTest extends 
CamelTestSupport {
                         latch.countDown();
                     }
 
-                   
                     @Override
                     public void onOpen(WebSocket websocket) {
                     }

http://git-wip-us.apache.org/repos/asf/camel/blob/83f67061/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
index 989ae0b..4e362c8 100644
--- 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
+++ 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
@@ -19,6 +19,9 @@ package org.apache.camel.component.websocket;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -67,8 +70,11 @@ public class WebsocketProducerTest {
     private Message inMessage;
     @Mock
     private RemoteEndpoint remoteEndpoint;
+    @Mock
+    private Future<Void> future;
 
-    private IOException exception = new IOException("BAD NEWS EVERYONE!");
+    private IOException ioException = new IOException("BAD NEWS EVERYONE!");
+    private ExecutionException exception = new ExecutionException("Failure", 
ioException);
     private WebsocketProducer websocketProducer;
     private Collection<DefaultWebsocket> sockets;
 
@@ -101,7 +107,7 @@ public class WebsocketProducerTest {
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
+        inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
         inOrder.verifyNoMoreInteractions();
     }
 
@@ -115,13 +121,14 @@ public class WebsocketProducerTest {
         when(defaultWebsocket1.getSession()).thenReturn(session);
         when(session.isOpen()).thenReturn(true);
         when(session.getRemote()).thenReturn(remoteEndpoint);
-        doThrow(exception).when(remoteEndpoint).sendString(MESSAGE);
+        when(remoteEndpoint.sendStringByFuture(MESSAGE)).thenReturn(future);
+        doThrow(exception).when(future).get(60000, TimeUnit.MILLISECONDS);
 
         try {
             websocketProducer.process(exchange);
             fail("Exception expected");
-        } catch (IOException ioe) {
-            assertEquals(exception, ioe);
+        } catch (Exception e) {
+            // expected
         }
 
         InOrder inOrder = inOrder(endpoint, store, session, defaultWebsocket1, 
defaultWebsocket2, exchange, inMessage, remoteEndpoint);
@@ -133,7 +140,8 @@ public class WebsocketProducerTest {
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
+        inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
+        inOrder.verify(endpoint, times(2)).getSendTimeout();
         inOrder.verifyNoMoreInteractions();
     }
 
@@ -158,46 +166,11 @@ public class WebsocketProducerTest {
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
-        inOrder.verify(defaultWebsocket2, times(1)).getSession();
-        inOrder.verify(session, times(1)).isOpen();
-        inOrder.verify(defaultWebsocket2, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
-        inOrder.verifyNoMoreInteractions();
-    }
-
-    @Test
-    public void testProcessMultipleMessagesWithException() throws Exception {
-        when(exchange.getIn()).thenReturn(inMessage);
-        when(inMessage.getMandatoryBody()).thenReturn(MESSAGE);
-        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, 
Boolean.class)).thenReturn(true);
-        when(store.getAll()).thenReturn(sockets);
-        when(defaultWebsocket1.getSession()).thenReturn(session);
-        when(defaultWebsocket2.getSession()).thenReturn(session);
-        when(session.getRemote()).thenReturn(remoteEndpoint);
-        doThrow(exception).when(remoteEndpoint).sendString(MESSAGE);
-        when(session.isOpen()).thenReturn(true);
-
-        try {
-            websocketProducer.process(exchange);
-            fail("Exception expected");
-        } catch (Exception e) {
-            assertEquals(exception, e.getCause());
-        }
-
-        InOrder inOrder = inOrder(endpoint, store, session, defaultWebsocket1, 
defaultWebsocket2, exchange, inMessage, remoteEndpoint);
-        inOrder.verify(exchange, times(1)).getIn();
-        inOrder.verify(inMessage, times(1)).getMandatoryBody();
-        inOrder.verify(inMessage, 
times(1)).getHeader(WebsocketConstants.SEND_TO_ALL, false, Boolean.class);
-        inOrder.verify(store, times(1)).getAll();
-        inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(session, times(1)).isOpen();
-        inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
+        inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
         inOrder.verify(defaultWebsocket2, times(1)).getSession();
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket2, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
+        inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
         inOrder.verifyNoMoreInteractions();
     }
 
@@ -237,7 +210,7 @@ public class WebsocketProducerTest {
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
+        inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
         inOrder.verifyNoMoreInteractions();
     }
 
@@ -256,28 +229,6 @@ public class WebsocketProducerTest {
     }
 
     @Test
-    public void testSendMessageWithException() throws Exception {
-        when(defaultWebsocket1.getSession()).thenReturn(session);
-        when(session.isOpen()).thenReturn(true);
-        when(session.getRemote()).thenReturn(remoteEndpoint);
-        doThrow(exception).when(remoteEndpoint).sendString(MESSAGE);
-
-        try {
-            websocketProducer.sendMessage(defaultWebsocket1, MESSAGE);
-            fail("Exception expected");
-        } catch (IOException ioe) {
-            assertEquals(exception, ioe);
-        }
-
-        InOrder inOrder = inOrder(endpoint, store, session, defaultWebsocket1, 
defaultWebsocket2, exchange, inMessage, remoteEndpoint);
-        inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(session, times(1)).isOpen();
-        inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
-        inOrder.verifyNoMoreInteractions();
-    }
-
-    @Test
     public void testIsSendToAllSet() {
         when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, 
Boolean.class)).thenReturn(true, false);
         assertTrue(websocketProducer.isSendToAllSet(inMessage));
@@ -311,28 +262,32 @@ public class WebsocketProducerTest {
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
+        inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
         inOrder.verify(defaultWebsocket2, times(1)).getSession();
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket2, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
+        inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
         inOrder.verifyNoMoreInteractions();
     }
 
     @Test
-    public void testSendToAllWithExcpetion() throws Exception {
+    public void testSendToAllWithException() throws Exception {
+        when(exchange.getIn()).thenReturn(inMessage);
+        when(inMessage.getMandatoryBody()).thenReturn(MESSAGE);
+        when(inMessage.getHeader(WebsocketConstants.SEND_TO_ALL, false, 
Boolean.class)).thenReturn(true);
         when(store.getAll()).thenReturn(sockets);
         when(defaultWebsocket1.getSession()).thenReturn(session);
         when(defaultWebsocket2.getSession()).thenReturn(session);
         when(session.getRemote()).thenReturn(remoteEndpoint);
-        doThrow(exception).when(remoteEndpoint).sendString(MESSAGE);
         when(session.isOpen()).thenReturn(true);
+        when(remoteEndpoint.sendStringByFuture(MESSAGE)).thenReturn(future);
+        doThrow(exception).when(future).get(60000, TimeUnit.MILLISECONDS);
 
         try {
-            websocketProducer.sendToAll(store, MESSAGE, exchange);
+            websocketProducer.process(exchange);
             fail("Exception expected");
         } catch (Exception e) {
-            assertEquals(exception, e.getCause());
+            // expected
         }
 
         InOrder inOrder = inOrder(store, session, defaultWebsocket1, 
defaultWebsocket2, remoteEndpoint);
@@ -340,11 +295,11 @@ public class WebsocketProducerTest {
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
+        inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
         inOrder.verify(defaultWebsocket2, times(1)).getSession();
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket2, times(1)).getSession();
-        inOrder.verify(remoteEndpoint, times(1)).sendString(MESSAGE);
+        inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
         inOrder.verifyNoMoreInteractions();
     }
 }

Reply via email to