CAMEL-10283: camel-websocket producer should not use blocking send operations. 
This can cause sendToAll to block while attempting to send to many clients.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/74ffd483
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/74ffd483
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/74ffd483

Branch: refs/heads/master
Commit: 74ffd4831327fe2a5013dd129669d61868ac3c77
Parents: 83f6706
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Sep 4 12:08:24 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Sep 4 12:08:24 2016 +0200

----------------------------------------------------------------------
 .../src/main/docs/websocket-component.adoc      |  2 +-
 .../component/websocket/WebsocketEndpoint.java  |  6 +--
 .../component/websocket/WebsocketProducer.java  | 43 +++++++++++++++-----
 .../websocket/WebsocketSendException.java       | 36 ++++++++++++++++
 .../websocket/WebsocketProducerTest.java        |  5 ++-
 5 files changed, 75 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/main/docs/websocket-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-websocket/src/main/docs/websocket-component.adoc 
b/components/camel-websocket/src/main/docs/websocket-component.adoc
index a473bec..2659ef0 100644
--- a/components/camel-websocket/src/main/docs/websocket-component.adoc
+++ b/components/camel-websocket/src/main/docs/websocket-component.adoc
@@ -86,7 +86,7 @@ The Jetty Websocket component supports 21 endpoint options 
which are listed belo
 | sessionSupport | consumer | false | boolean | Whether to enable session 
support which enables HttpSession for each http request.
 | staticResources | consumer |  | String | Set a resource path for static 
resources (such as .html files etc). The resources can be loaded from classpath 
if you prefix with classpath: otherwise the resources is loaded from file 
system or from JAR files. For example to load from root classpath use 
classpath:. or classpath:WEB-INF/static If not configured (eg null) then no 
static resource is in use.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the 
consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler 
is enabled then this options is not in use. By default the consumer will deal 
with exceptions that will be logged at WARN/ERROR level and ignored.
-| sendTimeout | producer | 60000 | Integer | Timeout in millise when sending 
to a websocket channel. The default timeout is 60000 (60 seconds).
+| sendTimeout | producer | 30000 | Integer | Timeout in millise when sending 
to a websocket channel. The default timeout is 30000 (30 seconds).
 | sendToAll | producer |  | Boolean | To send to all websocket subscribers. 
Can be used to configure on endpoint level instead of having to use the 
WebsocketConstants.SEND_TO_ALL header on the message.
 | bufferSize | advanced | 8192 | Integer | Set the buffer size of the 
websocketServlet which is also the max frame byte size (default 8192)
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default 
exchange pattern when creating an exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
index 0ce9f49..5ac923e 100644
--- 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
+++ 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
@@ -54,8 +54,8 @@ public class WebsocketEndpoint extends DefaultEndpoint {
 
     @UriParam(label = "producer")
     private Boolean sendToAll;
-    @UriParam(label = "producer", defaultValue = "60000")
-    private Integer sendTimeout = 60000;
+    @UriParam(label = "producer", defaultValue = "30000")
+    private Integer sendTimeout = 30000;
     @UriParam(label = "monitoring")
     private boolean enableJmx;
     @UriParam(label = "consumer")
@@ -196,7 +196,7 @@ public class WebsocketEndpoint extends DefaultEndpoint {
 
     /**
      * Timeout in millise when sending to a websocket channel.
-     * The default timeout is 60000 (60 seconds).
+     * The default timeout is 30000 (30 seconds).
      */
     public void setSendTimeout(Integer sendTimeout) {
         this.sendTimeout = sendTimeout;

http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
index 6f50317..80d0618 100644
--- 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
+++ 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
@@ -24,9 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.StopWatch;
@@ -60,13 +58,14 @@ public class WebsocketProducer extends DefaultProducer 
implements WebsocketProdu
                 log.debug("Sending to connection key {} -> {}", connectionKey, 
message);
                 Future<Void> future = sendMessage(websocket, message);
                 if (future != null) {
-                    future.get(endpoint.getSendTimeout(), 
TimeUnit.MILLISECONDS);
-                    if (!future.isDone()) {
-                        throw new ExchangeTimedOutException(exchange, 
endpoint.getSendTimeout(), "Failed to send message to the connection");
+                    int timeout = endpoint.getSendTimeout();
+                    future.get(timeout, TimeUnit.MILLISECONDS);
+                    if (!future.isCancelled() && !future.isDone()) {
+                        throw new WebsocketSendException("Failed to send 
message to the connection within " + timeout + " millis.", exchange);
                     }
                 }
             } else {
-                throw new IllegalArgumentException("Failed to send message to 
single connection; connection key not set.");
+                throw new WebsocketSendException("Failed to send message to 
single connection; connection key not set.", exchange);
             }
         }
     }
@@ -75,7 +74,6 @@ public class WebsocketProducer extends DefaultProducer 
implements WebsocketProdu
         return endpoint;
     }
 
-
     @Override
     public void doStart() throws Exception {
         super.doStart();
@@ -108,24 +106,35 @@ public class WebsocketProducer extends DefaultProducer 
implements WebsocketProdu
                 }
             } catch (Exception e) {
                 if (exception == null) {
-                    exception = new CamelExchangeException("Failed to deliver 
message to one or more recipients.", exchange, e);
+                    exception = new WebsocketSendException("Failed to deliver 
message to one or more recipients.", exchange, e);
                 }
             }
         }
 
         // check if they are all done within the timed out period
         StopWatch watch = new StopWatch();
-        while (!futures.isEmpty() && watch.taken() < 
endpoint.getSendTimeout()) {
+        int timeout = endpoint.getSendTimeout();
+        while (!futures.isEmpty() && watch.taken() < timeout) {
             // remove all that are done/cancelled
             for (Future future : futures) {
                 if (future.isDone() || future.isCancelled()) {
                     futures.remove(future);
                 }
+                // if there are still more then we need to wait a little bit 
before checking again, to avoid burning cpu cycles in the while loop
+                if (!futures.isEmpty()) {
+                    long interval = Math.min(1000, timeout);
+                    log.debug("Sleeping {} millis waiting for sendToAll to 
complete sending with timeout {} millis", interval, timeout);
+                    try {
+                        Thread.sleep(interval);
+                    } catch (InterruptedException e) {
+                        handleSleepInterruptedException(e, exchange);
+                    }
+                }
             }
-            // TODO sleep a bit until done to avoid burning cpu cycles
+
         }
         if (!futures.isEmpty()) {
-            exception = new CamelExchangeException("Failed to deliver message 
within " + endpoint.getSendTimeout() + " millis to one or more recipients.", 
exchange);
+            exception = new WebsocketSendException("Failed to deliver message 
within " + endpoint.getSendTimeout() + " millis to one or more recipients.", 
exchange);
         }
 
         if (exception != null) {
@@ -152,4 +161,16 @@ public class WebsocketProducer extends DefaultProducer 
implements WebsocketProdu
     public void setStore(WebsocketStore store) {
         this.store = store;
     }
+
+    /**
+     * Called when a sleep is interrupted; allows derived classes to handle 
this case differently
+     */
+    protected void handleSleepInterruptedException(InterruptedException e, 
Exchange exchange) throws InterruptedException {
+        if (log.isDebugEnabled()) {
+            log.debug("Sleep interrupted, are we stopping? {}", isStopping() 
|| isStopped());
+        }
+        Thread.currentThread().interrupt();
+        throw e;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketSendException.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketSendException.java
 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketSendException.java
new file mode 100644
index 0000000..79f401c
--- /dev/null
+++ 
b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketSendException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+
+/**
+ * Exception while sending to a websocket channel.
+ */
+public class WebsocketSendException extends CamelExchangeException {
+
+    private static final long serialVersionUID = 1L;
+
+    public WebsocketSendException(String message, Exchange exchange) {
+        super(message, exchange);
+    }
+
+    public WebsocketSendException(String message, Exchange exchange, Throwable 
cause) {
+        super(message, exchange, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
index 4e362c8..e49b9b4 100644
--- 
a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
+++ 
b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java
@@ -141,7 +141,7 @@ public class WebsocketProducerTest {
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket1, times(1)).getSession();
         inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
-        inOrder.verify(endpoint, times(2)).getSendTimeout();
+        inOrder.verify(endpoint, times(1)).getSendTimeout();
         inOrder.verifyNoMoreInteractions();
     }
 
@@ -171,6 +171,7 @@ public class WebsocketProducerTest {
         inOrder.verify(session, times(1)).isOpen();
         inOrder.verify(defaultWebsocket2, times(1)).getSession();
         inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE);
+        inOrder.verify(endpoint, times(1)).getSendTimeout();
         inOrder.verifyNoMoreInteractions();
     }
 
@@ -185,7 +186,7 @@ public class WebsocketProducerTest {
             websocketProducer.process(exchange);
             fail("Exception expected");
         } catch (Exception e) {
-            assertEquals(IllegalArgumentException.class, e.getClass());
+            assertEquals(WebsocketSendException.class, e.getClass());
             assertNotNull(e.getMessage());
             assertNull(e.getCause());
         }

Reply via email to