This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push: new 53256bc Base knative-http component to vertx-web as the next undertow major release (3) will be based on it #126 53256bc is described below commit 53256bcdf4f0123be452ad835e17b50965b88f5a Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Wed Aug 14 00:37:21 2019 +0200 Base knative-http component to vertx-web as the next undertow major release (3) will be based on it #126 --- camel-k-loader-kotlin/pom.xml | 1 - camel-knative-http/pom.xml | 11 +- .../camel/component/knative/http/KnativeHttp.java | 109 +------ .../component/knative/http/KnativeHttpBinding.java | 318 --------------------- .../knative/http/KnativeHttpClientCallback.java | 242 ---------------- .../knative/http/KnativeHttpComponent.java | 166 ++++++++--- .../knative/http/KnativeHttpConsumer.java | 234 +++++++++++---- .../http/KnativeHttpConsumerDispatcher.java | 193 +++++++++++++ .../knative/http/KnativeHttpDispatcher.java | 181 ------------ .../knative/http/KnativeHttpEndpoint.java | 38 +-- .../http/KnativeHttpHeaderFilterStrategy.java | 3 +- .../knative/http/KnativeHttpProducer.java | 178 +++++++----- .../component/knative/http/KnativeHttpSupport.java | 49 ++++ .../component/knative/http/KnativeHttpTest.java | 252 ++++++++-------- .../component/knative/CloudEventsV01Test.java | 14 +- .../component/knative/CloudEventsV02Test.java | 14 +- .../component/knative/KnativeComponentTest.java | 126 +------- pom.xml | 1 + 18 files changed, 829 insertions(+), 1301 deletions(-) diff --git a/camel-k-loader-kotlin/pom.xml b/camel-k-loader-kotlin/pom.xml index f7c3b75..7ca24f4 100644 --- a/camel-k-loader-kotlin/pom.xml +++ b/camel-k-loader-kotlin/pom.xml @@ -177,7 +177,6 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> - <useSystemClassLoader>false</useSystemClassLoader> <forkCount>0</forkCount> </configuration> </plugin> diff --git a/camel-knative-http/pom.xml b/camel-knative-http/pom.xml index 08c7a42..44b7e06 100644 --- a/camel-knative-http/pom.xml +++ b/camel-knative-http/pom.xml @@ -53,9 +53,14 @@ </dependency> <dependency> - <groupId>io.undertow</groupId> - <artifactId>undertow-core</artifactId> - <version>${undertow.version}</version> + <groupId>io.vertx</groupId> + <artifactId>vertx-web</artifactId> + <version>${vertx.version}</version> + </dependency> + <dependency> + <groupId>io.vertx</groupId> + <artifactId>vertx-web-client</artifactId> + <version>${vertx.version}</version> </dependency> <dependency> diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java index ac5c713..aad1725 100644 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java +++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java @@ -17,21 +17,26 @@ package org.apache.camel.component.knative.http; import java.util.Objects; -import javax.net.ssl.SSLContext; +import java.util.regex.Pattern; + +import io.vertx.core.Handler; +import io.vertx.core.http.HttpServerRequest; public final class KnativeHttp { + public static final int DEFAULT_PORT = 8080; + public static final String DEFAULT_PATH = "/"; + public static final Pattern ENDPOINT_PATTERN = Pattern.compile("([0-9a-zA-Z][\\w\\.-]+):(\\d+)\\/?(.*)"); + private KnativeHttp() { } - public static final class HostKey { + public static final class ServerKey { private final String host; private final int port; - private final SSLContext sslContext; - public HostKey(String host, int port, SSLContext ssl) { + public ServerKey(String host, int port) { this.host = host; this.port = port; - this.sslContext = ssl; } public String getHost() { @@ -42,10 +47,6 @@ public final class KnativeHttp { return port; } - public SSLContext getSslContext() { - return sslContext; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -54,9 +55,8 @@ public final class KnativeHttp { if (o == null || getClass() != o.getClass()) { return false; } - HostKey key = (HostKey) o; - return getPort() == key.getPort() - && getHost().equals(key.getHost()); + ServerKey key = (ServerKey) o; + return getPort() == key.getPort() && getHost().equals(key.getHost()); } @Override @@ -65,88 +65,7 @@ public final class KnativeHttp { } } - - /** - * Options to configure an Undertow host. - */ - public static final class HostOptions { - - /** - * The number of worker threads to use in a Undertow host. - */ - private Integer workerThreads; - - /** - * The number of io threads to use in a Undertow host. - */ - private Integer ioThreads; - - /** - * The buffer size of the Undertow host. - */ - private Integer bufferSize; - - /** - * Set if the Undertow host should use direct buffers. - */ - private Boolean directBuffers; - - /** - * Set if the Undertow host should use http2 protocol. - */ - private Boolean http2Enabled; - - - public Integer getWorkerThreads() { - return workerThreads; - } - - public void setWorkerThreads(Integer workerThreads) { - this.workerThreads = workerThreads; - } - - public Integer getIoThreads() { - return ioThreads; - } - - public void setIoThreads(Integer ioThreads) { - this.ioThreads = ioThreads; - } - - public Integer getBufferSize() { - return bufferSize; - } - - public void setBufferSize(Integer bufferSize) { - this.bufferSize = bufferSize; - } - - public Boolean getDirectBuffers() { - return directBuffers; - } - - public void setDirectBuffers(Boolean directBuffers) { - this.directBuffers = directBuffers; - } - - public Boolean getHttp2Enabled() { - return http2Enabled; - } - - public void setHttp2Enabled(Boolean http2Enabled) { - this.http2Enabled = http2Enabled; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder("UndertowHostOptions{"); - sb.append("workerThreads=").append(workerThreads); - sb.append(", ioThreads=").append(ioThreads); - sb.append(", bufferSize=").append(bufferSize); - sb.append(", directBuffers=").append(directBuffers); - sb.append(", http2Enabled=").append(http2Enabled); - sb.append('}'); - return sb.toString(); - } + public interface PredicatedHandler extends Handler<HttpServerRequest> { + boolean canHandle(HttpServerRequest event); } } diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java deleted file mode 100644 index 8dd6f34..0000000 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.knative.http; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.util.ArrayList; -import java.util.Deque; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; - -import io.undertow.client.ClientExchange; -import io.undertow.client.ClientRequest; -import io.undertow.client.ClientResponse; -import io.undertow.server.HttpServerExchange; -import io.undertow.util.HeaderMap; -import io.undertow.util.Headers; -import io.undertow.util.HttpString; -import io.undertow.util.Methods; -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.TypeConverter; -import org.apache.camel.spi.HeaderFilterStrategy; -import org.apache.camel.support.DefaultMessage; -import org.apache.camel.support.ExchangeHelper; -import org.apache.camel.support.MessageHelper; -import org.apache.camel.support.ObjectHelper; -import org.apache.camel.util.IOHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.xnio.channels.BlockingReadableByteChannel; -import org.xnio.channels.StreamSourceChannel; - -public final class KnativeHttpBinding { - private static final Logger LOG = LoggerFactory.getLogger(KnativeHttpBinding.class); - - private final HeaderFilterStrategy headerFilterStrategy; - private final Boolean transferException; - - public KnativeHttpBinding(HeaderFilterStrategy headerFilterStrategy) { - this(headerFilterStrategy, Boolean.FALSE); - } - - public KnativeHttpBinding(HeaderFilterStrategy headerFilterStrategy, Boolean transferException) { - this.headerFilterStrategy = Objects.requireNonNull(headerFilterStrategy, "headerFilterStrategy"); - this.transferException = transferException; - } - - public Message toCamelMessage(HttpServerExchange httpExchange, Exchange exchange) throws Exception { - Message result = new DefaultMessage(exchange.getContext()); - - populateCamelHeaders(httpExchange, result.getHeaders(), exchange); - - //extract body by myself if undertow parser didn't handle and the method is allowed to have one - //body is extracted as byte[] then auto TypeConverter kicks in - if (Methods.POST.equals(httpExchange.getRequestMethod()) || Methods.PUT.equals(httpExchange.getRequestMethod()) || Methods.PATCH.equals(httpExchange.getRequestMethod())) { - result.setBody(readFromChannel(httpExchange.getRequestChannel())); - } else { - result.setBody(null); - } - - return result; - } - - public Message toCamelMessage(ClientExchange clientExchange, Exchange exchange) throws Exception { - Message result = new DefaultMessage(exchange.getContext()); - - //retrieve response headers - populateCamelHeaders(clientExchange.getResponse(), result.getHeaders(), exchange); - - result.setBody(readFromChannel(clientExchange.getResponseChannel())); - - return result; - } - - public void populateCamelHeaders(HttpServerExchange httpExchange, Map<String, Object> headersMap, Exchange exchange) { - String path = httpExchange.getRequestPath(); - KnativeHttpEndpoint endpoint = (KnativeHttpEndpoint) exchange.getFromEndpoint(); - if (endpoint.getHttpURI() != null) { - // need to match by lower case as we want to ignore case on context-path - String endpointPath = endpoint.getHttpURI().getPath(); - String matchPath = path.toLowerCase(Locale.US); - String match = endpointPath.toLowerCase(Locale.US); - if (matchPath.startsWith(match)) { - path = path.substring(endpointPath.length()); - } - } - headersMap.put(Exchange.HTTP_PATH, path); - - for (HttpString name : httpExchange.getRequestHeaders().getHeaderNames()) { - if (name.toString().toLowerCase(Locale.US).equals("content-type")) { - name = Headers.CONTENT_TYPE; - } - - if (name.toString().toLowerCase(Locale.US).equals("authorization")) { - String value = httpExchange.getRequestHeaders().get(name).toString(); - // store a special header that this request was authenticated using HTTP Basic - if (value != null && value.trim().startsWith("Basic")) { - if (!headerFilterStrategy.applyFilterToExternalHeaders(Exchange.AUTHENTICATION, "Basic", exchange)) { - appendHeader(headersMap, Exchange.AUTHENTICATION, "Basic"); - } - } - } - - // add the headers one by one, and use the header filter strategy - for (Object value : httpExchange.getRequestHeaders().get(name)) { - if (!headerFilterStrategy.applyFilterToExternalHeaders(name.toString(), value, exchange)) { - appendHeader(headersMap, name.toString(), value); - } - } - } - - //process uri parameters as headers - Map<String, Deque<String>> pathParameters = httpExchange.getQueryParameters(); - for (Map.Entry<String, Deque<String>> entry : pathParameters.entrySet()) { - String name = entry.getKey(); - for (Object value: entry.getValue()) { - if (!headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) { - appendHeader(headersMap, name, value); - } - } - } - - headersMap.put(Exchange.HTTP_METHOD, httpExchange.getRequestMethod().toString()); - headersMap.put(Exchange.HTTP_URL, httpExchange.getRequestURL()); - headersMap.put(Exchange.HTTP_URI, httpExchange.getRequestURI()); - headersMap.put(Exchange.HTTP_QUERY, httpExchange.getQueryString()); - headersMap.put(Exchange.HTTP_RAW_QUERY, httpExchange.getQueryString()); - } - - public void populateCamelHeaders(ClientResponse response, Map<String, Object> headersMap, Exchange exchange) throws Exception { - headersMap.put(Exchange.HTTP_RESPONSE_CODE, response.getResponseCode()); - - for (HttpString name : response.getResponseHeaders().getHeaderNames()) { - // mapping the content-type - //String name = httpName.toString(); - if (name.toString().toLowerCase(Locale.US).equals("content-type")) { - name = Headers.CONTENT_TYPE; - } - - if (name.toString().toLowerCase(Locale.US).equals("authorization")) { - String value = response.getResponseHeaders().get(name).toString(); - // store a special header that this request was authenticated using HTTP Basic - if (value != null && value.trim().startsWith("Basic")) { - if (!headerFilterStrategy.applyFilterToExternalHeaders(Exchange.AUTHENTICATION, "Basic", exchange)) { - appendHeader(headersMap, Exchange.AUTHENTICATION, "Basic"); - } - } - } - - // add the headers one by one, and use the header filter strategy - for (Object value : response.getResponseHeaders().get(name)) { - if (!headerFilterStrategy.applyFilterToExternalHeaders(name.toString(), value, exchange)) { - appendHeader(headersMap, name.toString(), value); - } - } - } - } - - public Object toHttpResponse(HttpServerExchange httpExchange, Message message) throws IOException { - final boolean failed = message.getExchange().isFailed(); - final int defaultCode = failed ? 500 : 200; - final int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class); - final TypeConverter tc = message.getExchange().getContext().getTypeConverter(); - - httpExchange.setStatusCode(code); - - //copy headers from Message to Response - for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { - String key = entry.getKey(); - Object value = entry.getValue(); - // use an iterator as there can be multiple values. (must not use a delimiter) - for (Object it: ObjectHelper.createIterable(value, null)) { - String headerValue = tc.convertTo(String.class, it); - if (headerValue == null) { - continue; - } - if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) { - httpExchange.getResponseHeaders().add(new HttpString(key), headerValue); - } - } - } - - Object body = message.getBody(); - Exception exception = message.getExchange().getException(); - - if (exception != null) { - if (transferException) { - // we failed due an exception, and transfer it as java serialized object - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(exception); - oos.flush(); - IOHelper.close(oos, bos); - - // the body should be the serialized java object of the exception - body = ByteBuffer.wrap(bos.toByteArray()); - // force content type to be serialized java object - message.setHeader(Exchange.CONTENT_TYPE, "application/x-java-serialized-object"); - } else { - // we failed due an exception so print it as plain text - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - exception.printStackTrace(pw); - - // the body should then be the stacktrace - body = ByteBuffer.wrap(sw.toString().getBytes()); - // force content type to be text/plain as that is what the stacktrace is - message.setHeader(Exchange.CONTENT_TYPE, "text/plain"); - } - - // and mark the exception as failure handled, as we handled it by returning it as the response - ExchangeHelper.setFailureHandled(message.getExchange()); - } - - // set the content type in the response. - String contentType = MessageHelper.getContentType(message); - if (contentType != null) { - // set content-type - httpExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, contentType); - LOG.trace("Content-Type: {}", contentType); - } - return body; - } - - public Object toHttpRequest(ClientRequest clientRequest, Message message) { - final Object body = message.getBody(); - final HeaderMap requestHeaders = clientRequest.getRequestHeaders(); - - // set the content type in the response. - String contentType = MessageHelper.getContentType(message); - if (contentType != null) { - // set content-type - requestHeaders.put(Headers.CONTENT_TYPE, contentType); - } - - TypeConverter tc = message.getExchange().getContext().getTypeConverter(); - - //copy headers from Message to Request - for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { - String key = entry.getKey(); - Object value = entry.getValue(); - // use an iterator as there can be multiple values. (must not use a delimiter) - for (Object it: ObjectHelper.createIterable(value, null)) { - String headerValue = tc.convertTo(String.class, it); - if (headerValue == null) { - continue; - } - if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) { - requestHeaders.add(new HttpString(key), headerValue); - } - } - } - - return body; - } - - @SuppressWarnings("unchecked") - public static void appendHeader(Map<String, Object> headers, String key, Object value) { - if (headers.containsKey(key)) { - Object existing = headers.get(key); - List<Object> list; - if (existing instanceof List) { - list = (List<Object>) existing; - } else { - list = new ArrayList<>(); - list.add(existing); - } - list.add(value); - value = list; - } - - headers.put(key, value); - } - - public static byte[] readFromChannel(StreamSourceChannel source) throws IOException { - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - final ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]); - final ReadableByteChannel blockingSource = new BlockingReadableByteChannel(source); - - for (;;) { - int res = blockingSource.read(buffer); - if (res == -1) { - return out.toByteArray(); - } else if (res == 0) { - LOG.error("Channel did not block"); - } else { - buffer.flip(); - out.write( - buffer.array(), - buffer.arrayOffset() + buffer.position(), - buffer.arrayOffset() + buffer.limit()); - buffer.clear(); - } - } - } -} diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java deleted file mode 100644 index 4adf801..0000000 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.knative.http; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.Channel; -import java.util.Map; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import io.undertow.client.ClientCallback; -import io.undertow.client.ClientConnection; -import io.undertow.client.ClientExchange; -import io.undertow.client.ClientRequest; -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.http.common.HttpHelper; -import org.apache.camel.http.common.HttpOperationFailedException; -import org.apache.camel.support.ExchangeHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.xnio.ChannelExceptionHandler; -import org.xnio.ChannelListener; -import org.xnio.ChannelListeners; -import org.xnio.IoUtils; -import org.xnio.channels.StreamSinkChannel; - - -class KnativeHttpClientCallback implements ClientCallback<ClientConnection> { - private static final Logger LOG = LoggerFactory.getLogger(KnativeHttpClientCallback.class); - - private final ByteBuffer body; - private final AsyncCallback callback; - private final BlockingDeque<Closeable> closeables; - private final KnativeHttpEndpoint endpoint; - private final Exchange exchange; - private final ClientRequest request; - - KnativeHttpClientCallback(Exchange exchange, AsyncCallback callback, KnativeHttpEndpoint endpoint, ClientRequest request, ByteBuffer body) { - this.closeables = new LinkedBlockingDeque<>(); - this.exchange = exchange; - this.callback = callback; - this.endpoint = endpoint; - this.request = request; - this.body = body; - } - - @Override - public void completed(final ClientConnection connection) { - // we have established connection, make sure we close it - deferClose(connection); - - // now we can send the request and perform the exchange: writing the - // request and reading the response - connection.sendRequest(request, on(this::performClientExchange)); - } - - @Override - public void failed(final IOException e) { - hasFailedWith(e); - } - - private ChannelListener<StreamSinkChannel> asyncWriter(final ByteBuffer body) { - return channel -> { - try { - write(channel, body); - - if (body.hasRemaining()) { - channel.resumeWrites(); - } else { - flush(channel); - } - } catch (final IOException e) { - hasFailedWith(e); - } - }; - } - - private void deferClose(final Closeable closeable) { - try { - closeables.putFirst(closeable); - } catch (final InterruptedException e) { - hasFailedWith(e); - } - } - - private void finish(final Message result) { - for (final Closeable closeable : closeables) { - IoUtils.safeClose(closeable); - } - - if (result != null) { - if (ExchangeHelper.isOutCapable(exchange)) { - exchange.setOut(result); - } else { - exchange.setIn(result); - } - } - - callback.done(false); - } - - private void hasFailedWith(final Throwable e) { - LOG.trace("Exchange has failed with", e); - if (Boolean.TRUE.equals(endpoint.getThrowExceptionOnFailure())) { - exchange.setException(e); - } - - finish(null); - } - - private <T> ClientCallback<T> on(final Consumer<T> completed) { - return on(completed, this::hasFailedWith); - } - - private <T> ClientCallback<T> on(Consumer<T> completed, Consumer<IOException> failed) { - return new ClientCallback<T>() { - @Override - public void completed(final T result) { - completed.accept(result); - } - - @Override - public void failed(final IOException e) { - failed(e); - } - }; - } - - private void performClientExchange(final ClientExchange clientExchange) { - // add response listener to the exchange, we could receive the response - // at any time (async) - setupResponseListener(clientExchange); - - // write the request - writeRequest(clientExchange, body); - } - - private void setupResponseListener(final ClientExchange clientExchange) { - clientExchange.setResponseListener(on(response -> { - try { - final KnativeHttpBinding binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy()); - final Message result = binding.toCamelMessage(clientExchange, exchange); - final int code = clientExchange.getResponse().getResponseCode(); - - if (!HttpHelper.isStatusCodeOk(code, "200-299") && endpoint.getThrowExceptionOnFailure()) { - // operation failed so populate exception to throw - final String uri = endpoint.getHttpURI().toString(); - final String statusText = clientExchange.getResponse().getStatus(); - - // Convert Message headers (Map<String, Object>) to Map<String, String> as expected by - // HttpOperationsFailedException using Message versus clientExchange as its header values - // have extra formatting - final Map<String, String> headers = result.getHeaders().entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> entry.getValue().toString())); - - // Since result (Message) isn't associated with an Exchange yet, you can not use result.getBody(String.class) - final String bodyText = ExchangeHelper.convertToType(exchange, String.class, result.getBody()); - final Exception cause = new HttpOperationFailedException(uri, code, statusText, null, headers, bodyText); - - if (ExchangeHelper.isOutCapable(exchange)) { - exchange.setOut(result); - } else { - exchange.setIn(result); - } - - // make sure to fail with HttpOperationFailedException - hasFailedWith(cause); - } else { - // we end Camel exchange here - finish(result); - } - } catch (Throwable e) { - hasFailedWith(e); - } - })); - } - - private void writeRequest(final ClientExchange clientExchange, final ByteBuffer body) { - final StreamSinkChannel requestChannel = clientExchange.getRequestChannel(); - if (body != null) { - try { - // try writing, we could be on IO thread and ready to write to - // the socket (or not) - write(requestChannel, body); - - if (body.hasRemaining()) { - // we did not write all of body (or at all) register a write - // listener to write asynchronously - requestChannel.getWriteSetter().set(asyncWriter(body)); - requestChannel.resumeWrites(); - } else { - // we are done, we need to flush the request - flush(requestChannel); - } - } catch (final IOException e) { - hasFailedWith(e); - } - } - } - - private static void flush(final StreamSinkChannel channel) throws IOException { - // the canonical way of flushing Xnio channels - channel.shutdownWrites(); - if (!channel.flush()) { - final ChannelListener<StreamSinkChannel> safeClose = IoUtils::safeClose; - final ChannelExceptionHandler<Channel> closingChannelExceptionHandler = ChannelListeners - .closingChannelExceptionHandler(); - final ChannelListener<StreamSinkChannel> flushingChannelListener = ChannelListeners - .flushingChannelListener(safeClose, closingChannelExceptionHandler); - channel.getWriteSetter().set(flushingChannelListener); - channel.resumeWrites(); - } - } - - private static void write(final StreamSinkChannel channel, final ByteBuffer body) throws IOException { - int written = 1; - while (body.hasRemaining() && written > 0) { - written = channel.write(body); - } - } -} diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java index bea70a5..24edb27 100644 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java +++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java @@ -17,94 +17,188 @@ package org.apache.camel.component.knative.http; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.regex.Matcher; -import java.util.regex.Pattern; -import io.undertow.predicate.Predicate; -import io.undertow.server.HttpHandler; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.ext.web.client.WebClientOptions; import org.apache.camel.Endpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; import org.apache.camel.support.IntrospectionSupport; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Component("knative-http") public class KnativeHttpComponent extends DefaultComponent { private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpComponent.class); - private final Map<KnativeHttp.HostKey, KnativeHttpDispatcher> registry; + private final Map<KnativeHttp.ServerKey, KnativeHttpConsumerDispatcher> registry; + + @Metadata(label = "advanced") + private Vertx vertx; + @Metadata(label = "advanced") + private VertxOptions vertxOptions; @Metadata(label = "advanced") - private KnativeHttp.HostOptions hostOptions; + private HttpServerOptions vertxHttpServerOptions; + @Metadata(label = "advanced") + private WebClientOptions vertxHttpClientOptions; + + private boolean localVertx; + private ExecutorService executor; public KnativeHttpComponent() { this.registry = new ConcurrentHashMap<>(); + this.localVertx = false; } @Override - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - final Pattern pattern = Pattern.compile("([0-9a-zA-Z][\\w\\.-]+):(\\d+)\\/?(.*)"); - final Matcher matcher = pattern.matcher(remaining); + protected void doInit() throws Exception { + super.doInit(); + + this.executor = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "knative-http-component"); + + if (this.vertx == null) { + Set<Vertx> instances = getCamelContext().getRegistry().findByType(Vertx.class); + if (instances.size() == 1) { + this.vertx = instances.iterator().next(); + } + } + if (this.vertx == null) { + VertxOptions options = ObjectHelper.supplyIfEmpty(this.vertxOptions, VertxOptions::new); + + this.vertx = Vertx.vertx(options); + this.localVertx = true; + } + } + + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + + if (this.vertx != null && this.localVertx) { + Future<?> future = this.executor.submit( + () -> { + CountDownLatch latch = new CountDownLatch(1); + + this.vertx.close(result -> { + try { + if (result.failed()) { + LOGGER.warn("Failed to close Vert.x HttpServer reason: {}", + result.cause().getMessage() + ); + + throw new RuntimeException(result.cause()); + } + + LOGGER.info("Vert.x HttpServer stopped"); + } finally { + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + ); + + try { + future.get(); + } finally { + this.vertx = null; + this.localVertx = false; + } + } + + if (this.executor != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(this.executor); + } + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + Matcher matcher = KnativeHttp.ENDPOINT_PATTERN.matcher(remaining); if (!matcher.find()) { throw new IllegalArgumentException("Bad URI: " + remaining); } - final String host; - final int port; - final String path; + KnativeHttpEndpoint ep = new KnativeHttpEndpoint(uri, this); + ep.setHeaderFilter(IntrospectionSupport.extractProperties(parameters, "filter.", true)); switch (matcher.groupCount()) { case 1: - host = matcher.group(1); - port = 8080; - path = "/"; + ep.setHost(matcher.group(1)); + ep.setPort(KnativeHttp.DEFAULT_PORT); + ep.setPath(KnativeHttp.DEFAULT_PATH); break; case 2: - host = matcher.group(1); - port = Integer.parseInt(matcher.group(2)); - path = "/"; + ep.setHost(matcher.group(1)); + ep.setPort(Integer.parseInt(matcher.group(2))); + ep.setPath(KnativeHttp.DEFAULT_PATH); break; case 3: - host = matcher.group(1); - port = Integer.parseInt(matcher.group(2)); - path = "/" + matcher.group(3); + ep.setHost(matcher.group(1)); + ep.setPort(Integer.parseInt(matcher.group(2))); + ep.setPath(KnativeHttp.DEFAULT_PATH + matcher.group(3)); break; default: throw new IllegalArgumentException("Bad URI: " + remaining); } - KnativeHttpEndpoint ep = new KnativeHttpEndpoint(uri, this); - ep.setHost(host); - ep.setPort(port); - ep.setPath(path); - ep.setHeaderFilter(IntrospectionSupport.extractProperties(parameters, "filter.", true)); - setProperties(ep, parameters); return ep; } - public KnativeHttp.HostOptions getHostOptions() { - return hostOptions; + public Vertx getVertx() { + return vertx; } - public void setHostOptions(KnativeHttp.HostOptions hostOptions) { - this.hostOptions = hostOptions; + public void setVertx(Vertx vertx) { + this.vertx = vertx; } - public void bind(KnativeHttp.HostKey key, HttpHandler handler, Predicate predicate) { - getUndertow(key).bind(handler, predicate); + public VertxOptions getVertxOptions() { + return vertxOptions; } - public void unbind(KnativeHttp.HostKey key, HttpHandler handler) { - getUndertow(key).unbind(handler); + public void setVertxOptions(VertxOptions vertxOptions) { + this.vertxOptions = vertxOptions; + } + + public HttpServerOptions getVertxHttpServerOptions() { + return vertxHttpServerOptions; + } + + public void setVertxHttpServerOptions(HttpServerOptions vertxHttpServerOptions) { + this.vertxHttpServerOptions = vertxHttpServerOptions; + } + + public WebClientOptions getVertxHttpClientOptions() { + return vertxHttpClientOptions; + } + + public void setVertxHttpClientOptions(WebClientOptions vertxHttpClientOptions) { + this.vertxHttpClientOptions = vertxHttpClientOptions; + } + KnativeHttpConsumerDispatcher getDispatcher(KnativeHttp.ServerKey key) { + return registry.computeIfAbsent(key, k -> new KnativeHttpConsumerDispatcher(executor, vertx, k, vertxHttpServerOptions)); } - private KnativeHttpDispatcher getUndertow(KnativeHttp.HostKey key) { - return registry.computeIfAbsent(key, k -> new KnativeHttpDispatcher(k, hostOptions)); + ExecutorService getExecutorService() { + return this.executor; } } diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java index 26f057b..a9bbedf 100644 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java +++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java @@ -16,31 +16,57 @@ */ package org.apache.camel.component.knative.http; -import java.nio.ByteBuffer; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.function.Predicate; -import io.undertow.predicate.Predicates; -import io.undertow.server.HttpHandler; -import io.undertow.server.HttpServerExchange; -import io.undertow.util.HeaderMap; -import io.undertow.util.HttpString; -import io.undertow.util.MimeMappings; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; +import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.TypeConverter; import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.DefaultMessage; +import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.MessageHelper; import org.apache.camel.util.ObjectHelper; -public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler { - private final KnativeHttpBinding binding; +public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.PredicatedHandler { + private final Predicate<HttpServerRequest> filter; public KnativeHttpConsumer(KnativeHttpEndpoint endpoint, Processor processor) { super(endpoint, processor); - this.binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy()); + filter = v -> { + if (!Objects.equals(endpoint.getPath(), v.path())) { + return false; + } + if (ObjectHelper.isEmpty(endpoint.getHeaderFilter())) { + return true; + } + + for (Map.Entry<String, Object> entry : endpoint.getHeaderFilter().entrySet()) { + String ref = entry.getValue().toString(); + String val = v.getHeader(entry.getKey()); + boolean matches = Objects.equals(ref, val) || val.matches(ref); + + if (!matches) { + return false; + } + } + + return true; + }; } @Override @@ -52,30 +78,9 @@ public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler protected void doStart() throws Exception { final KnativeHttpEndpoint endpoint = getEndpoint(); final KnativeHttpComponent component = endpoint.getComponent(); - final KnativeHttp.HostKey key = endpoint.getHostKey(); - - component.bind(key, this, Predicates.and( - Predicates.path(endpoint.getPath()), - value -> { - if (ObjectHelper.isEmpty(endpoint.getHeaderFilter())) { - return true; - } - - HeaderMap hm = value.getRequestHeaders(); + final KnativeHttp.ServerKey key = endpoint.getServerKey(); - for (Map.Entry<String, Object> entry: endpoint.getHeaderFilter().entrySet()) { - String ref = entry.getValue().toString(); - String val = hm.getFirst(entry.getKey()); - boolean matches = Objects.equals(ref, val) || val.matches(ref); - - if (!matches) { - return false; - } - } - - return true; - } - )); + component.getDispatcher(key).bind(this); super.doStart(); } @@ -84,49 +89,152 @@ public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler protected void doStop() throws Exception { final KnativeHttpEndpoint endpoint = getEndpoint(); final KnativeHttpComponent component = endpoint.getComponent(); - final KnativeHttp.HostKey key = endpoint.getHostKey(); + final KnativeHttp.ServerKey key = endpoint.getServerKey(); - component.unbind(key, this); + component.getDispatcher(key).unbind(this); super.doStop(); } @Override - public void handleRequest(HttpServerExchange httpExchange) throws Exception { - //create new Exchange - //binding is used to extract header and payload(if available) - Exchange camelExchange = createExchange(httpExchange); - - //Unit of Work to process the Exchange - createUoW(camelExchange); - try { - getProcessor().process(camelExchange); - } catch (Exception e) { - getExceptionHandler().handleException(e); - } finally { - doneUoW(camelExchange); + public boolean canHandle(HttpServerRequest request) { + return filter.test(request); + } + + @Override + public void handle(HttpServerRequest request) { + if (request.method() == HttpMethod.POST) { + final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut); + final Message in = toMessage(request, exchange); + + request.bodyHandler(buffer -> { + in.setBody(buffer.getBytes()); + + exchange.setIn(in); + + try { + createUoW(exchange); + getAsyncProcessor().process(exchange, doneSync -> { + try { + HttpServerResponse response = toHttpResponse(request, exchange.getMessage()); + Buffer body = computeResponseBody(exchange.getMessage()); + + // set the content type in the response. + String contentType = MessageHelper.getContentType(exchange.getMessage()); + if (contentType != null) { + // set content-type + response.putHeader(Exchange.CONTENT_TYPE, contentType); + } + + if (body == null) { + request.response().setStatusCode(204); + request.response().putHeader(HttpHeaders.CONTENT_TYPE, "text/plain"); + request.response().end("No response available"); + } else { + request.response().end(body); + } + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + }); + } catch (Exception e) { + getExceptionHandler().handleException(e); + } finally { + doneUoW(exchange); + } + }); + } else { + request.response().setStatusCode(405); + request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain"); + request.response().end("Unsupported method"); + + throw new IllegalArgumentException("Unsupported method: " + request.method()); + } + } + + private Message toMessage(HttpServerRequest request, Exchange exchange) { + KnativeHttpEndpoint endpoint = getEndpoint(); + Message message = new DefaultMessage(exchange.getContext()); + String path = request.path(); + + if (endpoint.getPath() != null) { + String endpointPath = endpoint.getPath(); + String matchPath = path.toLowerCase(Locale.US); + String match = endpointPath.toLowerCase(Locale.US); + + if (matchPath.startsWith(match)) { + path = path.substring(endpointPath.length()); + } + } + + for (Map.Entry<String, String> entry : request.headers().entries()) { + if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) { + KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue()); + } + } + for (Map.Entry<String, String> entry : request.params().entries()) { + if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) { + KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue()); + } } - Object body = binding.toHttpResponse(httpExchange, camelExchange.getMessage()); - TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter(); + message.setHeader(Exchange.HTTP_PATH, path); + message.setHeader(Exchange.HTTP_METHOD, request.method()); + message.setHeader(Exchange.HTTP_URI, request.uri()); + message.setHeader(Exchange.HTTP_QUERY, request.query()); - if (body == null) { - httpExchange.getResponseHeaders().put(new HttpString(Exchange.CONTENT_TYPE), MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt")); - httpExchange.getResponseSender().send("No response available"); - } else { - ByteBuffer bodyAsByteBuffer = tc.mandatoryConvertTo(ByteBuffer.class, body); - httpExchange.getResponseSender().send(bodyAsByteBuffer); + return message; + } + + private HttpServerResponse toHttpResponse(HttpServerRequest request, Message message) { + final HttpServerResponse response = request.response(); + final boolean failed = message.getExchange().isFailed(); + final int defaultCode = failed ? 500 : 200; + final int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class); + final TypeConverter tc = message.getExchange().getContext().getTypeConverter(); + + response.setStatusCode(code); + + for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { + final String key = entry.getKey(); + final Object value = entry.getValue(); + + for (Object it: org.apache.camel.support.ObjectHelper.createIterable(value, null)) { + String headerValue = tc.convertTo(String.class, it); + if (headerValue == null) { + continue; + } + if (!getEndpoint().getHeaderFilterStrategy().applyFilterToCamelHeaders(key, headerValue, message.getExchange())) { + response.putHeader(key, headerValue); + } + } } + + return response; } - public Exchange createExchange(HttpServerExchange httpExchange) throws Exception { - Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut); - Message in = binding.toCamelMessage(httpExchange, exchange); + private Buffer computeResponseBody(Message message) throws NoTypeConversionAvailableException { + Object body = message.getBody(); + Exception exception = message.getExchange().getException(); + + if (exception != null) { + // we failed due an exception so print it as plain text + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + exception.printStackTrace(pw); - exchange.setProperty(Exchange.CHARSET_NAME, httpExchange.getRequestCharset()); - in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, httpExchange.getRequestCharset()); + // the body should then be the stacktrace + body = sw.toString().getBytes(StandardCharsets.UTF_8); + // force content type to be text/plain as that is what the stacktrace is + message.setHeader(Exchange.CONTENT_TYPE, "text/plain"); + + // and mark the exception as failure handled, as we handled it by returning + // it as the response + ExchangeHelper.setFailureHandled(message.getExchange()); + } - exchange.setIn(in); - return exchange; + return Buffer.buffer( + message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body) + ); } } diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java new file mode 100644 index 0000000..446dc6b --- /dev/null +++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative.http; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import org.apache.camel.Exchange; +import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ReferenceCount; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class KnativeHttpConsumerDispatcher { + private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumerDispatcher.class); + + private final Vertx vertx; + private final KnativeHttp.ServerKey key; + private final ReferenceCount refCnt; + private final Set<KnativeHttp.PredicatedHandler> handlers; + private final HttpServerWrapper server; + private final HttpServerOptions serverOptions; + private final ExecutorService executor; + + public KnativeHttpConsumerDispatcher(ExecutorService executor, Vertx vertx, KnativeHttp.ServerKey key, HttpServerOptions serverOptions) { + this.executor = executor; + this.vertx = vertx; + this.serverOptions = ObjectHelper.supplyIfEmpty(serverOptions, HttpServerOptions::new); + this.server = new HttpServerWrapper(); + + this.handlers = new CopyOnWriteArraySet<>(); + this.key = key; + this.refCnt = ReferenceCount.on(server::start, server::stop); + } + + public void bind(KnativeHttp.PredicatedHandler handler) { + if (handlers.add(handler)) { + refCnt.retain(); + } + } + + public void unbind(KnativeHttp.PredicatedHandler handler) { + if (handlers.remove(handler)) { + refCnt.release(); + } + } + + private final class HttpServerWrapper extends ServiceSupport implements Handler<HttpServerRequest> { + private HttpServer server; + + @Override + protected void doStart() throws Exception { + LOGGER.info("Starting Vert.x HttpServer on {}:{}}", + key.getHost(), + key.getPort() + ); + + startAsync().toCompletableFuture().join(); + } + + @Override + protected void doStop() throws Exception { + LOGGER.info("Stopping Vert.x HttpServer on {}:{}", + key.getHost(), + key.getPort()); + + try { + if (server != null) { + stopAsync().toCompletableFuture().join(); + } + } finally { + this.server = null; + } + } + + private CompletionStage<Void> startAsync() { + server = vertx.createHttpServer(serverOptions); + server.requestHandler(this); + + return CompletableFuture.runAsync( + () -> { + CountDownLatch latch = new CountDownLatch(1); + + server.listen(key.getPort(), key.getHost(), result -> { + try { + if (result.failed()) { + LOGGER.warn("Failed to start Vert.x HttpServer on {}:{}, reason: {}", + key.getHost(), + key.getPort(), + result.cause().getMessage() + ); + + throw new RuntimeException(result.cause()); + } + + LOGGER.info("Vert.x HttpServer started on {}:{}", key.getPort(), key.getHost()); + } finally { + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + executor + ); + } + + protected CompletionStage<Void> stopAsync() { + return CompletableFuture.runAsync( + () -> { + CountDownLatch latch = new CountDownLatch(1); + + server.close(result -> { + try { + if (result.failed()) { + LOGGER.warn("Failed to close Vert.x HttpServer reason: {}", + result.cause().getMessage() + ); + + throw new RuntimeException(result.cause()); + } + + LOGGER.info("Vert.x HttpServer stopped"); + } finally { + latch.countDown(); + } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + executor + ); + } + + @Override + public void handle(HttpServerRequest request) { + LOGGER.debug("received exchange on path: {}, headers: {}", + request.path(), + request.headers() + ); + + for (KnativeHttp.PredicatedHandler handler: handlers) { + if (handler.canHandle(request)) { + handler.handle(request); + return; + } + } + + LOGGER.warn("No handler found for path: {}, headers: {}", + request.path(), + request.headers() + ); + + HttpServerResponse response = request.response(); + response.setStatusCode(404); + response.putHeader(Exchange.CONTENT_TYPE, "text/plain"); + response.end("No matching condition found"); + } + } +} diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java deleted file mode 100644 index 2d774cd..0000000 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.knative.http; - -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; - -import io.undertow.Undertow; -import io.undertow.UndertowOptions; -import io.undertow.predicate.Predicate; -import io.undertow.server.HttpHandler; -import io.undertow.server.HttpServerExchange; -import io.undertow.server.handlers.PathHandler; -import io.undertow.util.Headers; -import io.undertow.util.StatusCodes; -import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ReferenceCount; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class KnativeHttpDispatcher implements HttpHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpDispatcher.class); - - private final KnativeHttp.HostKey key; - private final KnativeHttp.HostOptions options; - private final ReferenceCount refCnt; - private final Set<PredicatedHandlerWrapper> handlers; - private final Undertow undertow; - private final PathHandler handler; - - public KnativeHttpDispatcher(KnativeHttp.HostKey key, KnativeHttp.HostOptions option) { - this.handlers = new CopyOnWriteArraySet<>(); - this.key = key; - this.options = option; - this.handler = new PathHandler(this); - this.undertow = createUndertow(); - this.refCnt = ReferenceCount.on(this::startUndertow, this::stopUndertow); - } - - @Override - public void handleRequest(HttpServerExchange exchange) throws Exception { - LOGGER.debug("received exchange on path: {}, headers: {}", - exchange.getRelativePath(), - exchange.getRequestHeaders() - ); - - for (PredicatedHandlerWrapper handler: handlers) { - if (handler.dispatch(exchange)) { - return; - } - } - - exchange.setStatusCode(StatusCodes.NOT_FOUND); - exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain"); - exchange.getResponseSender().send("No matching condition found"); - } - - public void bind(HttpHandler handler, Predicate predicate) { - if (handlers.add(new PredicatedHandlerWrapper(handler, predicate))) { - refCnt.retain(); - } - } - - public void unbind(HttpHandler handler) { - if (handlers.removeIf(phw -> phw.handler == handler)) { - refCnt.release(); - } - } - - private void startUndertow() { - try { - LOGGER.info("Starting Undertow server on {}://{}:{}}", - key.getSslContext() != null ? "https" : "http", - key.getHost(), - key.getPort() - ); - - undertow.start(); - } catch (RuntimeException e) { - LOGGER.warn("Failed to start Undertow server on {}://{}:{}, reason: {}", - key.getSslContext() != null ? "https" : "http", - key.getHost(), - key.getPort(), - e.getMessage() - ); - - undertow.stop(); - - throw e; - } - } - - private void stopUndertow() { - LOGGER.info("Stopping Undertow server on {}://{}:{}", - key.getSslContext() != null ? "https" : "http", - key.getHost(), - key.getPort()); - - undertow.stop(); - } - - private Undertow createUndertow() { - Undertow.Builder builder = Undertow.builder(); - if (key.getSslContext() != null) { - builder.addHttpsListener(key.getPort(), key.getHost(), key.getSslContext()); - } else { - builder.addHttpListener(key.getPort(), key.getHost()); - } - - if (options != null) { - ObjectHelper.ifNotEmpty(options.getIoThreads(), builder::setIoThreads); - ObjectHelper.ifNotEmpty(options.getWorkerThreads(), builder::setWorkerThreads); - ObjectHelper.ifNotEmpty(options.getBufferSize(), builder::setBufferSize); - ObjectHelper.ifNotEmpty(options.getDirectBuffers(), builder::setDirectBuffers); - ObjectHelper.ifNotEmpty(options.getHttp2Enabled(), e -> builder.setServerOption(UndertowOptions.ENABLE_HTTP2, e)); - } - - return builder.setHandler(new PathHandler(handler)).build(); - } - - private static final class PredicatedHandlerWrapper { - private final HttpHandler handler; - private final Predicate predicate; - - public PredicatedHandlerWrapper(HttpHandler handler, Predicate predicate) { - this.handler = ObjectHelper.notNull(handler, "handler"); - this.predicate = ObjectHelper.notNull(predicate, "predicate"); - } - - boolean dispatch(HttpServerExchange exchange) throws Exception { - if (predicate.resolve(exchange)) { - if (exchange.isInIoThread()) { - exchange.dispatch(handler); - } else { - handler.handleRequest(exchange); - } - - return true; - } - - LOGGER.debug("No handler for path: {}, headers: {}", - exchange.getRelativePath(), - exchange.getRequestHeaders() - ); - - return false; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PredicatedHandlerWrapper holder = (PredicatedHandlerWrapper) o; - return handler.equals(holder.handler); - } - - @Override - public int hashCode() { - return Objects.hash(handler); - } - } -} diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java index e02f736..e9d8c2b 100644 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java +++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.knative.http; -import java.net.URI; import java.util.Map; import org.apache.camel.Consumer; @@ -27,10 +26,9 @@ import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; +import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.DefaultEndpoint; import org.apache.camel.support.jsse.SSLContextParameters; -import org.apache.camel.util.ObjectHelper; -import org.xnio.OptionMap; @UriEndpoint( firstVersion = "3.0.0", @@ -65,12 +63,6 @@ public class KnativeHttpEndpoint extends DefaultEndpoint { this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy(); } - // ********************************** - // - // Properties - // - // ********************************** - public String getHost() { return host; } @@ -93,6 +85,10 @@ public class KnativeHttpEndpoint extends DefaultEndpoint { public void setPath(String path) { this.path = path; + + if (!this.path.startsWith("/")) { + this.path = "/" + path; + } } public HeaderFilterStrategy getHeaderFilterStrategy() { @@ -127,26 +123,10 @@ public class KnativeHttpEndpoint extends DefaultEndpoint { this.throwExceptionOnFailure = throwExceptionOnFailure; } - public KnativeHttp.HostKey getHostKey() { - return new KnativeHttp.HostKey(host, port, null); + public KnativeHttp.ServerKey getServerKey() { + return new KnativeHttp.ServerKey(host, port); } - public URI getHttpURI() { - String uri = "http://" + host + ":" + port; - - if (ObjectHelper.isNotEmpty(path)) { - uri += path; - } - - return URI.create(uri); - } - - // ********************************** - // - // Impl - // - // ********************************** - @Override public KnativeHttpComponent getComponent() { return (KnativeHttpComponent)super.getComponent(); @@ -154,11 +134,11 @@ public class KnativeHttpEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { - return new KnativeHttpProducer(this, OptionMap.EMPTY); + return new KnativeHttpProducer(this, getComponent().getVertx(), getComponent().getVertxHttpClientOptions()); } @Override public Consumer createConsumer(Processor processor) throws Exception { - return new KnativeHttpConsumer(this, processor); + return new KnativeHttpConsumer(this, AsyncProcessorConverterHelper.convert(processor)); } } diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java index 96051c9..db3d165 100644 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java +++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java @@ -19,12 +19,11 @@ package org.apache.camel.component.knative.http; import org.apache.camel.support.DefaultHeaderFilterStrategy; public class KnativeHttpHeaderFilterStrategy extends DefaultHeaderFilterStrategy { - public KnativeHttpHeaderFilterStrategy() { initialize(); } - protected void initialize() { + protected final void initialize() { getOutFilter().add("content-length"); getOutFilter().add("content-type"); getOutFilter().add("host"); diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java index 7d79ccd..c4aafed 100644 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java +++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java @@ -16,44 +16,40 @@ */ package org.apache.camel.component.knative.http; -import java.net.URI; -import java.nio.ByteBuffer; - -import io.undertow.client.ClientRequest; -import io.undertow.client.UndertowClient; -import io.undertow.protocols.ssl.UndertowXnioSsl; -import io.undertow.server.DefaultByteBufferPool; -import io.undertow.util.HeaderMap; -import io.undertow.util.Headers; -import io.undertow.util.Methods; +import java.util.Map; + +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import org.apache.camel.TypeConverter; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.Message; +import org.apache.camel.http.common.HttpOperationFailedException; import org.apache.camel.support.DefaultAsyncProducer; -import org.apache.camel.support.jsse.SSLContextParameters; -import org.apache.camel.util.URISupport; +import org.apache.camel.support.DefaultMessage; +import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.MessageHelper; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.xnio.OptionMap; -import org.xnio.Xnio; -import org.xnio.XnioWorker; -import org.xnio.ssl.XnioSsl; public class KnativeHttpProducer extends DefaultAsyncProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpProducer.class); - private final OptionMap options; - private final KnativeHttpBinding binding; + private final Vertx vertx; + private final WebClientOptions clientOptions; + private WebClient client; - private UndertowClient client; - private DefaultByteBufferPool pool; - private XnioSsl ssl; - private XnioWorker worker; - - public KnativeHttpProducer(KnativeHttpEndpoint endpoint, OptionMap options) { + public KnativeHttpProducer(KnativeHttpEndpoint endpoint, Vertx vertx, WebClientOptions clientOptions) { super(endpoint); - this.options = options; - this.binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy()); + + this.vertx = ObjectHelper.notNull(vertx, "vertx"); + this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, WebClientOptions::new); } @Override @@ -62,70 +58,98 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { } @Override - public boolean process(final Exchange camelExchange, final AsyncCallback callback) { - final KnativeHttpEndpoint endpoint = getEndpoint(); - final URI uri = endpoint.getHttpURI(); - final String pathAndQuery = URISupport.pathAndQueryOf(uri); - - final ClientRequest request = new ClientRequest(); - request.setMethod(Methods.POST); - request.setPath(pathAndQuery); - request.getRequestHeaders().put(Headers.HOST, uri.getHost()); - - final Object body = binding.toHttpRequest(request, camelExchange.getIn()); - final TypeConverter tc = endpoint.getCamelContext().getTypeConverter(); - final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body); - - // As tryConvertTo is used to convert the body, we should do null check - // or the call bodyAsByte.remaining() may throw an NPE - if (body != null && bodyAsByte != null) { - request.getRequestHeaders().put(Headers.CONTENT_LENGTH, bodyAsByte.remaining()); - } + public boolean process(Exchange exchange, AsyncCallback callback) { + final byte[] payload; - // when connect succeeds or fails UndertowClientCallback will - // get notified on a I/O thread run by Xnio worker. The writing - // of request and reading of response is performed also in the - // callback - client.connect( - new KnativeHttpClientCallback(camelExchange, callback, getEndpoint(), request, bodyAsByte), - uri, - worker, - ssl, - pool, - options); - - // the call above will proceed on Xnio I/O thread we will - // notify the exchange asynchronously when the HTTP exchange - // ends with success or failure from UndertowClientCallback - return false; - } + try { + payload = exchange.getMessage().getMandatoryBody(byte[].class); + } catch (InvalidPayloadException e) { + exchange.setException(e); + callback.done(true); - @Override - protected void doStart() throws Exception { - super.doStart(); + return true; + } - final Xnio xnio = Xnio.getInstance(); + KnativeHttpEndpoint endpoint = getEndpoint(); + Message message = exchange.getMessage(); - pool = new DefaultByteBufferPool(true, 17 * 1024); - worker = xnio.createWorker(options); + MultiMap headers = MultiMap.caseInsensitiveMultiMap(); + headers.add(HttpHeaders.HOST, endpoint.getHost()); + headers.add(HttpHeaders.CONTENT_LENGTH, Integer.toString(payload.length)); - SSLContextParameters sslContext = getEndpoint().getSslContextParameters(); - if (sslContext != null) { - ssl = new UndertowXnioSsl(xnio, options, sslContext.createSSLContext(getEndpoint().getCamelContext())); + String contentType = MessageHelper.getContentType(message); + if (contentType != null) { + headers.add(HttpHeaders.CONTENT_TYPE, contentType); } - client = UndertowClient.getInstance(); + for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { + if (!endpoint.getHeaderFilterStrategy().applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) { + headers.add(entry.getKey(), entry.getValue().toString()); + } + } + + client.post(endpoint.getPort(), endpoint.getHost(), endpoint.getPath()) + .putHeaders(headers) + .sendBuffer(Buffer.buffer(payload), response -> { + HttpResponse<Buffer> result = response.result(); + + Message answer = new DefaultMessage(exchange.getContext()); + answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode()); + + for (Map.Entry<String, String> entry : result.headers().entries()) { + if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) { + KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue()); + } + } + + exchange.setMessage(answer); + + if (response.failed() && endpoint.getThrowExceptionOnFailure()) { + Exception cause = new HttpOperationFailedException( + getURI(), + result.statusCode(), + result.statusMessage(), + null, + KnativeHttpSupport.asStringMap(answer.getHeaders()), + ExchangeHelper.convertToType(exchange, String.class, answer.getBody()) + ); - LOGGER.debug("Created worker: {} with options: {}", worker, options); + exchange.setException(cause); + } + + callback.done(false); + }); + + return false; + } + + @Override + protected void doInit() throws Exception { + super.doInit(); + + this.client = WebClient.create(vertx, clientOptions); } @Override protected void doStop() throws Exception { super.doStop(); - if (worker != null && !worker.isShutdown()) { - LOGGER.debug("Shutting down worker: {}", worker); - worker.shutdown(); + if (this.client != null) { + LOGGER.debug("Shutting down client: {}", client); + this.client.close(); + this.client = null; + } + } + + private String getURI() { + String p = getEndpoint().getPath(); + + if (p == null) { + p = KnativeHttp.DEFAULT_PATH; + } else if (!p.startsWith("/")) { + p = "/" + p; } + + return String.format("http://%s:%d%s", getEndpoint().getHost(), getEndpoint().getPort(), p); } } diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java new file mode 100644 index 0000000..a858f75 --- /dev/null +++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative.http; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public final class KnativeHttpSupport { + private KnativeHttpSupport() { + } + + @SuppressWarnings("unchecked") + public static void appendHeader(Map<String, Object> headers, String key, Object value) { + if (headers.containsKey(key)) { + Object existing = headers.get(key); + List<Object> list; + if (existing instanceof List) { + list = (List<Object>) existing; + } else { + list = new ArrayList<>(); + list.add(existing); + } + list.add(value); + value = list; + } + + headers.put(key, value); + } + + public static Map<String, String> asStringMap(Map<String, Object> map) { + return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())); + } +} diff --git a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index e0d1ab6..0b40284 100644 --- a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -17,7 +17,6 @@ package org.apache.camel.component.knative.http; import org.apache.camel.CamelContext; -import org.apache.camel.CamelExecutionException; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -29,7 +28,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; public class KnativeHttpTest { @@ -51,7 +49,7 @@ public class KnativeHttpTest { } @AfterEach - public void after() throws Exception { + public void after() { if (this.context != null) { this.context.stop(); } @@ -65,150 +63,164 @@ public class KnativeHttpTest { @Test void testWithPaths() throws Exception { - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - fromF("knative-http:0.0.0.0:%d/a/1", port) - .routeId("r1") - .setBody().simple("${routeId}") - .convertBodyTo(String.class) - .to("mock:r1"); - fromF("knative-http:0.0.0.0:%d/a/2", port) - .routeId("r2") - .setBody().simple("${routeId}") - .convertBodyTo(String.class) - .to("mock:r2"); - from("direct:start") - .toD("undertow:http://localhost:" + port + "/a/${body}"); - } - } - ); - - MockEndpoint m1 = context.getEndpoint("mock:r1", MockEndpoint.class); - m1.expectedMessageCount(1); - m1.expectedBodiesReceived("r1"); - - MockEndpoint m2 = context.getEndpoint("mock:r2", MockEndpoint.class); - m2.expectedMessageCount(1); - m2.expectedBodiesReceived("r2"); - + RouteBuilder.addRoutes(context, b -> { + b.fromF("knative-http:0.0.0.0:%d/a/1", port) + .routeId("r1") + .setBody().simple("${routeId}") + .to("mock:r1"); + b.fromF("knative-http:0.0.0.0:%d/a/2", port) + .routeId("r2") + .setBody().simple("${routeId}") + .to("mock:r2"); + + b.from("direct:start") + .toD("undertow:http://localhost:" + port + "/a/${body}"); + }); + + context.getEndpoint("mock:r1", MockEndpoint.class).expectedMessageCount(1); + context.getEndpoint("mock:r2", MockEndpoint.class).expectedMessageCount(1); context.start(); - assertThat( - template.requestBody("direct:start", "1", String.class) - ).isEqualTo("r1"); - assertThat( - template.requestBody("direct:start", "2", String.class) - ).isEqualTo("r2"); + assertThat(template.requestBody("direct:start", "1", String.class)).isEqualTo("r1"); + assertThat(template.requestBody("direct:start", "2", String.class)).isEqualTo("r2"); - m1.assertIsSatisfied(); - m2.assertIsSatisfied(); + MockEndpoint.assertIsSatisfied(context); } @Test void testWithFilters() throws Exception { - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h1", port) - .routeId("r1") - .setBody().simple("${routeId}") - .convertBodyTo(String.class) - .to("mock:r1"); - fromF("knative-http:0.0.0.0:%d?filter.myheader=h2", port) - .routeId("r2") - .setBody().simple("${routeId}") - .convertBodyTo(String.class) - .to("mock:r2"); - fromF("knative-http:0.0.0.0:%d?filter.myheader=t.*", port) - .routeId("r3") - .setBody().simple("${routeId}") - .convertBodyTo(String.class) - .to("mock:r3"); - from("direct:start") - .setHeader("MyHeader").body() - .toF("undertow:http://localhost:%d", port); - } - } - ); - - MockEndpoint m1 = context.getEndpoint("mock:r1", MockEndpoint.class); - m1.expectedMessageCount(1); - m1.expectedBodiesReceived("r1"); - - MockEndpoint m2 = context.getEndpoint("mock:r2", MockEndpoint.class); - m2.expectedMessageCount(1); - m2.expectedBodiesReceived("r2"); - + RouteBuilder.addRoutes(context, b -> { + b.fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h1", port) + .routeId("r1") + .setBody().simple("${routeId}") + .to("mock:r1"); + b.fromF("knative-http:0.0.0.0:%d?filter.myheader=h2", port) + .routeId("r2") + .setBody().simple("${routeId}") + .to("mock:r2"); + b.fromF("knative-http:0.0.0.0:%d?filter.myheader=t.*", port) + .routeId("r3") + .setBody().simple("${routeId}") + .to("mock:r3"); + + b.from("direct:start") + .setHeader("MyHeader").body() + .toF("undertow:http://localhost:%d", port); + }); + + context.getEndpoint("mock:r1", MockEndpoint.class).expectedMessageCount(1); + context.getEndpoint("mock:r2", MockEndpoint.class).expectedMessageCount(1); context.start(); - assertThat( - template.requestBody("direct:start", "h1", String.class) - ).isEqualTo("r1"); - assertThat( - template.requestBody("direct:start", "h2", String.class) - ).isEqualTo("r2"); - assertThat( - template.requestBody("direct:start", "t1", String.class) - ).isEqualTo("r3"); - assertThat( - template.requestBody("direct:start", "t2", String.class) - ).isEqualTo("r3"); - - m1.assertIsSatisfied(); - m2.assertIsSatisfied(); + assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1"); + assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2"); + assertThat(template.requestBody("direct:start", "t1", String.class)).isEqualTo("r3"); + assertThat(template.requestBody("direct:start", "t2", String.class)).isEqualTo("r3"); + + MockEndpoint.assertIsSatisfied(context); } @Test void testWithRexFilters() throws Exception { - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h.*", port) - .routeId("r1") - .setBody().simple("${routeId}") - .convertBodyTo(String.class); - from("direct:start") - .setHeader("MyHeader").body() - .toF("undertow:http://localhost:%d", port); - } - } - ); + RouteBuilder.addRoutes(context, b -> { + b.fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h.*", port) + .routeId("r1") + .setBody().simple("${routeId}"); + + b.from("direct:start") + .setHeader("MyHeader").body() + .toF("undertow:http://localhost:%d", port); + }); + + context.start(); + + assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1"); + assertThat(template.request("direct:start", e -> e.getMessage().setBody("t1"))).satisfies(e -> { + assertThat(e.isFailed()).isTrue(); + assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class); + }); + } + + @Test + void testRemoveConsumer() throws Exception { + RouteBuilder.addRoutes(context, b -> { + b.fromF("knative-http:0.0.0.0:%d?filter.h=h1", port) + .routeId("r1") + .setBody().simple("${routeId}"); + b.fromF("knative-http:0.0.0.0:%d?filter.h=h2", port) + .routeId("r2") + .setBody().simple("${routeId}"); + }); + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .setHeader("h").body() + .toF("undertow:http://localhost:%d", port); + }); context.start(); - assertThat( - template.requestBody("direct:start", "h1", String.class) - ).isEqualTo("r1"); - assertThatThrownBy( - () -> template.requestBody("direct:start", "t1", String.class) - ).isInstanceOf(CamelExecutionException.class).hasCauseExactlyInstanceOf(HttpOperationFailedException.class); + assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1"); + assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2"); + + context.getRouteController().stopRoute("r2"); + + assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> { + assertThat(e.isFailed()).isTrue(); + assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class); + }); + } + + @Test + void testAddConsumer() throws Exception { + RouteBuilder.addRoutes(context, b -> { + b.fromF("knative-http:0.0.0.0:%d?filter.h=h1", port) + .routeId("r1") + .setBody().simple("${routeId}"); + }); + RouteBuilder.addRoutes(context, b -> { + b.from("direct:start") + .setHeader("h").body() + .toF("undertow:http://localhost:%d", port); + }); + + context.start(); + + assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1"); + assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> { + assertThat(e.isFailed()).isTrue(); + assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class); + }); + + RouteBuilder.addRoutes(context, b -> { + b.fromF("knative-http:0.0.0.0:%d?filter.h=h2", port) + .routeId("r2") + .setBody().simple("${routeId}"); + }); + + assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1"); + assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2"); } @Test void testInvokeEndpoint() throws Exception { - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - fromF("undertow:http://0.0.0.0:%d", port) - .routeId("endpoint") - .setBody().simple("${routeId}") - .convertBodyTo(String.class) - .to("mock:endpoint"); - from("direct:start") - .toF("knative-http:0.0.0.0:%d", port); - } - } - ); + RouteBuilder.addRoutes(context, b -> { + b.fromF("undertow:http://0.0.0.0:%d", port) + .routeId("endpoint") + .setBody().simple("${routeId}") + .to("mock:endpoint"); + + b.from("direct:start") + .toF("knative-http:0.0.0.0:%d", port); + }); MockEndpoint mock = context.getEndpoint("mock:endpoint", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedBodiesReceived("endpoint"); mock.expectedHeaderReceived("Host", "0.0.0.0"); + mock.expectedMessageCount(1); context.start(); - template.requestBody("direct:start", "1", String.class); + template.sendBody("direct:start", "1"); mock.assertIsSatisfied(); } diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java index 1376c92..cb17060 100644 --- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java +++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java @@ -96,7 +96,6 @@ public class CloudEventsV01Test { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.custom-event"); mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint"); @@ -104,6 +103,7 @@ public class CloudEventsV01Test { mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -166,7 +166,6 @@ public class CloudEventsV01Test { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint"); @@ -174,9 +173,9 @@ public class CloudEventsV01Test { mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessageCount(1); mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); mock2.expectedHeaderReceived("CE-EventType", "my.type"); mock2.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type"); @@ -184,6 +183,7 @@ public class CloudEventsV01Test { mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID")); mock2.expectedBodiesReceived("test2"); + mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -238,7 +238,6 @@ public class CloudEventsV01Test { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock.expectedHeaderReceived("CE-EventID", "myEventID"); @@ -246,6 +245,7 @@ public class CloudEventsV01Test { mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -301,7 +301,6 @@ public class CloudEventsV01Test { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock.expectedHeaderReceived("CE-EventID", "myEventID"); @@ -309,6 +308,7 @@ public class CloudEventsV01Test { mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -387,22 +387,22 @@ public class CloudEventsV01Test { context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessageCount(1); mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock1.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock1.expectedHeaderReceived("CE-EventID", "myEventID1"); mock1.expectedHeaderReceived("CE-Source", "CE1"); mock1.expectedBodiesReceived("test"); + mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessageCount(1); mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock2.expectedHeaderReceived("CE-EventID", "myEventID2"); mock2.expectedHeaderReceived("CE-Source", "CE2"); mock2.expectedBodiesReceived("test"); + mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java index 75bfc0f..f0d9607 100644 --- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java +++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java @@ -96,7 +96,6 @@ public class CloudEventsV02Test { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); mock.expectedHeaderReceived("ce-type", "org.apache.camel.custom-event"); mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint"); @@ -104,6 +103,7 @@ public class CloudEventsV02Test { mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -166,7 +166,6 @@ public class CloudEventsV02Test { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); mock.expectedHeaderReceived("ce-type", "org.apache.camel.event"); mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint"); @@ -174,9 +173,9 @@ public class CloudEventsV02Test { mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessageCount(1); mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); mock2.expectedHeaderReceived("ce-type", "my.type"); mock2.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type"); @@ -184,6 +183,7 @@ public class CloudEventsV02Test { mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id")); mock2.expectedBodiesReceived("test2"); + mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -238,7 +238,6 @@ public class CloudEventsV02Test { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); mock.expectedHeaderReceived("ce-type", "org.apache.camel.event"); mock.expectedHeaderReceived("ce-id", "myEventID"); @@ -246,6 +245,7 @@ public class CloudEventsV02Test { mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -301,7 +301,6 @@ public class CloudEventsV02Test { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); mock.expectedHeaderReceived("ce-type", "org.apache.camel.event"); mock.expectedHeaderReceived("ce-id", "myEventID"); @@ -309,6 +308,7 @@ public class CloudEventsV02Test { mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -387,22 +387,22 @@ public class CloudEventsV02Test { context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessageCount(1); mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); mock1.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); mock1.expectedHeaderReceived("ce-type", "org.apache.camel.event"); mock1.expectedHeaderReceived("ce-id", "myEventID1"); mock1.expectedHeaderReceived("ce-source", "CE1"); mock1.expectedBodiesReceived("test"); + mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessageCount(1); mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); mock2.expectedHeaderReceived("ce-type", "org.apache.camel.event"); mock2.expectedHeaderReceived("ce-id", "myEventID2"); mock2.expectedHeaderReceived("ce-source", "CE2"); mock2.expectedBodiesReceived("test"); + mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java b/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java index da81f6a..996d43e 100644 --- a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java +++ b/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java @@ -31,7 +31,6 @@ import org.apache.camel.component.knative.http.KnativeHttpEndpoint; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.properties.PropertiesComponent; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.support.DefaultHeaderFilterStrategy; import org.apache.camel.test.AvailablePortFinder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -348,7 +347,6 @@ public class KnativeComponentTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1"); mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint"); @@ -356,13 +354,9 @@ public class KnativeComponentTest { mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); - context.createProducerTemplate().send( - "direct:source", - e -> { - e.getIn().setBody("test"); - } - ); + context.createProducerTemplate().sendBody("direct:source", "test"); mock.assertIsSatisfied(); } @@ -403,7 +397,6 @@ public class KnativeComponentTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1"); mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock.expectedHeaderReceived("CE-EventID", "myEventID"); @@ -411,6 +404,7 @@ public class KnativeComponentTest { mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -467,7 +461,6 @@ public class KnativeComponentTest { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedMessageCount(1); mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1"); mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock.expectedHeaderReceived("CE-EventID", "myEventID"); @@ -475,6 +468,7 @@ public class KnativeComponentTest { mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock.expectedBodiesReceived("test"); + mock.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -553,22 +547,22 @@ public class KnativeComponentTest { context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessageCount(1); mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock1.expectedHeaderReceived("CE-CloudEventsVersion", "0.1"); mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock1.expectedHeaderReceived("CE-EventID", "myEventID1"); mock1.expectedHeaderReceived("CE-Source", "CE1"); mock1.expectedBodiesReceived("test"); + mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessageCount(1); mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); mock2.expectedHeaderReceived("CE-CloudEventsVersion", "0.1"); mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); mock2.expectedHeaderReceived("CE-EventID", "myEventID2"); mock2.expectedHeaderReceived("CE-Source", "CE2"); mock2.expectedBodiesReceived("test"); + mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", @@ -596,112 +590,4 @@ public class KnativeComponentTest { mock1.assertIsSatisfied(); mock2.assertIsSatisfied(); } - - @Test - void testDefaultHeadersFilter() throws Exception { - final int port = AvailablePortFinder.getNextAvailable(); - - KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( - new KnativeEnvironment.KnativeServiceDefinition( - Knative.Type.endpoint, - Knative.Protocol.http, - "myEndpoint", - "localhost", - port, - KnativeSupport.mapOf( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", - Knative.CONTENT_TYPE, "text/plain" - )) - )); - - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion()); - component.setEnvironment(env); - - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:source") - .setHeader("CamelHeader") - .constant("CamelHeaderValue") - .setHeader("MyHeader") - .constant("MyHeaderValue") - .to("knative:endpoint/myEndpoint") - .to("mock:source"); - - fromF("undertow:http://localhost:%d", port) - .setBody().constant("test"); - } - }); - - context.start(); - - MockEndpoint mock = context.getEndpoint("mock:source", MockEndpoint.class); - mock.expectedMessageCount(1); - - context.createProducerTemplate().sendBody("direct:source", "test"); - - mock.assertIsSatisfied(); - - - assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).doesNotContainKey("CamelHeader"); - assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).containsEntry("MyHeader", "MyHeaderValue"); - } - - @Test - void testCustomHeadersFilter() throws Exception { - final int port = AvailablePortFinder.getNextAvailable(); - - KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( - new KnativeEnvironment.KnativeServiceDefinition( - Knative.Type.endpoint, - Knative.Protocol.http, - "myEndpoint", - "localhost", - port, - KnativeSupport.mapOf( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", - Knative.CONTENT_TYPE, "text/plain" - )) - )); - - KnativeComponent component = context.getComponent("knative", KnativeComponent.class); - component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion()); - component.setEnvironment(env); - - DefaultHeaderFilterStrategy hfs = new DefaultHeaderFilterStrategy(); - hfs.setOutFilterPattern("(?i)(My)[\\.|a-z|A-z|0-9]*"); - hfs.setInFilterPattern("(?i)(My)[\\.|a-z|A-z|0-9]*"); - - - context.getRegistry().bind("myFilterStrategy", hfs); - - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:source") - .setHeader("CamelHeader") - .constant("CamelHeaderValue") - .setHeader("MyHeader") - .constant("MyHeaderValue") - .to("knative:endpoint/myEndpoint?transport.headerFilterStrategy=#myFilterStrategy") - .to("mock:source"); - - fromF("undertow:http://localhost:%d?headerFilterStrategy=#myFilterStrategy", port) - .setBody().constant("test"); - } - }); - - context.start(); - - MockEndpoint mock = context.getEndpoint("mock:source", MockEndpoint.class); - mock.expectedMessageCount(1); - - context.createProducerTemplate().sendBody("direct:source", "test"); - - mock.assertIsSatisfied(); - - assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).doesNotContainKey("MyHeader"); - assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).containsEntry("CamelHeader", "CamelHeaderValue"); - } } diff --git a/pom.xml b/pom.xml index e9c0068..9458562 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ <immutables.version>2.7.5</immutables.version> <semver4j.version>3.0.0</semver4j.version> <undertow.version>1.4.26.Final</undertow.version> + <vertx.version>3.8.0</vertx.version> <graalvm.version>19.1.1</graalvm.version> <gmavenplus-plugin.version>1.7.1</gmavenplus-plugin.version> <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>