CAMEL-7354: camel-spark component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/183bbef2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/183bbef2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/183bbef2 Branch: refs/heads/master Commit: 183bbef21c8b4db6b1b0f83052df395c7adc8537 Parents: 3ff1072 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jun 27 17:34:29 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jun 27 17:34:29 2014 +0200 ---------------------------------------------------------------------- components/camel-spark/pom.xml | 32 +++++++ .../camel/component/spark/CamelSparkRoute.java | 9 +- .../component/spark/DefaultSparkBinding.java | 89 +++++++++++++++++++- .../camel/component/spark/SparkBinding.java | 22 ++++- .../component/spark/SparkConfiguration.java | 10 +++ .../camel/component/spark/SparkConstants.java | 26 ++++++ .../spark/SparkHeaderFilterStrategy.java | 4 + 7 files changed, 188 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-spark/pom.xml b/components/camel-spark/pom.xml index d3f48f2..16f7c81 100644 --- a/components/camel-spark/pom.xml +++ b/components/camel-spark/pom.xml @@ -101,4 +101,36 @@ </dependency> </dependencies> + <!-- unit testing requires java 8 --> + <profiles> + <profile> + <id>jdk8-test</id> + <activation> + <jdk>1.8</jdk> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>false</skipTests> + <forkedProcessTimeoutInSeconds>300</forkedProcessTimeoutInSeconds> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + </plugins> + </build> + </project> http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java index c418f88..76f1fa0 100644 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/CamelSparkRoute.java @@ -20,6 +20,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.util.ObjectHelper; import spark.Request; import spark.Response; import spark.Route; @@ -47,8 +48,12 @@ public class CamelSparkRoute implements Route { exchange.setException(e); } - if (exchange.getException() != null) { - // TODO: how to handle exchange failures + Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); + + try { + endpoint.getSparkBinding().toSparkResponse(msg, response, endpoint.getSparkConfiguration()); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); } if (exchange.hasOut()) { http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java index f40b449..854f94b 100644 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/DefaultSparkBinding.java @@ -16,16 +16,27 @@ */ package org.apache.camel.component.spark; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; +import java.util.Iterator; import java.util.Map; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.TypeConverter; import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.MessageHelper; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import spark.Request; +import spark.Response; public class DefaultSparkBinding implements SparkBinding { @@ -90,12 +101,88 @@ public class DefaultSparkBinding implements SparkBinding { } String[] splat = request.splat(); - String key = "splat"; + String key = SparkConstants.SPLAT; if (headerFilterStrategy != null && !headerFilterStrategy.applyFilterToExternalHeaders(key, splat, exchange)) { SparkHelper.appendHeader(headers, key, splat); } + } + + @Override + public void toSparkResponse(Message message, Response response, SparkConfiguration configuration) throws Exception { + LOG.trace("toSparkResponse: {}", message); + + // the response code is 200 for OK and 500 for failed + boolean failed = message.getExchange().isFailed(); + int defaultCode = failed ? 500 : 200; + + int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class); + response.status(code); + LOG.trace("HTTP Status Code: {}", code); + + TypeConverter tc = message.getExchange().getContext().getTypeConverter(); + + // append headers + // must use entrySet to ensure case of keys is preserved + for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + // use an iterator as there can be multiple values. (must not use a delimiter) + final Iterator<?> it = ObjectHelper.createIterator(value, null); + while (it.hasNext()) { + String headerValue = tc.convertTo(String.class, it.next()); + if (headerValue != null && headerFilterStrategy != null + && !headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) { + LOG.trace("HTTP-Header: {}={}", key, headerValue); + response.header(key, headerValue); + } + } + } + + // set the content type in the response. + String contentType = MessageHelper.getContentType(message); + if (contentType != null) { + // set content-type + response.header(Exchange.CONTENT_TYPE, contentType); + LOG.trace("Content-Type: {}", contentType); + } + + Object body = message.getBody(); + Exception cause = message.getExchange().getException(); + + // if there was an exception then use that as body + if (cause != null) { + if (configuration.isTransferException()) { + // we failed due an exception, and transfer it as java serialized object + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(cause); + oos.flush(); + IOHelper.close(oos, bos); + + body = bos.toByteArray(); + // force content type to be serialized java object + message.setHeader(Exchange.CONTENT_TYPE, SparkConstants.CONTENT_TYPE_JAVA_SERIALIZED_OBJECT); + } else { + // we failed due an exception so print it as plain text + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + cause.printStackTrace(pw); + + // the body should then be the stacktrace + body = sw.toString().getBytes(); + // force content type to be text/plain as that is what the stacktrace is + message.setHeader(Exchange.CONTENT_TYPE, "text/plain"); + } + // and mark the exception as failure handled, as we handled it by returning it as the response + ExchangeHelper.setFailureHandled(message.getExchange()); + } + + if (body != null) { + String str = tc.mandatoryConvertTo(String.class, message.getExchange(), body); + response.body(str); + } } /** http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java index 61d8798..21053ae 100644 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkBinding.java @@ -21,13 +21,14 @@ import java.util.Map; import org.apache.camel.Exchange; import org.apache.camel.Message; import spark.Request; +import spark.Response; public interface SparkBinding { /** * Binds from Spark {@link Request} to Camel {@link org.apache.camel.Message}. * - * @param request the netty http request + * @param request the Spark request * @param exchange the exchange that should contain the returned message. * @param configuration configuration * @return the message to store on the given exchange @@ -35,6 +36,25 @@ public interface SparkBinding { */ Message toCamelMessage(Request request, Exchange exchange, SparkConfiguration configuration) throws Exception; + /** + * Binds from Spark {@link Request} to Camel headers as a {@link Map}. + * + * @param request the Spark request + * @param headers the Camel headers that should be populated + * @param exchange the exchange that should contain the returned message. + * @param configuration the endpoint configuration + * @throws Exception is thrown if error during binding + */ void populateCamelHeaders(Request request, Map<String, Object> headers, Exchange exchange, SparkConfiguration configuration) throws Exception; + /** + * Binds from Camel {@link Message} to Spark {@link Response}. + * + * @param message the Camel message + * @param response the Spark response to bind to + * @param configuration the endpoint configuration + * @throws Exception is thrown if error during binding + */ + void toSparkResponse(Message message, Response response, SparkConfiguration configuration) throws Exception; + } http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java index 0c6d2b7..a24245a 100644 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConfiguration.java @@ -28,6 +28,8 @@ public class SparkConfiguration { private boolean disableStreamCache; @UriParam private boolean urlDecodeHeaders; + @UriParam + private boolean transferException; public boolean isMapHeaders() { return mapHeaders; @@ -52,4 +54,12 @@ public class SparkConfiguration { public void setUrlDecodeHeaders(boolean urlDecodeHeaders) { this.urlDecodeHeaders = urlDecodeHeaders; } + + public boolean isTransferException() { + return transferException; + } + + public void setTransferException(boolean transferException) { + this.transferException = transferException; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java new file mode 100644 index 0000000..8a026fa --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkConstants.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark; + +public final class SparkConstants { + + public static final String CONTENT_TYPE_JAVA_SERIALIZED_OBJECT = "application/x-java-serialized-object"; + public static final String SPLAT = "splat"; + + private SparkConstants() { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/183bbef2/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java index 33e0eb0..f729bde 100644 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkHeaderFilterStrategy.java @@ -31,6 +31,7 @@ public class SparkHeaderFilterStrategy extends DefaultHeaderFilterStrategy { getOutFilter().add("content-length"); getOutFilter().add("content-type"); getOutFilter().add("host"); + getOutFilter().add("user-agent"); // Add the filter for the Generic Message header // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.5 getOutFilter().add("cache-control"); @@ -48,6 +49,9 @@ public class SparkHeaderFilterStrategy extends DefaultHeaderFilterStrategy { // filter headers begin with "Camel" or "org.apache.camel" // must ignore case for Http based transports setOutFilterPattern("(?i)(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*"); + + // filter out splat as its an internal header + getOutFilter().add(SparkConstants.SPLAT); } }