Repository: camel Updated Branches: refs/heads/master 87eff0106 -> 49676b64f
CAMEL-11034 Undertow producer does not support ... ...`https` This started as a simple addition of `XnioSSL` to `UndertowClient::sendRequest`, but it has evolved into a partial rewrite of `UndertowProducer`. This I feel needed to be done as the former `UndertowProducerCallback` did not take into account flushing/closing the request channel. Which if added to current body of the callback resulted in `TruncatedResponseException` due to asynchronous nature of Undertow's IO. Also handling of errors was a bit inconsistent with the endpoint property `throwExceptionOnFailure` which could be intentional, but I felt it was not. So, this adds UndertowClientCallback that ensures consistent error handling, resource management (assured close) and Camel async exchange assured end. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49676b64 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49676b64 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49676b64 Branch: refs/heads/master Commit: 49676b64fa9fc4b0c23a6ad19fd9c8be2df3f08f Parents: 87eff01 Author: Zoran Regvart <zregv...@apache.org> Authored: Tue Mar 21 14:59:47 2017 +0100 Committer: Zoran Regvart <zregv...@apache.org> Committed: Wed Mar 22 18:43:29 2017 +0100 ---------------------------------------------------------------------- .../undertow/UndertowClientCallback.java | 273 +++++++++++++++++++ .../component/undertow/UndertowProducer.java | 252 ++++++----------- .../undertow/UndertowHttpsSpringTest.java | 2 +- .../src/test/resources/SpringTest.xml | 2 + 4 files changed, 364 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/49676b64/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java new file mode 100644 index 0000000..131255d --- /dev/null +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.undertow; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.Consumer; + +import io.undertow.client.ClientCallback; +import io.undertow.client.ClientConnection; +import io.undertow.client.ClientExchange; +import io.undertow.client.ClientRequest; +import io.undertow.util.HeaderMap; +import io.undertow.util.HttpString; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.util.ExchangeHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xnio.ChannelListener; +import org.xnio.ChannelListeners; +import org.xnio.IoUtils; +import org.xnio.channels.StreamSinkChannel; + +/** + * Undertow {@link ClientCallback} that will get notified when the HTTP + * connection is ready or when the client failed to connect. It will also handle + * writing the request and reading the response in + * {@link #writeRequest(ClientExchange, ByteBuffer)} and + * {@link #setupResponseListner(ClientExchange)}. The main entry point is + * {@link #completed(ClientConnection)} or {@link #failed(IOException)} in case + * of errors, every error condition that should terminate Camel {@link Exchange} + * should go to {@link #hasFailedWith(Exception)} and successful execution of + * the exchange should end with {@link #finish(Message)}. Any + * {@link ClientCallback}s that are added here should extend + * {@link ErrorHandlingClientCallback}, best way to do that is to use the + * {@link #on(Consumer)} helper method. + */ +class UndertowClientCallback implements ClientCallback<ClientConnection> { + + /** + * {@link ClientCallback} that handles failures automatically by propagating + * the exception to Camel {@link Exchange} and notifies Camel that the + * exchange finished by calling {@link AsyncCallback#done(boolean)}. + */ + class ErrorHandlingClientCallback<T> implements ClientCallback<T> { + + private final Consumer<T> consumer; + + private ErrorHandlingClientCallback(final Consumer<T> consumer) { + this.consumer = consumer; + } + + @Override + public void completed(final T result) { + consumer.accept(result); + } + + @Override + public void failed(final IOException e) { + hasFailedWith(e); + } + + } + + private static final Logger LOG = LoggerFactory.getLogger(UndertowClientCallback.class); + + private final ByteBuffer body; + + private final AsyncCallback callback; + + /** + * A queue of resources that will be closed when the exchange ends, add more + * resources via {@link #deferClose(Closeable)}. + */ + private final BlockingDeque<Closeable> closables = new LinkedBlockingDeque<>(); + + private final UndertowEndpoint endpoint; + + private final Exchange exchange; + + private final ClientRequest request; + + private final Boolean throwExceptionOnFailure; + + UndertowClientCallback(final Exchange exchange, final AsyncCallback callback, final UndertowEndpoint endpoint, + final ClientRequest request, final ByteBuffer body) { + this.exchange = exchange; + this.callback = callback; + this.endpoint = endpoint; + this.request = request; + this.body = body; + throwExceptionOnFailure = endpoint.getThrowExceptionOnFailure(); + } + + @Override + public void completed(final ClientConnection connection) { + // we have established connection, make sure we close it + deferClose(connection); + + // now we can send the request and perform the exchange: writing the + // request and reading the response + connection.sendRequest(request, on(this::performClientExchange)); + } + + @Override + public void failed(final IOException e) { + hasFailedWith(e); + } + + ChannelListener<StreamSinkChannel> asyncWriter(final ByteBuffer body) { + return channel -> { + try { + write(channel, body); + + if (body.hasRemaining()) { + channel.resumeWrites(); + } else { + flush(channel); + } + } catch (final IOException e) { + hasFailedWith(e); + } + }; + } + + void deferClose(final Closeable closeable) { + try { + closables.putFirst(closeable); + } catch (final InterruptedException e) { + hasFailedWith(e); + } + } + + void finish(final Message result) { + for (final Closeable closeable : closables) { + IoUtils.safeClose(closeable); + } + + if (result != null) { + if (ExchangeHelper.isOutCapable(exchange)) { + exchange.setOut(result); + } else { + exchange.setIn(result); + } + } + + callback.done(false); + } + + void hasFailedWith(final Exception e) { + LOG.trace("Exchange has failed with", e); + if (Boolean.TRUE.equals(throwExceptionOnFailure)) { + exchange.setException(e); + } + + finish(null); + } + + <T> ClientCallback<T> on(final Consumer<T> consumer) { + return new ErrorHandlingClientCallback<>(consumer); + } + + void performClientExchange(final ClientExchange clientExchange) { + // add response listener to the exchange, we could receive the response + // at any time (async) + setupResponseListner(clientExchange); + + // write the request + writeRequest(clientExchange, body); + } + + void setupResponseListner(final ClientExchange clientExchange) { + clientExchange.setResponseListener(on(response -> { + LOG.trace("completed: {}", clientExchange); + + try { + storeCookies(clientExchange); + + final UndertowHttpBinding binding = endpoint.getUndertowHttpBinding(); + final Message result = binding.toCamelMessage(clientExchange, exchange); + + // we end Camel exchange here + finish(result); + } catch (final Exception e) { + hasFailedWith(e); + } + })); + } + + void storeCookies(final ClientExchange clientExchange) throws IOException, URISyntaxException { + if (endpoint.getCookieHandler() != null) { + // creating the url to use takes 2-steps + final String url = UndertowHelper.createURL(exchange, endpoint); + final URI uri = UndertowHelper.createURI(exchange, url, endpoint); + final HeaderMap headerMap = clientExchange.getResponse().getResponseHeaders(); + final Map<String, List<String>> m = new HashMap<>(); + for (final HttpString headerName : headerMap.getHeaderNames()) { + final List<String> headerValue = new LinkedList<>(); + for (int i = 0; i < headerMap.count(headerName); i++) { + headerValue.add(headerMap.get(headerName, i)); + } + m.put(headerName.toString(), headerValue); + } + endpoint.getCookieHandler().storeCookies(exchange, uri, m); + } + } + + void writeRequest(final ClientExchange clientExchange, final ByteBuffer body) { + final StreamSinkChannel requestChannel = clientExchange.getRequestChannel(); + if (body != null) { + try { + // try writing, we could be on IO thread and ready to write to + // the socket (or not) + write(requestChannel, body); + + if (body.hasRemaining()) { + // we did not write all of body (or at all) register a write + // listener to write asynchronously + requestChannel.getWriteSetter().set(asyncWriter(body)); + requestChannel.resumeWrites(); + } else { + // we are done, we need to flush the request + flush(requestChannel); + } + } catch (final IOException e) { + hasFailedWith(e); + } + } + } + + static void flush(final StreamSinkChannel channel) throws IOException { + // the canonical way of flushing Xnio channels + channel.shutdownWrites(); + if (!channel.flush()) { + channel.getWriteSetter().set(ChannelListeners.flushingChannelListener(IoUtils::safeClose, + ChannelListeners.closingChannelExceptionHandler())); + channel.resumeWrites(); + } + } + + static void write(final StreamSinkChannel channel, final ByteBuffer body) throws IOException { + int written = 1; + while (body.hasRemaining() && written > 0) { + written = channel.write(body); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/49676b64/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java index b1b1766..0bca663 100644 --- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java +++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java @@ -20,54 +20,56 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; -import io.undertow.client.ClientCallback; -import io.undertow.client.ClientConnection; -import io.undertow.client.ClientExchange; +import javax.net.ssl.SSLContext; + import io.undertow.client.ClientRequest; import io.undertow.client.UndertowClient; +import io.undertow.protocols.ssl.UndertowXnioSsl; import io.undertow.server.DefaultByteBufferPool; import io.undertow.util.HeaderMap; import io.undertow.util.Headers; import io.undertow.util.HttpString; -import io.undertow.util.Protocols; + import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.TypeConverter; +import org.apache.camel.http.common.cookie.CookieHandler; import org.apache.camel.impl.DefaultAsyncProducer; -import org.apache.camel.util.ExchangeHelper; -import org.apache.camel.util.IOHelper; -import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.StringHelper; import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.xnio.IoFuture; import org.xnio.OptionMap; import org.xnio.Xnio; import org.xnio.XnioWorker; +import org.xnio.ssl.XnioSsl; /** * The Undertow producer. * - * The implementation of Producer is considered as experimental. The Undertow client classes are not thread safe, - * their purpose is for the reverse proxy usage inside Undertow itself. This may change in the future versions and - * general purpose HTTP client wrapper will be added. Therefore this Producer may be changed too. + * The implementation of Producer is considered as experimental. The Undertow + * client classes are not thread safe, their purpose is for the reverse proxy + * usage inside Undertow itself. This may change in the future versions and + * general purpose HTTP client wrapper will be added. Therefore this Producer + * may be changed too. */ public class UndertowProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(UndertowProducer.class); - private UndertowEndpoint endpoint; - private XnioWorker worker; + + private UndertowClient client; + private final UndertowEndpoint endpoint; + private final OptionMap options; private DefaultByteBufferPool pool; - private OptionMap options; + private XnioSsl ssl; + private XnioWorker worker; - public UndertowProducer(UndertowEndpoint endpoint, OptionMap options) { + public UndertowProducer(final UndertowEndpoint endpoint, final OptionMap options) { super(endpoint); this.endpoint = endpoint; this.options = options; @@ -79,79 +81,97 @@ public class UndertowProducer extends DefaultAsyncProducer { } @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - ClientConnection connection = null; - + public boolean process(final Exchange camelExchange, final AsyncCallback callback) { + final URI uri; + final HttpString method; try { - final UndertowClient client = UndertowClient.getInstance(); + final String exchangeUri = UndertowHelper.createURL(camelExchange, getEndpoint()); + uri = UndertowHelper.createURI(camelExchange, exchangeUri, getEndpoint()); + method = UndertowHelper.createMethod(camelExchange, endpoint, camelExchange.getIn().getBody() != null); + } catch (final URISyntaxException e) { + camelExchange.setException(e); + callback.done(true); + return true; + } - IoFuture<ClientConnection> connect = client.connect(endpoint.getHttpURI(), worker, pool, options); + final String pathAndQuery = URISupport.pathAndQueryOf(uri); - // creating the url to use takes 2-steps - final String exchangeUri = UndertowHelper.createURL(exchange, getEndpoint()); - final URI uri = UndertowHelper.createURI(exchange, exchangeUri, getEndpoint()); + final UndertowHttpBinding undertowHttpBinding = endpoint.getUndertowHttpBinding(); - final String pathAndQuery = URISupport.pathAndQueryOf(uri); + final CookieHandler cookieHandler = endpoint.getCookieHandler(); + final Map<String, List<String>> cookieHeaders; + if (cookieHandler != null) { + try { + cookieHeaders = cookieHandler.loadCookies(camelExchange, uri); + } catch (final IOException e) { + camelExchange.setException(e); + callback.done(true); + return true; + } + } else { + cookieHeaders = Collections.emptyMap(); + } - // what http method to use - HttpString method = UndertowHelper.createMethod(exchange, endpoint, exchange.getIn().getBody() != null); + final ClientRequest request = new ClientRequest(); + request.setMethod(method); + request.setPath(pathAndQuery); - ClientRequest request = new ClientRequest(); - request.setProtocol(Protocols.HTTP_1_1); - request.setPath(pathAndQuery); - request.setMethod(method); + final HeaderMap requestHeaders = request.getRequestHeaders(); - final HeaderMap requestHeaders = request.getRequestHeaders(); + // Set the Host header + final Message message = camelExchange.getIn(); + final String host = message.getHeader(Headers.HOST_STRING, String.class); + requestHeaders.put(Headers.HOST, Optional.ofNullable(host).orElseGet(() -> uri.getAuthority())); - // Set the Host header - Message message = exchange.getIn(); - final String host = message.getHeader("Host", String.class); - requestHeaders.put(Headers.HOST, Optional.ofNullable(host).orElseGet(()-> uri.getAuthority())); + final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn()); - Object body = getRequestBody(request, exchange); + final TypeConverter tc = endpoint.getCamelContext().getTypeConverter(); + final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body); - TypeConverter tc = endpoint.getCamelContext().getTypeConverter(); - ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body); + if (body != null) { + requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.remaining()); + } - if (body != null) { - requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.array().length); - } + for (final Map.Entry<String, List<String>> entry : cookieHeaders.entrySet()) { + requestHeaders.putAll(HttpString.tryFromString(entry.getKey()), entry.getValue()); + } - if (getEndpoint().getCookieHandler() != null) { - Map<String, List<String>> cookieHeaders = getEndpoint().getCookieHandler().loadCookies(exchange, uri); - for (Map.Entry<String, List<String>> entry : cookieHeaders.entrySet()) { - requestHeaders.putAll(new HttpString(entry.getKey()), entry.getValue()); - } - } + if (LOG.isDebugEnabled()) { + LOG.debug("Executing http {} method: {}", method, pathAndQuery); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Executing http {} method: {}", method, pathAndQuery); - } - connection = connect.get(); - connection.sendRequest(request, new UndertowProducerCallback(connection, bodyAsByte, exchange, callback)); + final UndertowClientCallback clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(), + request, bodyAsByte); - } catch (Exception e) { - IOHelper.close(connection); - exchange.setException(e); - callback.done(true); - return true; - } + // when connect succeeds or fails UndertowClientCallback will + // get notified on a I/O thread run by Xnio worker. The writing + // of request and reading of response is performed also in the + // callback + client.connect(clientCallback, uri, worker, ssl, pool, options); - // use async routing engine + // the call above will proceed on Xnio I/O thread we will + // notify the exchange asynchronously when the HTTP exchange + // ends with success or failure from UndertowClientCallback return false; } - private Object getRequestBody(ClientRequest request, Exchange camelExchange) { - return endpoint.getUndertowHttpBinding().toHttpRequest(request, camelExchange.getIn()); - } - @Override protected void doStart() throws Exception { super.doStart(); - pool = new DefaultByteBufferPool(true, 8192); + // as in Undertow tests + pool = new DefaultByteBufferPool(true, 17 * 1024); + + final Xnio xnio = Xnio.getInstance(); + worker = xnio.createWorker(options); - worker = Xnio.getInstance().createWorker(options); + final SSLContext sslContext = getEndpoint().getSslContext(); + if (sslContext != null) { + ssl = new UndertowXnioSsl(xnio, options, sslContext); + } + + final CamelContext camelContext = getEndpoint().getCamelContext(); + client = UndertowClient.getInstance(camelContext.getApplicationContextClassLoader()); LOG.debug("Created worker: {} with options: {}", worker, options); } @@ -166,100 +186,4 @@ public class UndertowProducer extends DefaultAsyncProducer { } } - /** - * Everything important happens in callback - */ - private class UndertowProducerCallback implements ClientCallback<ClientExchange> { - - private final ClientConnection connection; - private final ByteBuffer body; - private final Exchange camelExchange; - private final AsyncCallback callback; - - UndertowProducerCallback(ClientConnection connection, ByteBuffer body, Exchange camelExchange, AsyncCallback callback) { - this.connection = connection; - this.body = body; - this.camelExchange = camelExchange; - this.callback = callback; - } - - @Override - public void completed(final ClientExchange clientExchange) { - clientExchange.setResponseListener(new ClientCallback<ClientExchange>() { - @Override - public void completed(ClientExchange clientExchange) { - LOG.trace("completed: {}", clientExchange); - try { - storeCookies(clientExchange); - Message message = endpoint.getUndertowHttpBinding().toCamelMessage(clientExchange, camelExchange); - if (ExchangeHelper.isOutCapable(camelExchange)) { - camelExchange.setOut(message); - } else { - camelExchange.setIn(message); - } - } catch (Exception e) { - camelExchange.setException(e); - } finally { - IOHelper.close(connection); - // make sure to call callback - callback.done(false); - } - } - - @Override - public void failed(IOException e) { - LOG.trace("failed: {}", e); - camelExchange.setException(e); - try { - IOHelper.close(connection); - } finally { - // make sure to call callback - callback.done(false); - } - } - }); - - try { - //send body if exists - if (body != null) { - clientExchange.getRequestChannel().write(body); - } - } catch (IOException e) { - camelExchange.setException(e); - IOHelper.close(connection); - // make sure to call callback - callback.done(false); - } - } - - @Override - public void failed(IOException e) { - LOG.trace("failed: {}", e); - if (getEndpoint().getThrowExceptionOnFailure()) { - camelExchange.setException(e); - } - IOHelper.close(connection); - // make sure to call callback - callback.done(false); - } - - private void storeCookies(ClientExchange clientExchange) throws URISyntaxException, IOException { - if (endpoint.getCookieHandler() != null) { - // creating the url to use takes 2-steps - String url = UndertowHelper.createURL(camelExchange, getEndpoint()); - URI uri = UndertowHelper.createURI(camelExchange, url, getEndpoint()); - HeaderMap headerMap = clientExchange.getResponse().getResponseHeaders(); - Map<String, List<String>> m = new HashMap<String, List<String>>(); - for (HttpString headerName : headerMap.getHeaderNames()) { - List<String> headerValue = new LinkedList<String>(); - for (int i = 0; i < headerMap.count(headerName); i++) { - headerValue.add(headerMap.get(headerName, i)); - } - m.put(headerName.toString(), headerValue); - } - endpoint.getCookieHandler().storeCookies(camelExchange, uri, m); - } - } - } - } http://git-wip-us.apache.org/repos/asf/camel/blob/49676b64/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpsSpringTest.java ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpsSpringTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpsSpringTest.java index 57cdb0b..fa61fc5 100644 --- a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpsSpringTest.java +++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpsSpringTest.java @@ -59,7 +59,7 @@ public class UndertowHttpsSpringTest { public void testSSLConsumer() throws Exception { mockEndpoint.expectedBodiesReceived("Hello World"); - String out = template.requestBody("https://localhost:" + port + "/spring", "Hello World", String.class); + String out = template.requestBody("undertow:https://localhost:" + port + "/spring?sslContextParameters=#sslClient", "Hello World", String.class); assertEquals("Bye World", out); mockEndpoint.assertIsSatisfied(); http://git-wip-us.apache.org/repos/asf/camel/blob/49676b64/components/camel-undertow/src/test/resources/SpringTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-undertow/src/test/resources/SpringTest.xml b/components/camel-undertow/src/test/resources/SpringTest.xml index eeee003..9026712 100644 --- a/components/camel-undertow/src/test/resources/SpringTest.xml +++ b/components/camel-undertow/src/test/resources/SpringTest.xml @@ -32,6 +32,8 @@ </camel:trustManagers> </camel:sslContextParameters> + <camel:sslContextParameters id="sslClient" /> + <bean id="dynaPort" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> <property name="targetClass"> <value>org.apache.camel.test.AvailablePortFinder</value>