CAMEL-10283: camel-websocket producer should not use blocking send operations, because a blocking send can cause sendToAll to stall while it is attempting to send to many clients.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/74ffd483 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/74ffd483 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/74ffd483 Branch: refs/heads/master Commit: 74ffd4831327fe2a5013dd129669d61868ac3c77 Parents: 83f6706 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Sep 4 12:08:24 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Sep 4 12:08:24 2016 +0200 ---------------------------------------------------------------------- .../src/main/docs/websocket-component.adoc | 2 +- .../component/websocket/WebsocketEndpoint.java | 6 +-- .../component/websocket/WebsocketProducer.java | 43 +++++++++++++++----- .../websocket/WebsocketSendException.java | 36 ++++++++++++++++ .../websocket/WebsocketProducerTest.java | 5 ++- 5 files changed, 75 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/main/docs/websocket-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/docs/websocket-component.adoc b/components/camel-websocket/src/main/docs/websocket-component.adoc index a473bec..2659ef0 100644 --- a/components/camel-websocket/src/main/docs/websocket-component.adoc +++ b/components/camel-websocket/src/main/docs/websocket-component.adoc @@ -86,7 +86,7 @@ The Jetty Websocket component supports 21 endpoint options which are listed belo | sessionSupport | consumer | false | boolean | Whether to enable session support which enables HttpSession for each http request. | staticResources | consumer | | String | Set a resource path for static resources (such as .html files etc). 
The resources can be loaded from classpath if you prefix with classpath: otherwise the resources is loaded from file system or from JAR files. For example to load from root classpath use classpath:. or classpath:WEB-INF/static If not configured (eg null) then no static resource is in use. | exceptionHandler | consumer (advanced) | | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored. -| sendTimeout | producer | 60000 | Integer | Timeout in millise when sending to a websocket channel. The default timeout is 60000 (60 seconds). +| sendTimeout | producer | 30000 | Integer | Timeout in millise when sending to a websocket channel. The default timeout is 30000 (30 seconds). | sendToAll | producer | | Boolean | To send to all websocket subscribers. Can be used to configure on endpoint level instead of having to use the WebsocketConstants.SEND_TO_ALL header on the message. 
| bufferSize | advanced | 8192 | Integer | Set the buffer size of the websocketServlet which is also the max frame byte size (default 8192) | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java index 0ce9f49..5ac923e 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java @@ -54,8 +54,8 @@ public class WebsocketEndpoint extends DefaultEndpoint { @UriParam(label = "producer") private Boolean sendToAll; - @UriParam(label = "producer", defaultValue = "60000") - private Integer sendTimeout = 60000; + @UriParam(label = "producer", defaultValue = "30000") + private Integer sendTimeout = 30000; @UriParam(label = "monitoring") private boolean enableJmx; @UriParam(label = "consumer") @@ -196,7 +196,7 @@ public class WebsocketEndpoint extends DefaultEndpoint { /** * Timeout in millise when sending to a websocket channel. - * The default timeout is 60000 (60 seconds). + * The default timeout is 30000 (30 seconds). 
*/ public void setSendTimeout(Integer sendTimeout) { this.sendTimeout = sendTimeout; http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java index 6f50317..80d0618 100644 --- a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java @@ -24,9 +24,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; -import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.StopWatch; @@ -60,13 +58,14 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu log.debug("Sending to connection key {} -> {}", connectionKey, message); Future<Void> future = sendMessage(websocket, message); if (future != null) { - future.get(endpoint.getSendTimeout(), TimeUnit.MILLISECONDS); - if (!future.isDone()) { - throw new ExchangeTimedOutException(exchange, endpoint.getSendTimeout(), "Failed to send message to the connection"); + int timeout = endpoint.getSendTimeout(); + future.get(timeout, TimeUnit.MILLISECONDS); + if (!future.isCancelled() && !future.isDone()) { + throw new WebsocketSendException("Failed to send message to the connection within " + timeout + " millis.", exchange); } } } else { - throw new IllegalArgumentException("Failed to send message to single connection; 
connection key not set."); + throw new WebsocketSendException("Failed to send message to single connection; connection key not set.", exchange); } } } @@ -75,7 +74,6 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu return endpoint; } - @Override public void doStart() throws Exception { super.doStart(); @@ -108,24 +106,35 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu } } catch (Exception e) { if (exception == null) { - exception = new CamelExchangeException("Failed to deliver message to one or more recipients.", exchange, e); + exception = new WebsocketSendException("Failed to deliver message to one or more recipients.", exchange, e); } } } // check if they are all done within the timed out period StopWatch watch = new StopWatch(); - while (!futures.isEmpty() && watch.taken() < endpoint.getSendTimeout()) { + int timeout = endpoint.getSendTimeout(); + while (!futures.isEmpty() && watch.taken() < timeout) { // remove all that are done/cancelled for (Future future : futures) { if (future.isDone() || future.isCancelled()) { futures.remove(future); } + // if there are still more then we need to wait a little bit before checking again, to avoid burning cpu cycles in the while loop + if (!futures.isEmpty()) { + long interval = Math.min(1000, timeout); + log.debug("Sleeping {} millis waiting for sendToAll to complete sending with timeout {} millis", interval, timeout); + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + handleSleepInterruptedException(e, exchange); + } + } } - // TODO sleep a bit until done to avoid burning cpu cycles + } if (!futures.isEmpty()) { - exception = new CamelExchangeException("Failed to deliver message within " + endpoint.getSendTimeout() + " millis to one or more recipients.", exchange); + exception = new WebsocketSendException("Failed to deliver message within " + endpoint.getSendTimeout() + " millis to one or more recipients.", exchange); } if 
(exception != null) { @@ -152,4 +161,16 @@ public class WebsocketProducer extends DefaultProducer implements WebsocketProdu public void setStore(WebsocketStore store) { this.store = store; } + + /** + * Called when a sleep is interrupted; allows derived classes to handle this case differently + */ + protected void handleSleepInterruptedException(InterruptedException e, Exchange exchange) throws InterruptedException { + if (log.isDebugEnabled()) { + log.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); + } + Thread.currentThread().interrupt(); + throw e; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketSendException.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketSendException.java b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketSendException.java new file mode 100644 index 0000000..79f401c --- /dev/null +++ b/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketSendException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.websocket; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; + +/** + * Exception while sending to a websocket channel. + */ +public class WebsocketSendException extends CamelExchangeException { + + private static final long serialVersionUID = 1L; + + public WebsocketSendException(String message, Exchange exchange) { + super(message, exchange); + } + + public WebsocketSendException(String message, Exchange exchange, Throwable cause) { + super(message, exchange, cause); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/74ffd483/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java index 4e362c8..e49b9b4 100644 --- a/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java +++ b/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketProducerTest.java @@ -141,7 +141,7 @@ public class WebsocketProducerTest { inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket1, times(1)).getSession(); inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); - inOrder.verify(endpoint, times(2)).getSendTimeout(); + inOrder.verify(endpoint, times(1)).getSendTimeout(); inOrder.verifyNoMoreInteractions(); } @@ -171,6 +171,7 @@ public class WebsocketProducerTest { inOrder.verify(session, times(1)).isOpen(); inOrder.verify(defaultWebsocket2, times(1)).getSession(); inOrder.verify(remoteEndpoint, times(1)).sendStringByFuture(MESSAGE); + 
inOrder.verify(endpoint, times(1)).getSendTimeout(); inOrder.verifyNoMoreInteractions(); } @@ -185,7 +186,7 @@ public class WebsocketProducerTest { websocketProducer.process(exchange); fail("Exception expected"); } catch (Exception e) { - assertEquals(IllegalArgumentException.class, e.getClass()); + assertEquals(WebsocketSendException.class, e.getClass()); assertNotNull(e.getMessage()); assertNull(e.getCause()); }