CAMEL-7080 Add lazy-load support to the CSV data format, with thanks to Daneel
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1fc43d0b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1fc43d0b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1fc43d0b Branch: refs/heads/camel-2.12.x Commit: 1fc43d0bcbb6ef1fcbaac8cc0c6941ad77028962 Parents: 5fae2a3 Author: Willem Jiang <[email protected]> Authored: Sun Dec 22 11:16:44 2013 +0800 Committer: Willem Jiang <[email protected]> Committed: Sun Dec 22 11:48:31 2013 +0800 ---------------------------------------------------------------------- .../apache/camel/builder/DataFormatClause.java | 8 ++ .../camel/model/dataformat/CsvDataFormat.java | 21 +++- components/camel-csv/pom.xml | 6 ++ .../camel/dataformat/csv/CsvDataFormat.java | 68 +++++++++---- .../camel/dataformat/csv/CsvIterator.java | 63 ++++++++++++ .../camel/dataformat/csv/CsvIteratorTest.java | 100 +++++++++++++++++++ .../csv/CsvUnmarshalStreamSpringTest.java | 56 +++++++++++ .../dataformat/csv/CsvUnmarshalStreamTest.java | 79 +++++++++++++++ .../CsvUnmarshalStreamSpringTest-context.xml | 32 ++++++ 9 files changed, 410 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1fc43d0b/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java b/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java index c2ce089..b28d062 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java +++ b/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java @@ -181,6 +181,14 @@ public class DataFormatClause<T extends ProcessorDefinition<?>> { } /** + * Uses the CSV data format for a huge file. + * Sequential access through an iterator. 
+ */ + public T csvLazyLoad() { + return dataFormat(new CsvDataFormat(true)); + } + + /** * Uses the custom data format */ public T custom(String ref) { http://git-wip-us.apache.org/repos/asf/camel/blob/1fc43d0b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java index 3d5d534..cfd4f6b 100644 --- a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java +++ b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java @@ -46,6 +46,8 @@ public class CsvDataFormat extends DataFormatDefinition { private String strategyRef; @XmlAttribute private Boolean skipFirstLine; + @XmlAttribute + private Boolean lazyLoad; public CsvDataFormat() { super("csv"); @@ -56,6 +58,11 @@ public class CsvDataFormat extends DataFormatDefinition { setDelimiter(delimiter); } + public CsvDataFormat(boolean lazyLoad) { + this(); + setLazyLoad(lazyLoad); + } + public Boolean isAutogenColumns() { return autogenColumns; } @@ -96,6 +103,14 @@ public class CsvDataFormat extends DataFormatDefinition { this.skipFirstLine = skipFirstLine; } + public Boolean getLazyLoad() { + return lazyLoad; + } + + public void setLazyLoad(Boolean lazyLoad) { + this.lazyLoad = lazyLoad; + } + @Override protected DataFormat createDataFormat(RouteContext routeContext) { DataFormat csvFormat = super.createDataFormat(routeContext); @@ -131,5 +146,9 @@ public class CsvDataFormat extends DataFormatDefinition { if (skipFirstLine != null) { setProperty(camelContext, dataFormat, "skipFirstLine", skipFirstLine); } + + if (lazyLoad != null) { + setProperty(camelContext, dataFormat, "lazyLoad", lazyLoad); + } } -} \ No newline at end of file +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/1fc43d0b/components/camel-csv/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-csv/pom.xml b/components/camel-csv/pom.xml index 10fd973..a37beca 100644 --- a/components/camel-csv/pom.xml +++ b/components/camel-csv/pom.xml @@ -57,6 +57,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>com.googlecode.jmockit</groupId> + <artifactId>jmockit</artifactId> + <version>1.5</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/camel/blob/1fc43d0b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java ---------------------------------------------------------------------- diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java index 9ad7b53..4ce6e7e 100644 --- a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java +++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java @@ -16,13 +16,15 @@ */ package org.apache.camel.dataformat.csv; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -54,6 +56,10 @@ public class CsvDataFormat implements DataFormat { private boolean autogenColumns = true; private String delimiter; private boolean skipFirstLine; + /** + * Lazy row loading with iterator for big files. 
+ */ + private boolean lazyLoad; public void marshal(Exchange exchange, Object object, OutputStream outputStream) throws Exception { if (delimiter != null) { @@ -96,32 +102,42 @@ public class CsvDataFormat implements DataFormat { strategy.setDelimiter(config.getDelimiter()); InputStreamReader in = new InputStreamReader(inputStream, IOHelper.getCharsetName(exchange)); - + CsvIterator csvIterator; try { - CSVParser parser = new CSVParser(in, strategy); - List<List<String>> list = new ArrayList<List<String>>(); - boolean isFirstLine = true; - while (true) { - String[] strings = parser.getLine(); - if (isFirstLine) { - isFirstLine = false; - if (skipFirstLine) { - // skip considering the first line if we're asked to do so - continue; - } - } - if (strings == null) { - break; - } - List<String> line = Arrays.asList(strings); - list.add(line); + CSVParser parser = createParser(in); + if (parser == null) { + IOHelper.close(in); + return Collections.emptyIterator(); } - return list; - } finally { + csvIterator = new CsvIterator(parser, in); + } catch (IOException e) { IOHelper.close(in); + throw e; + } + if (lazyLoad) { + return csvIterator; } + return loadAllAsList(csvIterator); } - + + private CSVParser createParser(InputStreamReader in) throws IOException { + CSVParser parser = new CSVParser(in, strategy); + if (skipFirstLine) { + if (null == parser.getLine()) { + return null; + } + } + return parser; + } + + private List<List<String>> loadAllAsList(CsvIterator iter) throws IOException { + List<List<String>> list = new ArrayList<List<String>>(); + while (iter.hasNext()) { + list.add(iter.next()); + } + return list; + } + public String getDelimiter() { return delimiter; } @@ -170,6 +186,14 @@ public class CsvDataFormat implements DataFormat { this.skipFirstLine = skipFirstLine; } + public boolean isLazyLoad() { + return lazyLoad; + } + + public void setLazyLoad(boolean lazyLoad) { + this.lazyLoad = lazyLoad; + } + private synchronized void updateFieldsInConfig(Set<?> 
set, Exchange exchange) { for (Object value : set) { if (value != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/1fc43d0b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java ---------------------------------------------------------------------- diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java new file mode 100644 index 0000000..68df9c4 --- /dev/null +++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java @@ -0,0 +1,63 @@ +package org.apache.camel.dataformat.csv; + +import org.apache.camel.util.IOHelper; +import org.apache.commons.csv.CSVParser; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + */ +public class CsvIterator implements Iterator<List<String>>, Closeable { + + private final CSVParser parser; + private final InputStreamReader in; + private String[] line; + + public CsvIterator(CSVParser parser, InputStreamReader in) + throws IOException + { + this.parser = parser; + this.in = in; + line = parser.getLine(); + } + + @Override + public boolean hasNext() { + return line != null; + } + + @Override + public List<String> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + List<String> result = Arrays.asList(line); + try { + line = parser.getLine(); + } catch (IOException e) { + line = null; + IOHelper.close(in); + throw new IllegalStateException(e); + } + if (line == null) { + IOHelper.close(in); + } + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + in.close(); + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/1fc43d0b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java ---------------------------------------------------------------------- diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java new file mode 100644 index 0000000..db60e9a --- /dev/null +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java @@ -0,0 +1,100 @@ +package org.apache.camel.dataformat.csv; + +import mockit.Expectations; +import mockit.Injectable; +import org.apache.commons.csv.CSVParser; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.NoSuchElementException; + +/** + */ +public class CsvIteratorTest { + + public static final String HDD_CRASH = "HDD crash"; + + @Test + public void closeIfError( + final @Injectable InputStreamReader reader, + final @Injectable CSVParser parser) + throws IOException + { + new Expectations() { + { + parser.getLine(); + result = new String[] { "1" }; + + parser.getLine(); + result = new String[] { "2" }; + + parser.getLine(); + result = new IOException(HDD_CRASH); + + reader.close(); + } + }; + + CsvIterator iterator = new CsvIterator(parser, reader); + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Arrays.asList("1"), iterator.next()); + Assert.assertTrue(iterator.hasNext()); + + try { + iterator.next(); + Assert.fail("exception expected"); + } catch (IllegalStateException e) { + Assert.assertEquals(HDD_CRASH, e.getCause().getMessage()); + } + + Assert.assertFalse(iterator.hasNext()); + + try { + iterator.next(); + Assert.fail("exception expected"); + } catch (NoSuchElementException e) { + // okay + } + } + + @Test + public void normalCycle(final @Injectable InputStreamReader reader, + 
final @Injectable CSVParser parser) + throws IOException + { + new Expectations() { + { + parser.getLine(); + result = new String[] { "1" }; + + parser.getLine(); + result = new String[] { "2" }; + + parser.getLine(); + result = null; + + reader.close(); + } + }; + + CsvIterator iterator = new CsvIterator(parser, reader); + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Arrays.asList("1"), iterator.next()); + + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Arrays.asList("2"), iterator.next()); + + Assert.assertFalse(iterator.hasNext()); + + try { + iterator.next(); + Assert.fail("exception expected"); + } catch (NoSuchElementException e) { + // okay + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1fc43d0b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java ---------------------------------------------------------------------- diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java new file mode 100644 index 0000000..0727801 --- /dev/null +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dataformat.csv; + +import org.apache.camel.EndpointInject; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import java.util.Arrays; +import java.util.Iterator; + +public class CsvUnmarshalStreamSpringTest extends CamelSpringTestSupport { + + public static final String MESSAGE = "message"; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @SuppressWarnings("unchecked") + @Test + public void testCsvUnMarshal() throws Exception { + result.expectedMessageCount(1); + + template.sendBody("direct:start", MESSAGE + "\n"); + + assertMockEndpointsSatisfied(); + + Iterator<String> body = result.getReceivedExchanges().get(0) + .getIn().getBody(Iterator.class); + assertEquals(CsvIterator.class, body.getClass()); + assertEquals(Arrays.asList(MESSAGE), body.next()); + } + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext( + "org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1fc43d0b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java new file mode 100644 index 0000000..21627b2 --- /dev/null +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.dataformat.csv; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * Spring based integration test for the <code>CsvDataFormat</code> + * @version + */ +public class CsvUnmarshalStreamTest extends CamelTestSupport { + + public static final int EXPECTED_COUNT = 3; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @SuppressWarnings("unchecked") + @Test + public void testCsvUnMarshal() throws Exception { + result.expectedMessageCount(EXPECTED_COUNT); + + String message = ""; + for (int i = 0; i < EXPECTED_COUNT; ++i) { + message += i + "|\"" + i + "\n" + i + "\"\n"; + } + + template.sendBody("direct:start", message); + + assertMockEndpointsSatisfied(); + + for (int i = 0; i < EXPECTED_COUNT; ++i) { + List<String> body = result.getReceivedExchanges().get(i) + .getIn().getBody(List.class); + assertEquals(2, body.size()); + assertEquals(String.valueOf(i), body.get(0)); + assertEquals(String.format("%d\n%d", i, i), body.get(1)); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + CsvDataFormat csv = new CsvDataFormat(); + csv.setLazyLoad(true); + csv.setDelimiter("|"); + + from("direct:start") + .unmarshal(csv) + .split(body()) + .to("mock:result"); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1fc43d0b/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml ---------------------------------------------------------------------- diff --git 
a/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml b/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml new file mode 100644 index 0000000..f510201 --- /dev/null +++ b/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version + 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 Unless required by + applicable law or agreed to in writing, software distributed under + the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and + limitations under the License. + --> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="direct:start" /> + <unmarshal> + <csv delimiter="|" lazyLoad="true"/> + </unmarshal> + <to uri="mock:result" /> + </route> + </camelContext> +</beans>
