CAMEL-9966: Restlet - Should not enable stream by default.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c7a0c3f8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c7a0c3f8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c7a0c3f8 Branch: refs/heads/kube-lb Commit: c7a0c3f8373139b171e9b7fac41c32412e8bf6bb Parents: 81f23c1 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri May 13 16:50:20 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 16 09:59:33 2016 +0200 ---------------------------------------------------------------------- .../restlet/DefaultRestletBinding.java | 27 ++++++++++- .../component/restlet/RestletEndpoint.java | 51 ++++++++++++++++++-- .../component/restlet/RestletOnCompletion.java | 38 +++++++++++++++ .../RestletProducerBinaryStreamTest.java | 22 ++++++++- 4 files changed, 129 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c7a0c3f8/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java index b95ef7a..3ab9d1b 100644 --- a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java +++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/DefaultRestletBinding.java @@ -76,6 +76,8 @@ public class DefaultRestletBinding implements RestletBinding, HeaderFilterStrate private static final Logger LOG = LoggerFactory.getLogger(DefaultRestletBinding.class); private static final String RFC_2822_DATE_PATTERN = "EEE, dd MMM yyyy HH:mm:ss Z"; private HeaderFilterStrategy headerFilterStrategy; + private boolean streamRepresentation; + private boolean autoCloseStream; public void populateExchangeFromRestletRequest(Request request, Response response, Exchange exchange) throws Exception { Message inMessage = exchange.getIn(); @@ -363,9 +365,14 @@ public class DefaultRestletBinding implements RestletBinding, HeaderFilterStrate LOG.debug("Setting the Content-Type to be {}", mediaType.toString()); exchange.getOut().setHeader(Exchange.CONTENT_TYPE, mediaType.toString()); } - if (response.getEntity() instanceof StreamRepresentation) { + if (streamRepresentation && response.getEntity() instanceof StreamRepresentation) { Representation representationDecoded = new DecodeRepresentation(response.getEntity()); - exchange.getOut().setBody(representationDecoded.getStream()); + InputStream is = representationDecoded.getStream(); + exchange.getOut().setBody(is); + if (autoCloseStream) { + // ensure the input stream is closed when we are done routing + exchange.addOnCompletion(new RestletOnCompletion(is)); + } } else if (response.getEntity() instanceof Representation) { Representation representationDecoded = new DecodeRepresentation(response.getEntity()); exchange.getOut().setBody(representationDecoded.getText()); @@ -584,4 +591,20 @@ public class DefaultRestletBinding implements RestletBinding, HeaderFilterStrate public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { headerFilterStrategy = strategy; } + + public boolean isStreamRepresentation() { + return streamRepresentation; + } + + public void setStreamRepresentation(boolean streamRepresentation) { + this.streamRepresentation = streamRepresentation; + } + + public boolean isAutoCloseStream() { + return autoCloseStream; + } + + public void setAutoCloseStream(boolean autoCloseStream) { + this.autoCloseStream = autoCloseStream; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/c7a0c3f8/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java index 63ae6fa..ec2871b 100644 --- a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java +++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.restlet; +import java.io.InputStream; import java.util.List; import java.util.Map; @@ -23,6 +24,7 @@ import org.apache.camel.AsyncEndpoint; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; @@ -66,18 +68,22 @@ public class RestletEndpoint extends DefaultEndpoint implements AsyncEndpoint, H private Method[] restletMethods; @UriParam(label = "consumer") private List<String> restletUriPatterns; - @UriParam + @UriParam(label = "security") private Map<String, String> restletRealm; - @UriParam + @UriParam(label = "advanced") private HeaderFilterStrategy headerFilterStrategy; - @UriParam + @UriParam(label = "advanced") private RestletBinding restletBinding; @UriParam(label = "producer", defaultValue = "true") private boolean throwExceptionOnFailure = true; - @UriParam + @UriParam(label = "advanced") private boolean disableStreamCache; - @UriParam + @UriParam(label = "security") private SSLContextParameters sslContextParameters; + @UriParam(label = "producer,advanced") + private boolean streamRepresentation; + @UriParam(label = "producer,advanced") + private boolean autoCloseStream; public RestletEndpoint(RestletComponent component, String remaining) throws Exception { super(remaining, component); @@ -306,6 +312,37 @@ public class RestletEndpoint extends DefaultEndpoint implements AsyncEndpoint, H this.sslContextParameters = scp; } + public boolean isStreamRepresentation() { + return streamRepresentation; + } + + /** + * Whether to support stream representation as response from calling a REST service using the restlet producer. + * If the response is streaming then this option can be enabled to use an {@link java.io.InputStream} as the + * message body on the Camel {@link Message} body. If using this option you may want to enable the + * autoCloseStream option as well to ensure the input stream is closed when the Camel {@link Exchange} + * is done being routed. However if you need to read the stream outside a Camel route, you may need + * to not auto close the stream. + */ + public void setStreamRepresentation(boolean streamRepresentation) { + this.streamRepresentation = streamRepresentation; + } + + public boolean isAutoCloseStream() { + return autoCloseStream; + } + + /** + * Whether to auto close the stream representation as response from calling a REST service using the restlet producer. + * If the response is streaming and the option streamRepresentation is enabled then you may want to auto close + * the {@link InputStream} from the streaming response to ensure the input stream is closed when the Camel {@link Exchange} + * is done being routed. However if you need to read the stream outside a Camel route, you may need + * to not auto close the stream. + */ + public void setAutoCloseStream(boolean autoCloseStream) { + this.autoCloseStream = autoCloseStream; + } + // Update the endpointUri with the restlet method information protected void updateEndpointUri() { String endpointUri = getEndpointUri(); @@ -336,6 +373,10 @@ public class RestletEndpoint extends DefaultEndpoint implements AsyncEndpoint, H if (restletBinding instanceof HeaderFilterStrategyAware) { ((HeaderFilterStrategyAware) restletBinding).setHeaderFilterStrategy(getHeaderFilterStrategy()); } + if (restletBinding instanceof DefaultRestletBinding) { + ((DefaultRestletBinding) restletBinding).setStreamRepresentation(isStreamRepresentation()); + ((DefaultRestletBinding) restletBinding).setAutoCloseStream(isAutoCloseStream()); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/c7a0c3f8/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java ---------------------------------------------------------------------- diff --git a/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java new file mode 100644 index 0000000..5c2f684 --- /dev/null +++ b/components/camel-restlet/src/main/java/org/apache/camel/component/restlet/RestletOnCompletion.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.restlet; + +import java.io.InputStream; + +import org.apache.camel.Exchange; +import org.apache.camel.support.SynchronizationAdapter; +import org.apache.camel.util.IOHelper; + +public class RestletOnCompletion extends SynchronizationAdapter { + + private final InputStream inputStream; + + public RestletOnCompletion(InputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public void onDone(Exchange exchange) { + IOHelper.close(inputStream); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c7a0c3f8/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java ---------------------------------------------------------------------- diff --git a/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java b/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java index 4cb24e7..bc15e17 100644 --- a/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java +++ b/components/camel-restlet/src/test/java/org/apache/camel/component/restlet/RestletProducerBinaryStreamTest.java @@ -17,6 +17,8 @@ package org.apache.camel.component.restlet; import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; @@ -34,7 +36,7 @@ public class RestletProducerBinaryStreamTest extends RestletTestSupport { @Test public void shouldHandleBinaryOctetStream() throws Exception { - Exchange response = template.request("restlet:http://localhost:" + portNum + "/application/octet-stream", null); + Exchange response = template.request("restlet:http://localhost:" + portNum + "/application/octet-stream?streamRepresentation=true", null); assertThat(response.getOut().getHeader(CONTENT_TYPE, String.class), equalTo("application/octet-stream")); assertThat(response.getOut().getBody(byte[].class), equalTo(getAllBytes())); @@ -42,12 +44,28 @@ public class RestletProducerBinaryStreamTest extends RestletTestSupport { @Test public void shouldHandleBinaryAudioMpeg() throws Exception { - Exchange response = template.request("restlet:http://localhost:" + portNum + "/audio/mpeg", null); + Exchange response = template.request("restlet:http://localhost:" + portNum + "/audio/mpeg?streamRepresentation=true", null); assertThat(response.getOut().getHeader(CONTENT_TYPE, String.class), equalTo("audio/mpeg")); assertThat(response.getOut().getBody(byte[].class), equalTo(getAllBytes())); } + @Test + public void shouldAutoClose() throws Exception { + Exchange response = template.request("restlet:http://localhost:" + portNum + "/application/octet-stream?streamRepresentation=true&autoCloseStream=true", null); + + assertThat(response.getOut().getHeader(CONTENT_TYPE, String.class), equalTo("application/octet-stream")); + InputStream is = (InputStream) response.getOut().getBody(); + assertNotNull(is); + + try { + is.read(); + fail("Should be closed"); + } catch (IOException e) { + // expected + } + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() {