This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch camel-3.0.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.0.x by this push: new 3d6d4b0 CAMEL-14396: websocket-jsr356 consumer fails when endpoint URI parameters are provided 3d6d4b0 is described below commit 3d6d4b0804fade0efbb92d84cf1cec5f2352a9c7 Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Tue Jan 14 15:08:56 2020 +0000 CAMEL-14396: websocket-jsr356 consumer fails when endpoint URI parameters are provided --- .../src/main/docs/websocket-jsr356-component.adoc | 2 +- .../websocket/jsr356/CamelServerEndpoint.java | 2 +- .../camel/websocket/jsr356/JSR356Consumer.java | 19 +++++------ .../camel/websocket/jsr356/JSR356Endpoint.java | 22 ++++++++---- .../camel/websocket/jsr356/JSR356Producer.java | 31 ++++++++++++----- .../websocket/jsr356/JSR356WebSocketComponent.java | 39 ++++++---------------- .../camel/websocket/jsr356/JSR356ConsumerTest.java | 15 ++++----- .../camel/websocket/jsr356/JSR356ProducerTest.java | 29 +++++----------- .../dsl/JSR356WebSocketEndpointBuilderFactory.java | 7 ++-- .../ROOT/pages/websocket-jsr356-component.adoc | 2 +- 10 files changed, 76 insertions(+), 92 deletions(-) diff --git a/components/camel-websocket-jsr356/src/main/docs/websocket-jsr356-component.adoc b/components/camel-websocket-jsr356/src/main/docs/websocket-jsr356-component.adoc index 7b04a42..22afd27 100644 --- a/components/camel-websocket-jsr356/src/main/docs/websocket-jsr356-component.adoc +++ b/components/camel-websocket-jsr356/src/main/docs/websocket-jsr356-component.adoc @@ -63,7 +63,7 @@ with the following path and query parameters: [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type -| *websocketPathOrUri* | If a path (/foo) it will deploy locally the endpoint, if an uri it will connect to the corresponding server | | String +| *uri* | If a schemeless URI path is provided, a ServerEndpoint is deployed under that path. Else if the URI is prefixed with the 'ws://' scheme, then a connection is established to the corresponding server | | URI |=== diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/CamelServerEndpoint.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/CamelServerEndpoint.java index 9d265b0..66bf8aa 100644 --- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/CamelServerEndpoint.java +++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/CamelServerEndpoint.java @@ -63,7 +63,7 @@ public class CamelServerEndpoint extends Endpoint { synchronized (session) { if (session.isOpen()) { try { - session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "an exception occured")); + session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "an exception occurred")); } catch (final IOException e) { log.debug("Error closing session #{}", session.getId(), e); } diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java index 5e1fbb3..856d71a 100644 --- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java +++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Consumer.java @@ -30,7 +30,6 @@ import org.apache.camel.support.DefaultConsumer; import static java.util.Optional.ofNullable; public class JSR356Consumer extends DefaultConsumer { - private final int sessionCount; private ClientSessions manager; private Runnable closeTask; @@ -45,9 +44,8 @@ public class JSR356Consumer extends DefaultConsumer { }); }; - JSR356Consumer(final JSR356Endpoint jsr356Endpoint, final Processor processor, final int sessionCount) { + JSR356Consumer(final JSR356Endpoint jsr356Endpoint, final Processor processor) { super(jsr356Endpoint, processor); - this.sessionCount = sessionCount; } @Override @@ -58,20 +56,19 @@ public class JSR356Consumer extends DefaultConsumer { @Override protected void doStart() throws Exception { super.doStart(); - final String endpointKey = getEndpoint().getEndpointUri().substring("websocket-jsr356://".length()); - if (endpointKey.contains("://")) { // we act as a client - final ClientEndpointConfig.Builder clientConfig = ClientEndpointConfig.Builder.create(); // todo: - // config - manager = new ClientSessions(sessionCount, URI.create(endpointKey), clientConfig.build(), onMessage); + final URI uri = getEndpoint().getUri(); + if (uri.getScheme() != null && uri.getScheme().equals("ws")) { // we act as a client + final ClientEndpointConfig.Builder clientConfig = ClientEndpointConfig.Builder.create(); + manager = new ClientSessions(getEndpoint().getSessionCount(), uri, clientConfig.build(), onMessage); manager.prepare(); } else { final JSR356WebSocketComponent.ContextBag bag = JSR356WebSocketComponent.getContext(null); - final CamelServerEndpoint endpoint = bag.getEndpoints().get(endpointKey); + final CamelServerEndpoint endpoint = bag.getEndpoints().get(uri.getPath()); if (endpoint == null) { // todo: make it customizable (the endpoint config) - final ServerEndpointConfig.Builder configBuilder = ServerEndpointConfig.Builder.create(CamelServerEndpoint.class, endpointKey); + final ServerEndpointConfig.Builder configBuilder = ServerEndpointConfig.Builder.create(CamelServerEndpoint.class, uri.getPath()); final CamelServerEndpoint serverEndpoint = new CamelServerEndpoint(); - bag.getEndpoints().put(endpointKey, serverEndpoint); + bag.getEndpoints().put(uri.getPath(), serverEndpoint); closeTask = addObserver(serverEndpoint); configBuilder.configurator(new ServerEndpointConfig.Configurator() { @Override diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Endpoint.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Endpoint.java index ec90e82..810fe97 100644 --- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Endpoint.java +++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Endpoint.java @@ -16,6 +16,8 @@ */ package org.apache.camel.websocket.jsr356; +import java.net.URI; + import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -26,17 +28,15 @@ import org.apache.camel.support.DefaultEndpoint; @UriEndpoint(firstVersion = "2.23.0", scheme = "websocket-jsr356", title = "Javax Websocket", syntax = "websocket-jsr356:websocketPathOrUri", label = "jsr356") public class JSR356Endpoint extends DefaultEndpoint { - @UriPath(description = "If a path (/foo) it will deploy locally the endpoint, " + "if an uri it will connect to the corresponding server") - private String websocketPathOrUri; + @UriPath(description = "If a schemeless URI path is provided, a ServerEndpoint is deployed under that path. " + + "Else if the URI is prefixed with the 'ws://' scheme, then a connection is established to the corresponding server") + private URI uri; @UriParam(description = "Used when the endpoint is in client mode to populate a pool of sessions") private int sessionCount = 1; - private final JSR356WebSocketComponent component; - public JSR356Endpoint(final JSR356WebSocketComponent component, final String uri) { super(uri, component); - this.component = component; } @Override @@ -46,12 +46,20 @@ public class JSR356Endpoint extends DefaultEndpoint { @Override public Consumer createConsumer(final Processor processor) { - return new JSR356Consumer(this, processor, sessionCount); + return new JSR356Consumer(this, processor); } @Override public Producer createProducer() { - return new JSR356Producer(this, sessionCount); + return new JSR356Producer(this); + } + + public URI getUri() { + return uri; + } + + public void setUri(URI uri) { + this.uri = uri; } public int getSessionCount() { diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Producer.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Producer.java index f74361c..d6eeaae 100644 --- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Producer.java +++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356Producer.java @@ -17,26 +17,28 @@ package org.apache.camel.websocket.jsr356; import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.nio.ByteBuffer; import java.util.function.BiConsumer; import javax.websocket.ClientEndpointConfig; +import javax.websocket.RemoteEndpoint; import javax.websocket.Session; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.support.DefaultAsyncProducer; +import org.apache.camel.util.IOHelper; import static java.util.Optional.ofNullable; public class JSR356Producer extends DefaultAsyncProducer { - private final int sessionCount; private ClientSessions manager; private BiConsumer<Exchange, AsyncCallback> onExchange; - JSR356Producer(final JSR356Endpoint jsr356Endpoint, final int sessionCount) { + JSR356Producer(final JSR356Endpoint jsr356Endpoint) { super(jsr356Endpoint); - this.sessionCount = sessionCount; } @Override @@ -60,20 +62,31 @@ public class JSR356Producer extends DefaultAsyncProducer { @Override protected void doStart() throws Exception { super.doStart(); - final String endpointKey = getEndpoint().getEndpointUri().substring("websocket-jsr356://".length()); - if (!endpointKey.contains("://")) { // we act as a client in all cases - // here - throw new IllegalArgumentException("You should pass a client uri"); + final URI uri = getEndpoint().getUri(); + if (uri.getScheme() != null && !uri.getScheme().equals("ws")) { + throw new IllegalArgumentException("WebSocket endpoint URI must be in the format: websocket-jsr356:ws://host:port/path"); } final ClientEndpointConfig.Builder clientConfig = ClientEndpointConfig.Builder.create(); - manager = new ClientSessions(sessionCount, URI.create(endpointKey), clientConfig.build(), null); + manager = new ClientSessions(getEndpoint().getSessionCount(), uri, clientConfig.build(), null); manager.prepare(); onExchange = (exchange, callback) -> manager.execute(session -> doSend(exchange, callback, session)); } private void doSend(final Exchange exchange, final AsyncCallback callback, final Session session) { try { - JSR356WebSocketComponent.sendMessage(session, exchange.getIn().getBody()); + Object body = exchange.getMessage().getBody(); + synchronized (session) { + final RemoteEndpoint.Basic basicRemote = session.getBasicRemote(); + if (String.class.isInstance(body)) { + basicRemote.sendText(String.valueOf(body)); + } else if (ByteBuffer.class.isInstance(body)) { + basicRemote.sendBinary(ByteBuffer.class.cast(body)); + } else if (InputStream.class.isInstance(body)) { + IOHelper.copy(InputStream.class.cast(body), basicRemote.getSendStream()); + } else { + throw new IllegalArgumentException("Unsupported input: " + body); + } + } } catch (final IOException e) { exchange.setException(e); } finally { diff --git a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356WebSocketComponent.java b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356WebSocketComponent.java index 498437d..b270190 100644 --- a/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356WebSocketComponent.java +++ b/components/camel-websocket-jsr356/src/main/java/org/apache/camel/websocket/jsr356/JSR356WebSocketComponent.java @@ -16,50 +16,30 @@ */ package org.apache.camel.websocket.jsr356; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; +import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import javax.websocket.RemoteEndpoint; -import javax.websocket.Session; import javax.websocket.server.ServerContainer; import org.apache.camel.Endpoint; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; -import org.apache.camel.util.IOHelper; import static java.util.Optional.ofNullable; @Component("websocket-jsr356") public class JSR356WebSocketComponent extends DefaultComponent { - // didn't find a better way to handle that unless we can assume the - // CamelContext is in the ServletContext - private static final Map<String, ContextBag> SERVER_CONTAINERS = new ConcurrentHashMap<>(); - protected int sessionCount; + private static final Map<String, ContextBag> SERVER_CONTAINERS = new ConcurrentHashMap<>(); @Override - protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters) { - return new JSR356Endpoint(this, uri); - } - - public static void sendMessage(final Session session, final Object message) throws IOException { - final RemoteEndpoint.Basic basicRemote = session.getBasicRemote(); // todo: handle async? - synchronized (session) { - if (String.class.isInstance(message)) { - basicRemote.sendText(String.valueOf(message)); - } else if (ByteBuffer.class.isInstance(message)) { - basicRemote.sendBinary(ByteBuffer.class.cast(message)); - } else if (InputStream.class.isInstance(message)) { - IOHelper.copy(InputStream.class.cast(message), basicRemote.getSendStream()); - } else { - throw new IllegalArgumentException("Unsupported input: " + message); - } - } + protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters) throws Exception { + JSR356Endpoint endpoint = new JSR356Endpoint(this, uri); + endpoint.setUri(new URI(remaining)); + setProperties(endpoint, parameters); + return endpoint; } public static void registerServer(final String contextPath, final ServerContainer container) { @@ -71,8 +51,9 @@ public class JSR356WebSocketComponent extends DefaultComponent { } public static ContextBag getContext(final String context) { - return ofNullable(context).map(SERVER_CONTAINERS::get) - .orElseGet(() -> SERVER_CONTAINERS.size() == 1 ? SERVER_CONTAINERS.values().iterator().next() : SERVER_CONTAINERS.get("")); + return ofNullable(context) + .map(SERVER_CONTAINERS::get) + .orElseGet(() -> SERVER_CONTAINERS.size() == 1 ? SERVER_CONTAINERS.values().iterator().next() : SERVER_CONTAINERS.get("")); } public static final class ContextBag { diff --git a/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ConsumerTest.java b/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ConsumerTest.java index 5c5b253..018143f 100644 --- a/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ConsumerTest.java +++ b/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ConsumerTest.java @@ -56,7 +56,7 @@ public class JSR356ConsumerTest extends CamelTestSupport { final String message = ExistingServerEndpoint.class.getName() + "#" + testName.getMethodName(); final MockEndpoint mockEndpoint = getMockEndpoint("mock:" + testName.getMethodName()); mockEndpoint.expectedBodiesReceived(message); - ExistingServerEndpoint.self.doSend(); // to avoid lifecycle issue suring + ExistingServerEndpoint.doSend(); // to avoid lifecycle issue during // startup we send the message // only here mockEndpoint.assertIsSatisfied(); @@ -86,9 +86,9 @@ public class JSR356ConsumerTest extends CamelTestSupport { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - from("websocket-jsr356:///test").id("camel_consumer_acts_as_server").convertBodyTo(String.class).to("mock:ensureServerModeReceiveProperlyExchanges"); + from("websocket-jsr356:///test?sessionCount=5").id("camel_consumer_acts_as_server").convertBodyTo(String.class).to("mock:ensureServerModeReceiveProperlyExchanges"); - from("websocket-jsr356://ws://localhost:" + servlet.getConfiguration().getHttpPort() + "/existingserver").id("camel_consumer_acts_as_client") + from("websocket-jsr356://ws://localhost:" + servlet.getConfiguration().getHttpPort() + "/existingserver?sessionCount=5").id("camel_consumer_acts_as_client") .convertBodyTo(String.class).to("mock:ensureClientModeReceiveProperlyExchanges"); } }; @@ -97,18 +97,15 @@ public class JSR356ConsumerTest extends CamelTestSupport { @Dependent @ServerEndpoint("/existingserver") public static class ExistingServerEndpoint { - private static ExistingServerEndpoint self; - - private Session session; + private static Session session; @OnOpen public void onOpen(final Session session) { this.session = session; - self = this; } - void doSend() throws IOException { - session.getBasicRemote().sendText(getClass().getName() + "#ensureClientModeReceiveProperlyExchanges"); + static void doSend() throws IOException { + session.getBasicRemote().sendText(ExistingServerEndpoint.class.getName() + "#ensureClientModeReceiveProperlyExchanges"); } } } diff --git a/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ProducerTest.java b/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ProducerTest.java index aebcbfd..f224aeb 100644 --- a/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ProducerTest.java +++ b/components/camel-websocket-jsr356/src/test/java/org/apache/camel/websocket/jsr356/JSR356ProducerTest.java @@ -16,14 +16,11 @@ */ package org.apache.camel.websocket.jsr356; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.enterprise.context.Dependent; import javax.websocket.OnMessage; -import javax.websocket.OnOpen; -import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import org.apache.camel.Produce; @@ -36,9 +33,11 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import static java.util.Collections.singletonList; public class JSR356ProducerTest extends CamelTestSupport { + + private static LinkedBlockingQueue<String> messages = new LinkedBlockingQueue<>(); + @Rule public final MeecrowaveRule servlet = new MeecrowaveRule(new Meecrowave.Builder() { { @@ -57,8 +56,7 @@ public class JSR356ProducerTest extends CamelTestSupport { public void ensureServerModeSendsProperly() throws Exception { final String body = getClass().getName() + "#" + testName.getMethodName(); serverProducer.sendBody(body); - ExistingServerEndpoint.self.latch.await(); - assertEquals(singletonList(body), ExistingServerEndpoint.self.messages); + assertEquals(body, messages.poll(10, TimeUnit.SECONDS)); } @Override @@ -66,7 +64,7 @@ public class JSR356ProducerTest extends CamelTestSupport { return new RouteBuilder() { public void configure() { from("direct:ensureServerModeSendsProperly").id("camel_consumer_acts_as_client").convertBodyTo(String.class) - .to("websocket-jsr356://ws://localhost:" + servlet.getConfiguration().getHttpPort() + "/existingserver"); + .to("websocket-jsr356://ws://localhost:" + servlet.getConfiguration().getHttpPort() + "/existingserver?sessionCount=5"); } }; } @@ -74,20 +72,9 @@ public class JSR356ProducerTest extends CamelTestSupport { @Dependent @ServerEndpoint("/existingserver") public static class ExistingServerEndpoint { - private static ExistingServerEndpoint self; - - private final Collection<String> messages = new ArrayList<>(); - private final CountDownLatch latch = new CountDownLatch(1); - - @OnOpen - public void onOpen(final Session session) { - self = this; - } - @OnMessage - public synchronized void onMessage(final String message) { + public void onMessage(final String message) { messages.add(message); - latch.countDown(); } } } diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/JSR356WebSocketEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/JSR356WebSocketEndpointBuilderFactory.java index f3db04a..4f9cc85 100644 --- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/JSR356WebSocketEndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/JSR356WebSocketEndpointBuilderFactory.java @@ -474,9 +474,10 @@ public interface JSR356WebSocketEndpointBuilderFactory { * * Syntax: <code>websocket-jsr356:websocketPathOrUri</code> * - * Path parameter: websocketPathOrUri - * If a path (/foo) it will deploy locally the endpoint, if an uri it will - * connect to the corresponding server + * Path parameter: uri + * If a schemeless URI path is provided, a ServerEndpoint is deployed under + * that path. Else if the URI is prefixed with the 'ws://' scheme, then a + * connection is established to the corresponding server */ default JSR356WebSocketEndpointBuilder websocketJsr356(String path) { class JSR356WebSocketEndpointBuilderImpl extends AbstractEndpointBuilder implements JSR356WebSocketEndpointBuilder, AdvancedJSR356WebSocketEndpointBuilder { diff --git a/docs/components/modules/ROOT/pages/websocket-jsr356-component.adoc b/docs/components/modules/ROOT/pages/websocket-jsr356-component.adoc index c13459e..83acddc 100644 --- a/docs/components/modules/ROOT/pages/websocket-jsr356-component.adoc +++ b/docs/components/modules/ROOT/pages/websocket-jsr356-component.adoc @@ -64,7 +64,7 @@ with the following path and query parameters: [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type -| *websocketPathOrUri* | If a path (/foo) it will deploy locally the endpoint, if an uri it will connect to the corresponding server | | String +| *uri* | If a schemeless URI path is provided, a ServerEndpoint is deployed under that path. Else if the URI is prefixed with the 'ws://' scheme, then a connection is established to the corresponding server | | URI |===