Repository: camel Updated Branches: refs/heads/master b4cb8b46c -> af1cbd6fc
CAMEL-7415 fixed the issue that lazyLoad with CSV blows up on last line Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af1cbd6f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af1cbd6f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af1cbd6f Branch: refs/heads/master Commit: af1cbd6fc44a4a6ff807d02a0dab77ecd86f1d61 Parents: b4cb8b4 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue May 6 14:59:57 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue May 6 15:00:18 2014 +0800 ---------------------------------------------------------------------- .../camel/processor/UnmarshalProcessor.java | 10 ++++-- .../dataformat/csv/CsvUnmarshalStreamTest.java | 38 ++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/af1cbd6f/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java index c248fe1..934fd2a 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/UnmarshalProcessor.java @@ -17,6 +17,7 @@ package org.apache.camel.processor; import java.io.InputStream; +import java.util.Iterator; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -55,6 +56,7 @@ public class UnmarshalProcessor extends ServiceSupport implements AsyncProcessor ObjectHelper.notNull(dataFormat, "dataFormat"); InputStream stream = null; + Object result = null; try { stream = exchange.getIn().getMandatoryBody(InputStream.class); @@ -62,7 +64,7 @@ public class UnmarshalProcessor extends ServiceSupport implements AsyncProcessor Message out = exchange.getOut(); out.copyFrom(exchange.getIn()); - Object result = dataFormat.unmarshal(exchange, stream); + result = dataFormat.unmarshal(exchange, stream); if (result instanceof Exchange) { if (result != exchange) { // it's not allowed to return another exchange other than the one provided to dataFormat @@ -79,9 +81,11 @@ public class UnmarshalProcessor extends ServiceSupport implements AsyncProcessor exchange.setOut(null); exchange.setException(e); } finally { - IOHelper.close(stream, "input stream"); + // The Iterator will close the stream itself + if (!(result instanceof Iterator)) { + IOHelper.close(stream, "input stream"); + } } - callback.done(true); return true; } http://git-wip-us.apache.org/repos/asf/camel/blob/af1cbd6f/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java ---------------------------------------------------------------------- diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java index 355cd1e..f41a570 100644 --- a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java @@ -16,6 +16,10 @@ */ package org.apache.camel.dataformat.csv; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; import java.util.List; import org.apache.camel.EndpointInject; @@ -38,6 +42,7 @@ public class CsvUnmarshalStreamTest extends CamelTestSupport { @SuppressWarnings("unchecked") @Test public void testCsvUnMarshal() throws Exception { + result.reset(); result.expectedMessageCount(EXPECTED_COUNT); String message = ""; @@ -58,6 +63,39 @@ public class CsvUnmarshalStreamTest extends CamelTestSupport { } } + @SuppressWarnings("unchecked") + @Test + public void testCsvUnMarshalWithFile() throws Exception { + result.reset(); + result.expectedMessageCount(EXPECTED_COUNT); + + + template.sendBody("direct:start", new MyFileInputStream(new File("src/test/resources/data.csv"))); + + assertMockEndpointsSatisfied(); + + for (int i = 0; i < EXPECTED_COUNT; ++i) { + List<String> body = result.getReceivedExchanges().get(i) + .getIn().getBody(List.class); + assertEquals(2, body.size()); + assertEquals(String.valueOf(i), body.get(0)); + assertEquals(String.format("%d\n%d", i, i), body.get(1)); + } + } + + class MyFileInputStream extends FileInputStream { + + public MyFileInputStream(File file) throws FileNotFoundException { + super(file); + } + + public void close() throws IOException { + // Use this to find out how camel close the FileInputStream + super.close(); + } + + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() {