This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 0c46180 CAMEL-12835: Fixed camel-json-validator to deal with streaming content not being re-readable and therefore favour using stream caching. 0c46180 is described below commit 0c46180bc3fef6fb6daf9681a9b388ab74c86e30 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Nov 2 15:16:27 2018 +0100 CAMEL-12835: Fixed camel-json-validator to deal with streaming content not being re-readable and therefore favour using stream caching. --- .../jsonvalidator/JsonValidatorEndpoint.java | 42 +++++++++++++---- .../jsonvalidator/ValidatorInputStreamTest.java | 53 ++++++++++++++++++++++ 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java index fb07874..9841e6c 100644 --- a/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java +++ b/components/camel-json-validator/src/main/java/org/apache/camel/component/jsonvalidator/JsonValidatorEndpoint.java @@ -26,12 +26,12 @@ import com.networknt.schema.ValidationMessage; import org.apache.camel.Component; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.StreamCache; import org.apache.camel.ValidationException; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.ResourceEndpoint; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; -import org.apache.camel.util.IOHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +75,20 @@ public class JsonValidatorEndpoint extends ResourceEndpoint { @Override protected void onExchange(Exchange exchange) throws Exception { - InputStream is = null; + StreamCache cache = null; + + // if the content is an input stream then its likely not re-readable so we need to make it stream cached + Object content = getContentToValidate(exchange); + if (!(content instanceof StreamCache) && content instanceof InputStream) { + cache = exchange.getContext().getTypeConverter().convertTo(StreamCache.class, exchange, content); + if (cache != null) { + if (shouldUseHeader()) { + exchange.getIn().setHeader(headerName, cache); + } else { + exchange.getIn().setBody(cache); + } + } + } // Get a local copy of the current schema to improve concurrency. JsonSchema localSchema = this.schema; @@ -83,19 +96,26 @@ public class JsonValidatorEndpoint extends ResourceEndpoint { localSchema = getOrCreateSchema(); } try { - is = getContentToValidate(exchange, InputStream.class); if (shouldUseHeader()) { - if (is == null && isFailOnNullHeader()) { + if (content == null && isFailOnNullHeader()) { throw new NoJsonHeaderValidationException(exchange, headerName); } } else { - if (is == null && isFailOnNullBody()) { + if (content == null && isFailOnNullBody()) { throw new NoJsonBodyValidationException(exchange); } } - if (is != null) { + if (content != null) { + // favour using stream caching + if (cache == null) { + cache = exchange.getContext().getTypeConverter().convertTo(StreamCache.class, exchange, content); + } ObjectMapper mapper = new ObjectMapper(); + InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, cache != null ? cache : content); JsonNode node = mapper.readTree(is); + if (node == null) { + throw new NoJsonBodyValidationException(exchange); + } Set<ValidationMessage> errors = localSchema.validate(node); if (errors.size() > 0) { @@ -114,15 +134,17 @@ public class JsonValidatorEndpoint extends ResourceEndpoint { this.errorHandler.handleErrors(exchange, schema, e); } } finally { - IOHelper.close(is); + if (cache != null) { + cache.reset(); + } } } - private <T> T getContentToValidate(Exchange exchange, Class<T> clazz) { + private Object getContentToValidate(Exchange exchange) { if (shouldUseHeader()) { - return exchange.getIn().getHeader(headerName, clazz); + return exchange.getIn().getHeader(headerName); } else { - return exchange.getIn().getBody(clazz); + return exchange.getIn().getBody(); } } diff --git a/components/camel-json-validator/src/test/java/org/apache/camel/component/jsonvalidator/ValidatorInputStreamTest.java b/components/camel-json-validator/src/test/java/org/apache/camel/component/jsonvalidator/ValidatorInputStreamTest.java new file mode 100644 index 0000000..3bf1d5e --- /dev/null +++ b/components/camel-json-validator/src/test/java/org/apache/camel/component/jsonvalidator/ValidatorInputStreamTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jsonvalidator; + +import java.io.ByteArrayInputStream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ValidatorInputStreamTest extends CamelTestSupport { + + @Test + public void testReadTwice() throws Exception { + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + + String body = "{ \"name\": \"Joe Doe\", \"id\": 1, \"price\": 12.5 }"; + ByteArrayInputStream bais = new ByteArrayInputStream(body.getBytes()); + + template.sendBody("direct:start", bais); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("json-validator:org/apache/camel/component/jsonvalidator/schema.json") + .to("mock:foo") + .to("json-validator:org/apache/camel/component/jsonvalidator/schema.json") + .to("mock:bar"); + } + }; + } +}