CAMEL-9966: Restlet - Should not enable stream by default.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7c887123 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7c887123 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7c887123 Branch: refs/heads/camel-2.16.x Commit: 7c887123e4508f549fdb1c1de48ad59f85eb0da6 Parents: c1b40fb Author: Claus Ibsen <davscl...@apache.org> Authored: Fri May 13 16:50:20 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri May 13 16:58:59 2016 +0200 ---------------------------------------------------------------------- .../restlet/DefaultRestletBinding.java | 27 ++++++++++- .../component/restlet/RestletEndpoint.java | 51 ++++++++++++++++++-- .../component/restlet/RestletOnCompletion.java | 38 +++++++++++++++ .../RestletProducerBinaryStreamTest.java | 22 ++++++++- 4 files changed, 129 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7c887123/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java index 80ddd4c..6765ce4 100644 --- a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java +++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java @@ -76,6 +76,8 @@ public class DefaultRestletBinding implements RestletBinding, HeaderFilterStrate private static final Logger LOG = LoggerFactory.getLogger(DefaultRestletBinding.class); private static final String RFC_2822_DATE_PATTERN = "EEE, dd MMM yyyy HH:mm:ss Z"; private HeaderFilterStrategy headerFilterStrategy; + private boolean streamRepresentation; + private boolean autoCloseStream; public void populateExchangeFromRestletRequest(Request request, Response response, Exchange exchange) throws Exception { Message inMessage = exchange.getIn(); @@ -365,9 +367,14 @@ public class DefaultRestletBinding implements RestletBinding, HeaderFilterStrate LOG.debug("Setting the Content-Type to be {}", mediaType.toString()); exchange.getOut().setHeader(Exchange.CONTENT_TYPE, mediaType.toString()); } - if (response.getEntity() instanceof StreamRepresentation) { + if (streamRepresentation && response.getEntity() instanceof StreamRepresentation) { Representation representationDecoded = new DecodeRepresentation(response.getEntity()); - exchange.getOut().setBody(representationDecoded.getStream()); + InputStream is = representationDecoded.getStream(); + exchange.getOut().setBody(is); + if (autoCloseStream) { + // ensure the input stream is closed when we are done routing + exchange.addOnCompletion(new RestletOnCompletion(is)); + } } else if (response.getEntity() instanceof Representation) { Representation representationDecoded = new DecodeRepresentation(response.getEntity()); exchange.getOut().setBody(representationDecoded.getText()); @@ -579,4 +586,20 @@ public class DefaultRestletBinding implements RestletBinding, HeaderFilterStrate public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { headerFilterStrategy = strategy; } + + public boolean isStreamRepresentation() { + return streamRepresentation; + } + + public void setStreamRepresentation(boolean streamRepresentation) { + this.streamRepresentation = streamRepresentation; + } + + public boolean isAutoCloseStream() { + return autoCloseStream; + } + + public void setAutoCloseStream(boolean autoCloseStream) { + this.autoCloseStream = autoCloseStream; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/7c887123/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java index a88a6fb..1fa26f5 100644 --- a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java +++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java @@ -16,12 +16,14 @@ */ package org.apache.camel.component.restlet; +import java.io.InputStream; import java.util.List; import java.util.Map; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; @@ -68,18 +70,22 @@ public class RestletEndpoint extends DefaultEndpoint implements HeaderFilterStra private Method[] restletMethods; @UriParam(label = "consumer") private List<String> restletUriPatterns; - @UriParam + @UriParam(label = "security") private Map<String, String> restletRealm; - @UriParam + @UriParam(label = "advanced") private HeaderFilterStrategy headerFilterStrategy; - @UriParam + @UriParam(label = "advanced") private RestletBinding restletBinding; @UriParam(label = "producer", defaultValue = "true") private boolean throwExceptionOnFailure = true; - @UriParam + @UriParam(label = "advanced") private boolean disableStreamCache; - @UriParam + @UriParam(label = "security") private SSLContextParameters sslContextParameters; + @UriParam(label = "producer,advanced") + private boolean streamRepresentation; + @UriParam(label = "producer,advanced") + private boolean autoCloseStream; public RestletEndpoint(RestletComponent component, String remaining) throws Exception { super(remaining, component); @@ -308,6 +314,37 @@ public class RestletEndpoint extends DefaultEndpoint implements HeaderFilterStra this.sslContextParameters = scp; } + public boolean isStreamRepresentation() { + return streamRepresentation; + } + + /** + * Whether to support stream representation as response from calling a REST service using the restlet producer. + * If the response is streaming then this option can be enabled to use an {@link java.io.InputStream} as the + * message body on the Camel {@link Message} body. If using this option you may want to enable the + * autoCloseStream option as well to ensure the input stream is closed when the Camel {@link Exchange} + * is done being routed. However if you need to read the stream outside a Camel route, you may need + * to not auto close the stream. + */ + public void setStreamRepresentation(boolean streamRepresentation) { + this.streamRepresentation = streamRepresentation; + } + + public boolean isAutoCloseStream() { + return autoCloseStream; + } + + /** + * Whether to auto close the stream representation as response from calling a REST service using the restlet producer. + * If the response is streaming and the option streamRepresentation is enabled then you may want to auto close + * the {@link InputStream} from the streaming response to ensure the input stream is closed when the Camel {@link Exchange} + * is done being routed. However if you need to read the stream outside a Camel route, you may need + * to not auto close the stream. + */ + public void setAutoCloseStream(boolean autoCloseStream) { + this.autoCloseStream = autoCloseStream; + } + // Update the endpointUri with the restlet method information protected void updateEndpointUri() { String endpointUri = getEndpointUri(); @@ -338,6 +375,10 @@ public class RestletEndpoint extends DefaultEndpoint implements HeaderFilterStra if (restletBinding instanceof HeaderFilterStrategyAware) { ((HeaderFilterStrategyAware) restletBinding).setHeaderFilterStrategy(getHeaderFilterStrategy()); } + if (restletBinding instanceof DefaultRestletBinding) { + ((DefaultRestletBinding) restletBinding).setStreamRepresentation(isStreamRepresentation()); + ((DefaultRestletBinding) restletBinding).setAutoCloseStream(isAutoCloseStream()); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/7c887123/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java ---------------------------------------------------------------------- diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java new file mode 100644 index 0000000..5c2f684 --- /dev/null +++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.restlet; + +import java.io.InputStream; + +import org.apache.camel.Exchange; +import org.apache.camel.support.SynchronizationAdapter; +import org.apache.camel.util.IOHelper; + +public class RestletOnCompletion extends SynchronizationAdapter { + + private final InputStream inputStream; + + public RestletOnCompletion(InputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public void onDone(Exchange exchange) { + IOHelper.close(inputStream); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7c887123/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java ---------------------------------------------------------------------- diff --git a/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java b/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java index 4cb24e7..bc15e17 100644 --- a/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java +++ b/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java @@ -17,6 +17,8 @@ package org.apache.camel.component.restlet; import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; @@ -34,7 +36,7 @@ public class RestletProducerBinaryStreamTest extends RestletTestSupport { @Test public void shouldHandleBinaryOctetStream() throws Exception { - Exchange response = template.request("restlet:http://localhost:" + portNum + "/application/octet-stream", null); + Exchange response = template.request("restlet:http://localhost:" + portNum + "/application/octet-stream?streamRepresentation=true", null); assertThat(response.getOut().getHeader(CONTENT_TYPE, String.class), equalTo("application/octet-stream")); assertThat(response.getOut().getBody(byte[].class), equalTo(getAllBytes())); @@ -42,12 +44,28 @@ public class RestletProducerBinaryStreamTest extends RestletTestSupport { @Test public void shouldHandleBinaryAudioMpeg() throws Exception { - Exchange response = template.request("restlet:http://localhost:" + portNum + "/audio/mpeg", null); + Exchange response = template.request("restlet:http://localhost:" + portNum + "/audio/mpeg?streamRepresentation=true", null); assertThat(response.getOut().getHeader(CONTENT_TYPE, String.class), equalTo("audio/mpeg")); assertThat(response.getOut().getBody(byte[].class), equalTo(getAllBytes())); } + @Test + public void shouldAutoClose() throws Exception { + Exchange response = template.request("restlet:http://localhost:" + portNum + "/application/octet-stream?streamRepresentation=true&autoCloseStream=true", null); + + assertThat(response.getOut().getHeader(CONTENT_TYPE, String.class), equalTo("application/octet-stream")); + InputStream is = (InputStream) response.getOut().getBody(); + assertNotNull(is); + + try { + is.read(); + fail("Should be closed"); + } catch (IOException e) { + // expected + } + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() {