CAMEL-7716 Migrate camel-csv component to Apache CSV 1.0

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3af1165a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3af1165a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3af1165a

Branch: refs/heads/master
Commit: 3af1165a4c7d3a9bc8d67ba479392b52a43855fb
Parents: e409033
Author: Antoine DESSAIGNE <antoine.dessai...@gmail.com>
Authored: Wed Oct 1 17:32:43 2014 +0200
Committer: Antoine DESSAIGNE <antoine.dessai...@gmail.com>
Committed: Wed Nov 26 15:20:33 2014 +0100

----------------------------------------------------------------------
 .../camel/model/dataformat/CsvDataFormat.java   | 527 ++++++----
 components/camel-csv/pom.xml                    | 142 +--
 .../camel/dataformat/csv/CsvDataFormat.java     | 960 ++++++++++++++-----
 .../camel/dataformat/csv/CsvIterator.java       |  78 --
 .../camel/dataformat/csv/CsvLineConverter.java  |  33 -
 .../camel/dataformat/csv/CsvLineConverters.java | 102 --
 .../camel/dataformat/csv/CsvMarshaller.java     | 147 +++
 .../dataformat/csv/CsvRecordConverter.java      |  38 +
 .../dataformat/csv/CsvRecordConverters.java     |  74 ++
 .../camel/dataformat/csv/CsvUnmarshaller.java   | 159 +++
 .../camel/dataformat/csv/CsvDataFormatTest.java | 438 +++++++++
 .../dataformat/csv/CsvLineConvertersTest.java   |  68 --
 .../csv/CsvMarshalAutogenColumnsSpringTest.java | 187 ++--
 .../csv/CsvMarshalPipeDelimiterSpringTest.java  | 150 +--
 .../csv/CsvMarshalPipeDelimiterTest.java        | 170 ++--
 .../camel/dataformat/csv/CsvMarshalTest.java    | 186 ++--
 .../dataformat/csv/CsvRecordConvertersTest.java |  68 ++
 .../dataformat/csv/CsvRouteCharsetTest.java     | 138 ++-
 .../camel/dataformat/csv/CsvRouteTest.java      | 414 ++++----
 .../csv/CsvUnmarshalMapLineSpringTest.java      | 112 +++
 .../dataformat/csv/CsvUnmarshalMapLineTest.java | 113 ---
 .../CsvUnmarshalPipeDelimiterSpringTest.java    | 116 +--
 .../csv/CsvUnmarshalPipeDelimiterTest.java      | 165 ++--
 .../CsvUnmarshalSkipFirstLineSpringTest.java    |  92 ++
 .../csv/CsvUnmarshalSkipFirstLineTest.java      |  93 --
 .../csv/CsvUnmarshalStreamSpringTest.java       | 125 +--
 .../dataformat/csv/CsvUnmarshalStreamTest.java  | 228 ++---
 .../csv/CsvUnmarshalTabDelimiterSpringTest.java | 143 +--
 .../csv/CsvUnmarshalTabDelimiterTest.java       | 163 ++--
 .../camel/dataformat/csv/CsvUnmarshalTest.java  | 158 +++
 ...UnmarshalTwoCsvDataFormatConcurrentTest.java | 174 ++--
 .../apache/camel/dataformat/csv/TestUtils.java  |  47 +
 ...vMarshalAutogenColumnsSpringTest-context.xml | 127 +--
 ...svMarshalPipeDelimiterSpringTest-context.xml |  64 +-
 .../CsvUnmarshalMapLineSpringTest-context.xml   | 115 ++-
 ...UnmarshalPipeDelimiterSpringTest-context.xml |  62 +-
 ...UnmarshalSkipFirstLineSpringTest-context.xml |  62 +-
 parent/pom.xml                                  |   2 +-
 38 files changed, 3853 insertions(+), 2387 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java 
b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java
index b6fdae0..01613df 100644
--- 
a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java
+++ 
b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java
@@ -1,168 +1,359 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.model.dataformat;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.model.DataFormatDefinition;
-import org.apache.camel.spi.DataFormat;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.ObjectHelper;
-
-/**
- * Represents a CSV (Comma Separated Values) {@link 
org.apache.camel.spi.DataFormat}
- *
- * @version
- */
-@XmlRootElement(name = "csv")
-@XmlAccessorType(XmlAccessType.FIELD)
-public class CsvDataFormat extends DataFormatDefinition {
-    @XmlAttribute
-    private Boolean autogenColumns;
-    @XmlAttribute
-    private String delimiter;
-    @XmlAttribute
-    private String configRef;
-    @XmlAttribute
-    private String strategyRef;
-    @XmlAttribute
-    private Boolean skipFirstLine;
-    @XmlAttribute
-    private Boolean lazyLoad;
-    @XmlAttribute
-    private Boolean useMaps;
-
-    public CsvDataFormat() {
-        super("csv");
-    }
-
-    public CsvDataFormat(String delimiter) {
-        this();
-        setDelimiter(delimiter);
-    }
-
-    public CsvDataFormat(boolean lazyLoad) {
-        this();
-        setLazyLoad(lazyLoad);
-    }
-
-    public Boolean isAutogenColumns() {
-        return autogenColumns;
-    }
-
-    public void setAutogenColumns(Boolean autogenColumns) {
-        this.autogenColumns = autogenColumns;
-    }
-
-    public String getDelimiter() {
-        return delimiter;
-    }
-
-    public void setDelimiter(String delimiter) {
-        this.delimiter = delimiter;
-    }
-
-    public String getConfigRef() {
-        return configRef;
-    }
-
-    public void setConfigRef(String configRef) {
-        this.configRef = configRef;
-    }
-
-    public String getStrategyRef() {
-        return strategyRef;
-    }
-
-    public void setStrategyRef(String strategyRef) {
-        this.strategyRef = strategyRef;
-    }
-
-    public Boolean isSkipFirstLine() {
-        return autogenColumns;
-    }
-
-    public void setSkipFirstLine(Boolean skipFirstLine) {
-        this.skipFirstLine = skipFirstLine;
-    }
-
-    public Boolean getLazyLoad() {
-        return lazyLoad;
-    }
-
-    public void setLazyLoad(Boolean lazyLoad) {
-        this.lazyLoad = lazyLoad;
-    }
-
-    public Boolean getUseMaps() {
-        return useMaps;
-    }
-
-    public void setUseMaps(Boolean useMaps) {
-        this.useMaps = useMaps;
-    }
-
-    @Override
-    protected DataFormat createDataFormat(RouteContext routeContext) {
-        DataFormat csvFormat = super.createDataFormat(routeContext);
-
-        if (ObjectHelper.isNotEmpty(configRef)) {
-            Object config = 
CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), configRef);
-            setProperty(routeContext.getCamelContext(), csvFormat, "config", 
config);
-        }
-        if (ObjectHelper.isNotEmpty(strategyRef)) {
-            Object strategy = 
CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), strategyRef);
-            setProperty(routeContext.getCamelContext(), csvFormat, "strategy", 
strategy);
-        }
-
-        return csvFormat;
-    }
-
-    @Override
-    protected void configureDataFormat(DataFormat dataFormat, CamelContext 
camelContext) {
-        if (autogenColumns != null) {
-            setProperty(camelContext, dataFormat, "autogenColumns", 
autogenColumns);
-        }
-
-        if (delimiter != null) {
-            if (delimiter.length() > 1) {
-                throw new IllegalArgumentException("Delimiter must have a 
length of one!");
-            }
-            setProperty(camelContext, dataFormat, "delimiter", delimiter);
-        } else {
-            // the default delimiter is ','
-            setProperty(camelContext, dataFormat, "delimiter", ",");
-        }
-
-        if (skipFirstLine != null) {
-            setProperty(camelContext, dataFormat, "skipFirstLine", 
skipFirstLine);
-        }
-
-        if (lazyLoad != null) {
-            setProperty(camelContext, dataFormat, "lazyLoad", lazyLoad);
-        }
-
-        if (useMaps != null) {
-            setProperty(camelContext, dataFormat, "useMaps", useMaps);
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model.dataformat;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.model.DataFormatDefinition;
+import org.apache.camel.spi.DataFormat;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Represents a CSV (Comma Separated Values) {@link 
org.apache.camel.spi.DataFormat}
+ */
+@XmlRootElement(name = "csv")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class CsvDataFormat extends DataFormatDefinition {
+    // Format options
+    @XmlAttribute
+    private String formatRef;
+    @XmlAttribute
+    private String formatName;
+    @XmlAttribute
+    private Boolean commentMarkerDisabled;
+    @XmlAttribute
+    private String commentMarker;
+    @XmlAttribute
+    private String delimiter;
+    @XmlAttribute
+    private Boolean escapeDisabled;
+    @XmlAttribute
+    private String escape;
+    @XmlAttribute
+    private Boolean headerDisabled;
+    @XmlElement(name = "header")
+    private List<String> header;
+    @XmlAttribute
+    private Boolean allowMissingColumnNames;
+    @XmlAttribute
+    private Boolean ignoreEmptyLines;
+    @XmlAttribute
+    private Boolean ignoreSurroundingSpaces;
+    @XmlAttribute
+    private Boolean nullStringDisabled;
+    @XmlAttribute
+    private String nullString;
+    @XmlAttribute
+    private Boolean quoteDisabled;
+    @XmlAttribute
+    private String quote;
+    @XmlAttribute
+    private String recordSeparatorDisabled;
+    @XmlAttribute
+    private String recordSeparator;
+    @XmlAttribute
+    private Boolean skipHeaderRecord;
+
+    // Unmarshall options
+    @XmlAttribute
+    private Boolean lazyLoad;
+    @XmlAttribute
+    private Boolean useMaps;
+    @XmlAttribute
+    private String recordConverterRef;
+
+    public CsvDataFormat() {
+        super("csv");
+    }
+
+    public CsvDataFormat(String delimiter) {
+        this();
+        setDelimiter(delimiter);
+    }
+
+    public CsvDataFormat(boolean lazyLoad) {
+        this();
+        setLazyLoad(lazyLoad);
+    }
+
+    @Override
+    protected void configureDataFormat(DataFormat dataFormat, CamelContext 
camelContext) {
+        // Format options
+        if (ObjectHelper.isNotEmpty(formatRef)) {
+            Object format = CamelContextHelper.mandatoryLookup(camelContext, 
formatRef);
+            setProperty(camelContext, dataFormat, "format", format);
+        } else if (ObjectHelper.isNotEmpty(formatName)) {
+            setProperty(camelContext, dataFormat, "formatName", formatName);
+        }
+        if (commentMarkerDisabled != null) {
+            setProperty(camelContext, dataFormat, "commentMarkerDisabled", 
commentMarkerDisabled);
+        }
+        if (commentMarker != null) {
+            setProperty(camelContext, dataFormat, "commentMarker", 
singleChar(commentMarker, "commentMarker"));
+        }
+        if (delimiter != null) {
+            setProperty(camelContext, dataFormat, "delimiter", 
singleChar(delimiter, "delimiter"));
+        }
+        if (escapeDisabled != null) {
+            setProperty(camelContext, dataFormat, "escapeDisabled", 
escapeDisabled);
+        }
+        if (escape != null) {
+            setProperty(camelContext, dataFormat, "escape", singleChar(escape, 
"escape"));
+        }
+        if (headerDisabled != null) {
+            setProperty(camelContext, dataFormat, "headerDisabled", 
headerDisabled);
+        }
+        if (header != null && !header.isEmpty()) {
+            setProperty(camelContext, dataFormat, "header", header.toArray(new 
String[header.size()]));
+        }
+        if (allowMissingColumnNames != null) {
+            setProperty(camelContext, dataFormat, "allowMissingColumnNames", 
allowMissingColumnNames);
+        }
+        if (ignoreEmptyLines != null) {
+            setProperty(camelContext, dataFormat, "ignoreEmptyLines", 
ignoreEmptyLines);
+        }
+        if (ignoreSurroundingSpaces != null) {
+            setProperty(camelContext, dataFormat, "ignoreSurroundingSpaces", 
ignoreSurroundingSpaces);
+        }
+        if (nullStringDisabled != null) {
+            setProperty(camelContext, dataFormat, "nullStringDisabled", 
nullStringDisabled);
+        }
+        if (nullString != null) {
+            setProperty(camelContext, dataFormat, "nullString", nullString);
+        }
+        if (quoteDisabled != null) {
+            setProperty(camelContext, dataFormat, "quoteDisabled", 
quoteDisabled);
+        }
+        if (quote != null) {
+            setProperty(camelContext, dataFormat, "quote", singleChar(quote, 
"quote"));
+        }
+        if (recordSeparatorDisabled != null) {
+            setProperty(camelContext, dataFormat, "recordSeparatorDisabled", 
recordSeparatorDisabled);
+        }
+        if (recordSeparator != null) {
+            setProperty(camelContext, dataFormat, "recordSeparator", 
recordSeparator);
+        }
+        if (skipHeaderRecord != null) {
+            setProperty(camelContext, dataFormat, "skipHeaderRecord", 
skipHeaderRecord);
+        }
+
+        // Unmarshall options
+        if (lazyLoad != null) {
+            setProperty(camelContext, dataFormat, "lazyLoad", lazyLoad);
+        }
+        if (useMaps != null) {
+            setProperty(camelContext, dataFormat, "useMaps", useMaps);
+        }
+        if (ObjectHelper.isNotEmpty(recordConverterRef)) {
+            Object recordConverter = 
CamelContextHelper.mandatoryLookup(camelContext, recordConverterRef);
+            setProperty(camelContext, dataFormat, "recordConverter", 
recordConverter);
+        }
+    }
+
+    private static Character singleChar(String value, String attributeName) {
+        if (value.length() != 1) {
+            throw new IllegalArgumentException(String.format("The '%s' 
attribute must be exactly one character long.", attributeName));
+        }
+        return value.charAt(0);
+    }
+
+    //region Getters/Setters
+
+    public String getFormatRef() {
+        return formatRef;
+    }
+
+    public void setFormatRef(String formatRef) {
+        this.formatRef = formatRef;
+    }
+
+    public String getFormatName() {
+        return formatName;
+    }
+
+    public void setFormatName(String formatName) {
+        this.formatName = formatName;
+    }
+
+    public Boolean getCommentMarkerDisabled() {
+        return commentMarkerDisabled;
+    }
+
+    public void setCommentMarkerDisabled(Boolean commentMarkerDisabled) {
+        this.commentMarkerDisabled = commentMarkerDisabled;
+    }
+
+    public String getCommentMarker() {
+        return commentMarker;
+    }
+
+    public void setCommentMarker(String commentMarker) {
+        this.commentMarker = commentMarker;
+    }
+
+    public String getDelimiter() {
+        return delimiter;
+    }
+
+    public void setDelimiter(String delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    public Boolean getEscapeDisabled() {
+        return escapeDisabled;
+    }
+
+    public void setEscapeDisabled(Boolean escapeDisabled) {
+        this.escapeDisabled = escapeDisabled;
+    }
+
+    public String getEscape() {
+        return escape;
+    }
+
+    public void setEscape(String escape) {
+        this.escape = escape;
+    }
+
+    public Boolean getHeaderDisabled() {
+        return headerDisabled;
+    }
+
+    public void setHeaderDisabled(Boolean headerDisabled) {
+        this.headerDisabled = headerDisabled;
+    }
+
+    public List<String> getHeader() {
+        return header;
+    }
+
+    public void setHeader(List<String> header) {
+        this.header = header;
+    }
+
+    public Boolean getAllowMissingColumnNames() {
+        return allowMissingColumnNames;
+    }
+
+    public void setAllowMissingColumnNames(Boolean allowMissingColumnNames) {
+        this.allowMissingColumnNames = allowMissingColumnNames;
+    }
+
+    public Boolean getIgnoreEmptyLines() {
+        return ignoreEmptyLines;
+    }
+
+    public void setIgnoreEmptyLines(Boolean ignoreEmptyLines) {
+        this.ignoreEmptyLines = ignoreEmptyLines;
+    }
+
+    public Boolean getIgnoreSurroundingSpaces() {
+        return ignoreSurroundingSpaces;
+    }
+
+    public void setIgnoreSurroundingSpaces(Boolean ignoreSurroundingSpaces) {
+        this.ignoreSurroundingSpaces = ignoreSurroundingSpaces;
+    }
+
+    public Boolean getNullStringDisabled() {
+        return nullStringDisabled;
+    }
+
+    public void setNullStringDisabled(Boolean nullStringDisabled) {
+        this.nullStringDisabled = nullStringDisabled;
+    }
+
+    public String getNullString() {
+        return nullString;
+    }
+
+    public void setNullString(String nullString) {
+        this.nullString = nullString;
+    }
+
+    public Boolean getQuoteDisabled() {
+        return quoteDisabled;
+    }
+
+    public void setQuoteDisabled(Boolean quoteDisabled) {
+        this.quoteDisabled = quoteDisabled;
+    }
+
+    public String getQuote() {
+        return quote;
+    }
+
+    public void setQuote(String quote) {
+        this.quote = quote;
+    }
+
+    public String getRecordSeparatorDisabled() {
+        return recordSeparatorDisabled;
+    }
+
+    public void setRecordSeparatorDisabled(String recordSeparatorDisabled) {
+        this.recordSeparatorDisabled = recordSeparatorDisabled;
+    }
+
+    public String getRecordSeparator() {
+        return recordSeparator;
+    }
+
+    public void setRecordSeparator(String recordSeparator) {
+        this.recordSeparator = recordSeparator;
+    }
+
+    public Boolean getSkipHeaderRecord() {
+        return skipHeaderRecord;
+    }
+
+    public void setSkipHeaderRecord(Boolean skipHeaderRecord) {
+        this.skipHeaderRecord = skipHeaderRecord;
+    }
+
+    public Boolean getLazyLoad() {
+        return lazyLoad;
+    }
+
+    public void setLazyLoad(Boolean lazyLoad) {
+        this.lazyLoad = lazyLoad;
+    }
+
+    public Boolean getUseMaps() {
+        return useMaps;
+    }
+
+    public void setUseMaps(Boolean useMaps) {
+        this.useMaps = useMaps;
+    }
+
+    public String getRecordConverterRef() {
+        return recordConverterRef;
+    }
+
+    public void setRecordConverterRef(String recordConverterRef) {
+        this.recordConverterRef = recordConverterRef;
+    }
+    //endregion
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/components/camel-csv/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-csv/pom.xml b/components/camel-csv/pom.xml
index 85be515..bfcc3a9 100644
--- a/components/camel-csv/pom.xml
+++ b/components/camel-csv/pom.xml
@@ -1,66 +1,76 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.camel</groupId>
-    <artifactId>components</artifactId>
-    <version>2.15-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>camel-csv</artifactId>
-  <packaging>bundle</packaging>
-  <name>Camel :: CSV</name>
-  <description>Camel CSV data format support</description>
-
-  <properties>
-       
<camel.osgi.export.pkg>org.apache.camel.dataformat.csv.*</camel.osgi.export.pkg>
-       
<camel.osgi.export.service>org.apache.camel.spi.DataFormatResolver;dataformat=csv</camel.osgi.export.service>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.camel</groupId>
-      <artifactId>camel-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.servicemix.bundles</groupId>
-      <artifactId>org.apache.servicemix.bundles.commons-csv</artifactId>
-      <version>${commons-csv-bundle-version}</version>
-    </dependency>
-
-    <!-- testing -->
-    <dependency>
-      <groupId>org.apache.camel</groupId>
-      <artifactId>camel-test-spring</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-</project>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.15-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>camel-csv</artifactId>
+  <packaging>bundle</packaging>
+  <name>Camel :: CSV</name>
+  <description>Camel CSV data format support</description>
+
+  <properties>
+       
<camel.osgi.export.pkg>org.apache.camel.dataformat.csv.*</camel.osgi.export.pkg>
+       
<camel.osgi.export.service>org.apache.camel.spi.DataFormatResolver;dataformat=csv</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+      <version>${commons-csv-version}</version>
+    </dependency>
+
+    <!-- testing -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core-xml</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-spring</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-spring</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java
----------------------------------------------------------------------
diff --git 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java
 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java
index 7f40a0a..658b647 100644
--- 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java
+++ 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java
@@ -1,262 +1,698 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.dataformat.csv;
-
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.spi.DataFormat;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVStrategy;
-import org.apache.commons.csv.writer.CSVConfig;
-import org.apache.commons.csv.writer.CSVField;
-import org.apache.commons.csv.writer.CSVWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * CSV Data format.
- * <p/>
- * By default, columns are autogenerated in the resulting CSV. Subsequent
- * messages use the previously created columns with new fields being added at
- * the end of the line. Thus, field order is the same from message to message.
- * Autogeneration can be disabled. In this case, only the fields defined in
- * csvConfig are written on the output.
- *
- * @version
- */
-public class CsvDataFormat implements DataFormat {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(CsvDataFormat.class);
-
-    private CSVStrategy strategy = 
cloneCSVStrategyIfNecessary(CSVStrategy.DEFAULT_STRATEGY);
-    private CSVConfig config = new CSVConfig();
-    private boolean autogenColumns = true;
-    private String delimiter;
-    private boolean skipFirstLine;
-    /**
-     * Lazy row loading with iterator for big files.
-     */
-    private boolean lazyLoad;
-    private boolean useMaps;
-
-    private static CSVStrategy cloneCSVStrategyIfNecessary(CSVStrategy 
csvStrategy) {
-        for (Field field : CSVStrategy.class.getFields()) {
-            try {
-                if (field.get(null) == csvStrategy) {
-                    // return a safe copy of the declared static constant so 
that we don't cause any side effect
-                    // by (potentially) other CsvDataFormat objects in use, as 
we change the properties of the
-                    // strategy itself (e.g. it's set delimiter through the 
#unmarshal() method below)
-                    LOGGER.debug("Returning a clone of {} as it is the 
declared constant {} by the CSVStrategy class", csvStrategy, field.getName());
-
-                    return (CSVStrategy) csvStrategy.clone();
-                }
-            } catch (Exception e) {
-                ObjectHelper.wrapRuntimeCamelException(e);
-            }
-        }
-
-        // not a declared static constant of CSVStrategy so return it as is
-        return csvStrategy;
-    }
-
-    public void marshal(Exchange exchange, Object object, OutputStream 
outputStream) throws Exception {
-        if (delimiter != null) {
-            config.setDelimiter(delimiter.charAt(0));
-        }
-
-        OutputStreamWriter out = new OutputStreamWriter(outputStream, 
IOHelper.getCharsetName(exchange));
-        CSVWriter csv = new CSVWriter(config);
-        csv.setWriter(out);
-
-        try {
-            List<?> list = ExchangeHelper.convertToType(exchange, List.class, 
object);
-            if (list != null) {
-                for (Object child : list) {
-                    Map<?, ?> row = 
ExchangeHelper.convertToMandatoryType(exchange, Map.class, child);
-                    doMarshalRecord(exchange, row, out, csv);
-                }
-            } else {
-                Map<?, ?> row = 
ExchangeHelper.convertToMandatoryType(exchange, Map.class, object);
-                doMarshalRecord(exchange, row, out, csv);
-            }
-        } finally {
-            IOHelper.close(out);
-        }
-    }
-
-    private void doMarshalRecord(Exchange exchange, Map<?, ?> row, Writer out, 
CSVWriter csv) throws Exception {
-        if (autogenColumns) {
-            // no specific config has been set so lets add fields
-            Set<?> set = row.keySet();
-            updateFieldsInConfig(set, exchange);
-        }
-        csv.writeRecord(row);
-    }
-
-    public Object unmarshal(Exchange exchange, InputStream inputStream) throws 
Exception {
-        if (delimiter != null) {
-            config.setDelimiter(delimiter.charAt(0));
-        }
-        strategy.setDelimiter(config.getDelimiter());
-
-        Reader reader = null;
-        boolean error = false;
-        try {
-            reader = IOHelper.buffered(new InputStreamReader(inputStream, 
IOHelper.getCharsetName(exchange)));
-            CSVParser parser = new CSVParser(reader, strategy);
-            if (skipFirstLine) {
-                // read one line ahead and skip it
-                parser.getLine();
-            }
-            CsvLineConverter<?> lineConverter;
-            if (useMaps) {
-                final CSVField[] fields = this.config.getFields();
-                final String[] fieldS;
-                if (fields != null && fields.length > 0) {
-                    fieldS = new String[fields.length];
-                    for (int i = 0; i < fields.length; i++) {
-                        fieldS[i] = fields[i].getName();
-                    }
-                } else {
-                    fieldS = parser.getLine();
-                }
-                lineConverter = CsvLineConverters.getMapLineConverter(fieldS);
-            } else {
-                lineConverter = CsvLineConverters.getListConverter();
-            }
-
-            @SuppressWarnings({"unchecked", "rawtypes"}) CsvIterator<?> 
csvIterator = new CsvIterator(parser, reader, lineConverter);
-            return lazyLoad ? csvIterator : loadAllAsList(csvIterator);
-        } catch (Exception e) {
-            error = true;
-            throw e;
-        } finally {
-            if (error) {
-                IOHelper.close(reader);
-            }
-        }
-    }
-
-    private <T> List<T> loadAllAsList(CsvIterator<T> iter) {
-        try {
-            List<T> list = new ArrayList<T>();
-            while (iter.hasNext()) {
-                list.add(iter.next());
-            }
-            return list;
-        } finally {
-            // close the iterator (which would also close the reader) as we've 
loaded all the data upfront
-            IOHelper.close(iter);
-        }
-    }
-
-    public String getDelimiter() {
-        return delimiter;
-    }
-
-    public void setDelimiter(String delimiter) {
-        if (delimiter != null && delimiter.length() > 1) {
-            throw new IllegalArgumentException("Delimiter must have a length 
of one!");
-        }
-        this.delimiter = delimiter;
-    }
-
-    public CSVConfig getConfig() {
-        return config;
-    }
-
-    public void setConfig(CSVConfig config) {
-        this.config = config;
-    }
-
-    public CSVStrategy getStrategy() {
-        return strategy;
-    }
-
-    public void setStrategy(CSVStrategy strategy) {
-        this.strategy = cloneCSVStrategyIfNecessary(strategy);
-    }
-
-    public boolean isAutogenColumns() {
-        return autogenColumns;
-    }
-
-    /**
-     * Auto generate columns.
-     *
-     * @param autogenColumns set to false to disallow column autogeneration 
(default true)
-     */
-    public void setAutogenColumns(boolean autogenColumns) {
-        this.autogenColumns = autogenColumns;
-    }
-
-    public boolean isSkipFirstLine() {
-        return skipFirstLine;
-    }
-
-    public void setSkipFirstLine(boolean skipFirstLine) {
-        this.skipFirstLine = skipFirstLine;
-    }
-
-    public boolean isLazyLoad() {
-        return lazyLoad;
-    }
-
-    public void setLazyLoad(boolean lazyLoad) {
-        this.lazyLoad = lazyLoad;
-    }
-
-    public boolean isUseMaps() {
-        return useMaps;
-    }
-
-    /**
-     * Sets whether or not the result of the unmarshalling should be a {@code 
java.util.Map} instead of a {@code java.util.List}. It uses the first line as a
-     * header line and uses it as keys of the maps.
-     *
-     * @param useMaps {@code true} in order to use {@code java.util.Map} 
instead of {@code java.util.List}, {@code false} otherwise.
-     */
-    public void setUseMaps(boolean useMaps) {
-        this.useMaps = useMaps;
-    }
-
-    private synchronized void updateFieldsInConfig(Set<?> set, Exchange 
exchange) {
-        for (Object value : set) {
-            if (value != null) {
-                String text = 
exchange.getContext().getTypeConverter().convertTo(String.class, value);
-                // do not add field twice
-                if (config.getField(text) == null) {
-                    CSVField field = new CSVField(text);
-                    config.addField(field);
-                }
-            }
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.csv;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.DataFormat;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
+
+/**
+ * CSV Data format.
+ * <p/>
+ * By default, columns are autogenerated in the resulting CSV. Subsequent
+ * messages use the previously created columns with new fields being added at
+ * the end of the line. Thus, field order is the same from message to message.
+ * Autogeneration can be disabled. In this case, only the fields defined in
+ * csvConfig are written on the output.
+ */
+public class CsvDataFormat implements DataFormat {
+    // CSV format options
+    private CSVFormat format = CSVFormat.DEFAULT;
+    private boolean commentMarkerDisabled;
+    private Character commentMarker;
+    private Character delimiter;
+    private boolean escapeDisabled;
+    private Character escape;
+    private boolean headerDisabled;
+    private String[] header;
+    private Boolean allowMissingColumnNames;
+    private Boolean ignoreEmptyLines;
+    private Boolean ignoreSurroundingSpaces;
+    private boolean nullStringDisabled;
+    private String nullString;
+    private boolean quoteDisabled;
+    private Character quote;
+    private QuoteMode quoteMode;
+    private boolean recordSeparatorDisabled;
+    private String recordSeparator;
+    private Boolean skipHeaderRecord;
+
+    // Unmarshal options
+    private boolean lazyLoad;
+    private boolean useMaps;
+    private CsvRecordConverter<?> recordConverter;
+
+    private volatile CsvMarshaller marshaller;
+    private volatile CsvUnmarshaller unmarshaller;
+
+    public CsvDataFormat() {
+    }
+
+    public CsvDataFormat(CSVFormat format) {
+        setFormat(format);
+    }
+
+    public void marshal(Exchange exchange, Object object, OutputStream 
outputStream) throws Exception {
+        if (marshaller == null) {
+            marshaller = CsvMarshaller.create(getActiveFormat(), this);
+        }
+        marshaller.marshal(exchange, object, outputStream);
+    }
+
+    public Object unmarshal(Exchange exchange, InputStream inputStream) throws 
Exception {
+        if (unmarshaller == null) {
+            unmarshaller = CsvUnmarshaller.create(getActiveFormat(), this);
+        }
+        return unmarshaller.unmarshal(exchange, inputStream);
+    }
+
+    CSVFormat getActiveFormat() {
+        CSVFormat answer = format;
+
+        if (commentMarkerDisabled) {
+            answer = answer.withCommentMarker(null); // null disables the 
comment marker
+        } else if (commentMarker != null) {
+            answer = answer.withCommentMarker(commentMarker);
+        }
+
+        if (delimiter != null) {
+            answer = answer.withDelimiter(delimiter);
+        }
+
+        if (escapeDisabled) {
+            answer = answer.withEscape(null); // null disables the escape
+        } else if (escape != null) {
+            answer = answer.withEscape(escape);
+        }
+
+        if (headerDisabled) {
+            answer = answer.withHeader((String[]) null); // null disables the 
header
+        } else if (header != null) {
+            answer = answer.withHeader(header);
+        }
+
+        if (allowMissingColumnNames != null) {
+            answer = 
answer.withAllowMissingColumnNames(allowMissingColumnNames);
+        }
+
+        if (ignoreEmptyLines != null) {
+            answer = answer.withIgnoreEmptyLines(ignoreEmptyLines);
+        }
+
+        if (ignoreSurroundingSpaces != null) {
+            answer = 
answer.withIgnoreSurroundingSpaces(ignoreSurroundingSpaces);
+        }
+
+        if (nullStringDisabled) {
+            answer = answer.withNullString(null); // null disables the null 
string replacement
+        } else if (nullString != null) {
+            answer = answer.withNullString(nullString);
+        }
+
+        if (quoteDisabled) {
+            answer = answer.withQuote(null); // null disables quotes
+        } else if (quote != null) {
+            answer = answer.withQuote(quote);
+        }
+
+        if (quoteMode != null) {
+            answer = answer.withQuoteMode(quoteMode);
+        }
+
+        if (recordSeparatorDisabled) {
+            answer = answer.withRecordSeparator(null); // null disables the 
record separator
+        } else if (recordSeparator != null) {
+            answer = answer.withRecordSeparator(recordSeparator);
+        }
+
+        if (skipHeaderRecord != null) {
+            answer = answer.withSkipHeaderRecord(skipHeaderRecord);
+        }
+
+        return answer;
+    }
+
+    private void reset() {
+        marshaller = null;
+        unmarshaller = null;
+    }
+
+    //region Getters/Setters
+
+    /**
+     * Gets the CSV format before applying any changes.
+     * It cannot be {@code null}, the default one is {@link 
org.apache.commons.csv.CSVFormat#DEFAULT}.
+     *
+     * @return CSV format
+     */
+    public CSVFormat getFormat() {
+        return format;
+    }
+
+    /**
+     * Sets the CSV format before applying any changes.
+     * If {@code null}, then {@link org.apache.commons.csv.CSVFormat#DEFAULT} 
is used instead.
+     *
+     * @param format CSV format
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat
+     * @see org.apache.commons.csv.CSVFormat#DEFAULT
+     */
+    public CsvDataFormat setFormat(CSVFormat format) {
+        this.format = (format == null) ? CSVFormat.DEFAULT : format;
+        reset();
+        return this;
+    }
+
+    /**
+     * Sets the CSV format by name before applying any changes.
+     *
+     * @param name CSV format name
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see #setFormat(org.apache.commons.csv.CSVFormat)
+     * @see org.apache.commons.csv.CSVFormat
+     */
+    public CsvDataFormat setFormatName(String name) {
+        if (name == null) {
+            setFormat(null);
+        } else if ("DEFAULT".equals(name)) {
+            setFormat(CSVFormat.DEFAULT);
+        } else if ("RFC4180".equals(name)) {
+            setFormat(CSVFormat.RFC4180);
+        } else if ("EXCEL".equals(name)) {
+            setFormat(CSVFormat.EXCEL);
+        } else if ("TDF".equals(name)) {
+            setFormat(CSVFormat.TDF);
+        } else if ("MYSQL".equals(name)) {
+            setFormat(CSVFormat.MYSQL);
+        } else {
+            throw new IllegalArgumentException("Unsupported format");
+        }
+        return this;
+    }
+
+    /**
+     * Indicates whether or not the comment markers are disabled.
+     *
+     * @return {@code true} if the comment markers are disabled, {@code false} 
otherwise
+     */
+    public boolean isCommentMarkerDisabled() {
+        return commentMarkerDisabled;
+    }
+
+    /**
+     * Sets whether or not the comment markers are disabled.
+     *
+     * @param commentMarkerDisabled {@code true} if the comment markers are 
disabled, {@code false} otherwise
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see 
org.apache.commons.csv.CSVFormat#withCommentMarker(java.lang.Character)
+     */
+    public CsvDataFormat setCommentMarkerDisabled(boolean 
commentMarkerDisabled) {
+        this.commentMarkerDisabled = commentMarkerDisabled;
+        reset();
+        return this;
+    }
+
+    /**
+     * Gets the comment marker.
+     * If {@code null} then the default one of the format used.
+     *
+     * @return Comment marker
+     */
+    public Character getCommentMarker() {
+        return commentMarker;
+    }
+
+    /**
+     * Sets the comment marker to use.
+     * If {@code null} then the default one of the format used.
+     *
+     * @param commentMarker Comment marker
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withCommentMarker(Character)
+     */
+    public CsvDataFormat setCommentMarker(Character commentMarker) {
+        this.commentMarker = commentMarker;
+        reset();
+        return this;
+    }
+
+    /**
+     * Gets the delimiter.
+     * If {@code null} then the default one of the format used.
+     *
+     * @return Delimiter
+     */
+    public Character getDelimiter() {
+        return delimiter;
+    }
+
+    /**
+     * Sets the delimiter.
+     * If {@code null} then the default one of the format used.
+     *
+     * @param delimiter Delimiter
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withDelimiter(char)
+     */
+    public CsvDataFormat setDelimiter(Character delimiter) {
+        this.delimiter = delimiter;
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not the escaping is disabled.
+     *
+     * @return {@code true} if the escaping is disabled, {@code false} 
otherwise
+     */
+    public boolean isEscapeDisabled() {
+        return escapeDisabled;
+    }
+
+    /**
+     * Sets whether or not the escaping is disabled.
+     *
+     * @param escapeDisabled {@code true} if the escaping is disabled, {@code 
false} otherwise
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withEscape(Character)
+     */
+    public CsvDataFormat setEscapeDisabled(boolean escapeDisabled) {
+        this.escapeDisabled = escapeDisabled;
+        reset();
+        return this;
+    }
+
+    /**
+     * Gets the escape character.
+     * If {@code null} then the default one of the format used.
+     *
+     * @return Escape character
+     */
+    public Character getEscape() {
+        return escape;
+    }
+
+    /**
+     * Sets the escape character.
+     * If {@code null} then the default one of the format used.
+     *
+     * @param escape Escape character
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withEscape(Character)
+     */
+    public CsvDataFormat setEscape(Character escape) {
+        this.escape = escape;
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not the headers are disabled.
+     *
+     * @return {@code true} if the headers are disabled, {@code false} 
otherwise
+     */
+    public boolean isHeaderDisabled() {
+        return headerDisabled;
+    }
+
+    /**
+     * Sets whether or not the headers are disabled.
+     *
+     * @param headerDisabled {@code true} if the headers are disabled, {@code 
false} otherwise
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withHeader(String...)
+     */
+    public CsvDataFormat setHeaderDisabled(boolean headerDisabled) {
+        this.headerDisabled = headerDisabled;
+        reset();
+        return this;
+    }
+
+    /**
+     * Gets the header.
+     * If {@code null} then the default one of the format used. If empty then 
it will be automatically handled.
+     *
+     * @return Header
+     */
+    public String[] getHeader() {
+        return header;
+    }
+
+    /**
+     * Gets the header.
+     * If {@code null} then the default one of the format used. If empty then 
it will be automatically handled.
+     *
+     * @param header Header
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withHeader(String...)
+     */
+    public CsvDataFormat setHeader(String[] header) {
+        this.header = Arrays.copyOf(header, header.length);
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not missing column names are allowed.
+     * If {@code null} then the default value of the format used.
+     *
+     * @return Whether or not missing column names are allowed
+     */
+    public Boolean getAllowMissingColumnNames() {
+        return allowMissingColumnNames;
+    }
+
+    /**
+     * Sets whether or not missing column names are allowed.
+     * If {@code null} then the default value of the format used.
+     *
+     * @param allowMissingColumnNames Whether or not missing column names are 
allowed
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see 
org.apache.commons.csv.CSVFormat#withAllowMissingColumnNames(boolean)
+     */
+    public CsvDataFormat setAllowMissingColumnNames(Boolean 
allowMissingColumnNames) {
+        this.allowMissingColumnNames = allowMissingColumnNames;
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not empty lines must be ignored.
+     * If {@code null} then the default value of the format used.
+     *
+     * @return Whether or not empty lines must be ignored
+     */
+    public Boolean getIgnoreEmptyLines() {
+        return ignoreEmptyLines;
+    }
+
+    /**
+     * Sets whether or not empty lines must be ignored.
+     * If {@code null} then the default value of the format used.
+     *
+     * @param ignoreEmptyLines Whether or not empty lines must be ignored
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withIgnoreEmptyLines(boolean)
+     */
+    public CsvDataFormat setIgnoreEmptyLines(Boolean ignoreEmptyLines) {
+        this.ignoreEmptyLines = ignoreEmptyLines;
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not surrounding spaces must be ignored.
+     * If {@code null} then the default value of the format used.
+     *
+     * @return Whether or not surrounding spaces must be ignored
+     */
+    public Boolean getIgnoreSurroundingSpaces() {
+        return ignoreSurroundingSpaces;
+    }
+
+    /**
+     * Sets whether or not surrounding spaces must be ignored.
+     * If {@code null} then the default value of the format used.
+     *
+     * @param ignoreSurroundingSpaces Whether or not surrounding spaces must 
be ignored
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see 
org.apache.commons.csv.CSVFormat#withIgnoreSurroundingSpaces(boolean)
+     */
+    public CsvDataFormat setIgnoreSurroundingSpaces(Boolean 
ignoreSurroundingSpaces) {
+        this.ignoreSurroundingSpaces = ignoreSurroundingSpaces;
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not the null string replacement is disabled.
+     *
+     * @return {@code true} if the null string replacement is disabled, {@code 
false} otherwise
+     */
+    public boolean isNullStringDisabled() {
+        return nullStringDisabled;
+    }
+
+    /**
+     * Sets whether or not the null string replacement is disabled.
+     *
+     * @param nullStringDisabled {@code true} if the null string replacement 
is disabled, {@code false} otherwise
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withNullString(String)
+     */
+    public CsvDataFormat setNullStringDisabled(boolean nullStringDisabled) {
+        this.nullStringDisabled = nullStringDisabled;
+        reset();
+        return this;
+    }
+
+    /**
+     * Gets the null string replacement.
+     * If {@code null} then the default one of the format used.
+     *
+     * @return Null string replacement
+     */
+    public String getNullString() {
+        return nullString;
+    }
+
+    /**
+     * Sets the null string replacement.
+     * If {@code null} then the default one of the format used.
+     *
+     * @param nullString Null string replacement
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withNullString(String)
+     */
+    public CsvDataFormat setNullString(String nullString) {
+        this.nullString = nullString;
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not quotes are disabled.
+     *
+     * @return {@code true} if quotes are disabled, {@code false} otherwise
+     */
+    public boolean isQuoteDisabled() {
+        return quoteDisabled;
+    }
+
+    /**
+     * Sets whether or not quotes are disabled
+     *
+     * @param quoteDisabled {@code true} if quotes are disabled, {@code false} 
otherwise
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withQuote(Character)
+     */
+    public CsvDataFormat setQuoteDisabled(boolean quoteDisabled) {
+        this.quoteDisabled = quoteDisabled;
+        reset();
+        return this;
+    }
+
+    /**
+     * Gets the quote character.
+     * If {@code null} then the default one of the format used.
+     *
+     * @return Quote character
+     */
+    public Character getQuote() {
+        return quote;
+    }
+
+    /**
+     * Sets the quote character.
+     * If {@code null} then the default one of the format used.
+     *
+     * @param quote Quote character
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withQuote(Character)
+     */
+    public CsvDataFormat setQuote(Character quote) {
+        this.quote = quote;
+        reset();
+        return this;
+    }
+
+    /**
+     * Gets the quote mode.
+     * If {@code null} then the default one of the format used.
+     *
+     * @return Quote mode
+     */
+    public QuoteMode getQuoteMode() {
+        return quoteMode;
+    }
+
+    /**
+     * Sets the quote mode.
+     * If {@code null} then the default one of the format used.
+     *
+     * @param quoteMode Quote mode
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see 
org.apache.commons.csv.CSVFormat#withQuoteMode(org.apache.commons.csv.QuoteMode)
+     */
+    public CsvDataFormat setQuoteMode(QuoteMode quoteMode) {
+        this.quoteMode = quoteMode;
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not the record separator is disabled.
+     *
+     * @return {@code true} if the record separator disabled, {@code false} 
otherwise
+     */
+    public boolean isRecordSeparatorDisabled() {
+        return recordSeparatorDisabled;
+    }
+
+    /**
+     * Sets whether or not the record separator is disabled.
+     *
+     * @param recordSeparatorDisabled {@code true} if the record separator 
disabled, {@code false} otherwise
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withRecordSeparator(String)
+     */
+    public CsvDataFormat setRecordSeparatorDisabled(boolean 
recordSeparatorDisabled) {
+        this.recordSeparatorDisabled = recordSeparatorDisabled;
+        reset();
+        return this;
+    }
+
+    /**
+     * Gets the record separator.
+     * If {@code null} then the default one of the format used.
+     *
+     * @return Record separator
+     */
+    public String getRecordSeparator() {
+        return recordSeparator;
+    }
+
+    /**
+     * Sets the record separator.
+     * If {@code null} then the default one of the format used.
+     *
+     * @param recordSeparator Record separator
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withRecordSeparator(String)
+     */
+    public CsvDataFormat setRecordSeparator(String recordSeparator) {
+        this.recordSeparator = recordSeparator;
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not header record must be skipped.
+     * If {@code null} then the default value of the format used.
+     *
+     * @return Whether or not header record must be skipped
+     */
+    public Boolean getSkipHeaderRecord() {
+        return skipHeaderRecord;
+    }
+
+    /**
+     * Sets whether or not header record must be skipped.
+     * If {@code null} then the default value of the format used.
+     *
+     * @param skipHeaderRecord Whether or not header record must be skipped
+     * @return Current {@code CsvDataFormat}, fluent API
+     * @see org.apache.commons.csv.CSVFormat#withSkipHeaderRecord(boolean)
+     */
+    public CsvDataFormat setSkipHeaderRecord(Boolean skipHeaderRecord) {
+        this.skipHeaderRecord = skipHeaderRecord;
+        reset();
+        return this;
+    }
+
+    /**
+     * Indicates whether or not the unmarshalling should lazily load the 
records.
+     *
+     * @return {@code true} for lazy loading, {@code false} otherwise
+     */
+    public boolean isLazyLoad() {
+        return lazyLoad;
+    }
+
+    /**
+     * Indicates whether or not the unmarshalling should lazily load the 
records.
+     *
+     * @param lazyLoad {@code true} for lazy loading, {@code false} otherwise
+     * @return Current {@code CsvDataFormat}, fluent API
+     */
+    public CsvDataFormat setLazyLoad(boolean lazyLoad) {
+        this.lazyLoad = lazyLoad;
+        return this;
+    }
+
+    /**
+     * Indicates whether or not the unmarshalling should produce maps instead 
of lists.
+     *
+     * @return {@code true} for maps, {@code false} for lists
+     */
+    public boolean isUseMaps() {
+        return useMaps;
+    }
+
+    /**
+     * Sets whether or not the unmarshalling should produce maps instead of 
lists.
+     *
+     * @param useMaps {@code true} for maps, {@code false} for lists
+     * @return Current {@code CsvDataFormat}, fluent API
+     */
+    public CsvDataFormat setUseMaps(boolean useMaps) {
+        this.useMaps = useMaps;
+        return this;
+    }
+
+    /**
+     * Gets the record converter to use. If {@code null} then it will use 
{@link CsvDataFormat#isUseMaps()} for finding
+     * the proper converter.
+     *
+     * @return Record converter to use
+     */
+    public CsvRecordConverter<?> getRecordConverter() {
+        return recordConverter;
+    }
+
+    /**
+     * Sets the record converter to use. If {@code null} then it will use 
{@link CsvDataFormat#isUseMaps()} for finding
+     * the proper converter.
+     *
+     * @param recordConverter Record converter to use
+     * @return Current {@code CsvDataFormat}, fluent API
+     */
+    public CsvDataFormat setRecordConverter(CsvRecordConverter<?> 
recordConverter) {
+        this.recordConverter = recordConverter;
+        return this;
+    }
+
+    //endregion
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java
----------------------------------------------------------------------
diff --git 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java
 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java
deleted file mode 100644
index 9d26edb..0000000
--- 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.dataformat.csv;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Reader;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.camel.util.IOHelper;
-import org.apache.commons.csv.CSVParser;
-
-/**
- */
-public class CsvIterator<T> implements Iterator<T>, Closeable {
-
-    private final CSVParser parser;
-    private final Reader reader;
-    private final CsvLineConverter<T> lineConverter;
-    private String[] line;
-
-    public CsvIterator(CSVParser parser, Reader reader, CsvLineConverter<T> 
lineConverter) throws IOException {
-        this.parser = parser;
-        this.reader = reader;
-        this.lineConverter = lineConverter;
-        line = parser.getLine();
-    }
-
-    @Override
-    public boolean hasNext() {
-        return line != null;
-    }
-
-    @Override
-    public T next() {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
-        }
-        T result = lineConverter.convertLine(line);
-        try {
-            line = parser.getLine();
-        } catch (IOException e) {
-            line = null;
-            close();
-            throw new IllegalStateException(e);
-        }
-        if (line == null) {
-            close();
-        }
-        return result;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-        IOHelper.close(reader);
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvLineConverter.java
----------------------------------------------------------------------
diff --git 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvLineConverter.java
 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvLineConverter.java
deleted file mode 100644
index 8bc3c67..0000000
--- 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvLineConverter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.dataformat.csv;
-
-/**
- * This interface helps converting a single CSV line into another 
representation.
- *
- * @param <T> Class for representing a single line
- */
-public interface CsvLineConverter<T> {
-
-    /**
-     * Converts a single CSV line.
-     *
-     * @param line CSV line
-     * @return Another representation of the CSV line
-     */
-    T convertLine(String[] line);
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvLineConverters.java
----------------------------------------------------------------------
diff --git 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvLineConverters.java
 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvLineConverters.java
deleted file mode 100644
index bba71d4..0000000
--- 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvLineConverters.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.dataformat.csv;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * This {@code CsvLineConverters} class provides common implementations of the 
{@code CsvLineConverter} interface.
- */
-public final class CsvLineConverters {
-
-    private CsvLineConverters() {
-        // Prevent instantiation
-    }
-    /**
-     * Provides an implementation of {@code CsvLineConverter} that converts a 
line into a {@code List}.
-     *
-     * @return List-based {@code CsvLineConverter} implementation
-     */
-    public static CsvLineConverter<List<String>> getListConverter() {
-        return ListLineConverter.SINGLETON;
-    }
-
-    /**
-     * Provides an implementation of {@code CsvLineConverter} that converts a 
line into a {@code Map}.
-     * <p/>
-     * It requires to have unique {@code headers} values as well as the same 
number of item in each line.
-     *
-     * @param headers Headers of the CSV file
-     * @return Map-based {@code CsvLineConverter} implementation
-     */
-    public static CsvLineConverter<Map<String, String>> 
getMapLineConverter(String[] headers) {
-        return new MapLineConverter(headers);
-    }
-
-    private static final class ListLineConverter implements 
CsvLineConverter<List<String>> {
-        public static final ListLineConverter SINGLETON = new 
ListLineConverter();
-
-        @Override
-        public List<String> convertLine(String[] line) {
-            return Arrays.asList(line);
-        }
-    }
-
-    private static final class MapLineConverter implements 
CsvLineConverter<Map<String, String>> {
-        private final String[] headers;
-
-        private MapLineConverter(String[] headers) {
-            this.headers = checkHeaders(headers);
-        }
-
-        @Override
-        public Map<String, String> convertLine(String[] line) {
-            if (line.length != headers.length) {
-                throw new IllegalStateException("This line does not have the 
same number of items than the header");
-            }
-
-            Map<String, String> result = new HashMap<String, 
String>(line.length);
-            for (int i = 0; i < line.length; i++) {
-                result.put(headers[i], line[i]);
-            }
-            return result;
-        }
-
-        private static String[] checkHeaders(String[] headers) {
-            // Check that we have headers
-            if (headers == null || headers.length == 0) {
-                throw new IllegalArgumentException("Missing headers for the 
CSV parsing");
-            }
-
-            // Check that there is no duplicates
-            Set<String> headerSet = new HashSet<String>(headers.length);
-            Collections.addAll(headerSet, headers);
-            if (headerSet.size() != headers.length) {
-                throw new IllegalArgumentException("There are duplicate 
headers");
-            }
-
-            return headers;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvMarshaller.java
----------------------------------------------------------------------
diff --git 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvMarshaller.java
 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvMarshaller.java
new file mode 100644
index 0000000..d845128
--- /dev/null
+++ 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvMarshaller.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.csv;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.util.ExchangeHelper;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+
+/**
+ * This class marshal data into a CSV format.
+ */
+abstract class CsvMarshaller {
+    /**
+     * Creates a new instance.
+     *
+     * @param format     CSV format
+     * @param dataFormat Camel CSV data format
+     * @return New instance
+     */
+    public static CsvMarshaller create(CSVFormat format, CsvDataFormat 
dataFormat) {
+        // If we don't want the header record, clear it
+        if (format.getSkipHeaderRecord()) {
+            format = format.withHeader((String[]) null);
+        }
+
+        String[] fixedColumns = dataFormat.getHeader();
+        if (fixedColumns != null && fixedColumns.length > 0) {
+            return new FixedColumnsMarshaller(format, fixedColumns);
+        }
+        return new DynamicColumnsMarshaller(format);
+    }
+
+    private final CSVFormat format;
+
+    private CsvMarshaller(CSVFormat format) {
+        this.format = format;
+    }
+
+    /**
+     * Marshals the given object into the given stream.
+     *
+     * @param exchange     Exchange (used for access to type conversion)
+     * @param object       Body to marshal
+     * @param outputStream Output stream of the CSV
+     * @throws NoTypeConversionAvailableException if the body cannot be 
converted
+     * @throws IOException                        if we cannot write into the 
given stream
+     */
+    public void marshal(Exchange exchange, Object object, OutputStream 
outputStream) throws NoTypeConversionAvailableException, IOException {
+        try (CSVPrinter printer = new CSVPrinter(new 
OutputStreamWriter(outputStream), format)) {
+            List list = ExchangeHelper.convertToType(exchange, List.class, 
object);
+            if (list != null) {
+                for (Object child : list) {
+                    printer.printRecord(getRecordValues(exchange, child));
+                }
+            } else {
+                printer.printRecord(getRecordValues(exchange, object));
+            }
+        }
+    }
+
+    private Iterable<?> getRecordValues(Exchange exchange, Object data) throws 
NoTypeConversionAvailableException {
+        Map map = ExchangeHelper.convertToType(exchange, Map.class, data);
+        if (map != null) {
+            return getMapRecordValues(map);
+        }
+        return ExchangeHelper.convertToMandatoryType(exchange, List.class, 
data);
+    }
+
+    /**
+     * Gets the CSV record values of the given map.
+     *
+     * @param map Input map
+     * @return CSV record values of the given map
+     */
+    protected abstract Iterable<?> getMapRecordValues(Map map);
+
+    //region Implementations
+
+    /**
+     * This marshaller has fixed columns
+     */
+    private static class FixedColumnsMarshaller extends CsvMarshaller {
+        private final String[] fixedColumns;
+
+        private FixedColumnsMarshaller(CSVFormat format, String[] 
fixedColumns) {
+            super(format);
+            this.fixedColumns = Arrays.copyOf(fixedColumns, 
fixedColumns.length);
+        }
+
+        @Override
+        protected Iterable<?> getMapRecordValues(Map map) {
+            List<Object> result = new ArrayList<>(fixedColumns.length);
+            for (String key : fixedColumns) {
+                result.add(map.get(key));
+            }
+            return result;
+        }
+    }
+
+    /**
+     * This marshaller adapts the columns but always keep them in the same 
order
+     */
+    private static class DynamicColumnsMarshaller extends CsvMarshaller {
+        private final LinkedHashSet<Object> columns = new LinkedHashSet<>();
+
+        private DynamicColumnsMarshaller(CSVFormat format) {
+            super(format);
+        }
+
+        @Override
+        protected Iterable<?> getMapRecordValues(Map map) {
+            columns.addAll(map.keySet());
+            List<Object> result = new ArrayList<>(columns.size());
+            for (Object key : columns) {
+                result.add(map.get(key));
+            }
+            return result;
+        }
+    }
+    //endregion
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvRecordConverter.java
----------------------------------------------------------------------
diff --git 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvRecordConverter.java
 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvRecordConverter.java
new file mode 100644
index 0000000..01abf9b
--- /dev/null
+++ 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvRecordConverter.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.csv;
+
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * This interface is used to define a converter that transform a {@link 
org.apache.commons.csv.CSVRecord} into another
+ * type.
+ * <p/>
+ * The {@link org.apache.camel.dataformat.csv.CsvRecordConverters} class 
defines common converters.
+ *
+ * @param <T> Conversion type
+ * @see org.apache.camel.dataformat.csv.CsvRecordConverters
+ */
+interface CsvRecordConverter<T> {
+    /**
+     * Converts the CSV record into another type.
+     *
+     * @param record CSV record to convert
+     * @return converted CSV record
+     */
+    T convertRecord(CSVRecord record);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvRecordConverters.java
----------------------------------------------------------------------
diff --git 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvRecordConverters.java
 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvRecordConverters.java
new file mode 100644
index 0000000..a3a28a8
--- /dev/null
+++ 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvRecordConverters.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.csv;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * This class defines common {@link CsvRecordConverter} implementations.
+ *
+ * @see CsvRecordConverter
+ */
+final class CsvRecordConverters {
+    private CsvRecordConverters() {
+        // Prevent instantiation
+    }
+
+    /**
+     * Returns a converter that transforms the CSV record into a list.
+     *
+     * @return converter that transforms the CSV record into a list
+     */
+    public static CsvRecordConverter<List<String>> listConverter() {
+        return ListCsvRecordConverter.SINGLETON;
+    }
+
+    private static final class ListCsvRecordConverter implements 
CsvRecordConverter<List<String>> {
+        private static final ListCsvRecordConverter SINGLETON = new 
ListCsvRecordConverter();
+
+        @Override
+        public List<String> convertRecord(CSVRecord record) {
+            List<String> answer = new ArrayList<>(record.size());
+            for (int i = 0; i < record.size(); i++) {
+                answer.add(record.get(i));
+            }
+            return answer;
+        }
+    }
+
+    /**
+     * Returns a converter that transforms the CSV record into a map.
+     *
+     * @return converter that transforms the CSV record into a map
+     */
+    public static CsvRecordConverter<Map<String, String>> mapConverter() {
+        return MapCsvRecordConverter.SINGLETON;
+    }
+
+    private static class MapCsvRecordConverter implements 
CsvRecordConverter<Map<String, String>> {
+        private static final MapCsvRecordConverter SINGLETON = new 
MapCsvRecordConverter();
+
+        @Override
+        public Map<String, String> convertRecord(CSVRecord record) {
+            return record.toMap();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3af1165a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java
----------------------------------------------------------------------
diff --git 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java
 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java
new file mode 100644
index 0000000..cfbd324
--- /dev/null
+++ 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.csv;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.util.IOHelper;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * This class unmarshal CSV into lists or maps depending on the configuration.
+ */
+abstract class CsvUnmarshaller {
+    public static CsvUnmarshaller create(CSVFormat format, CsvDataFormat 
dataFormat) {
+        // If we want to use maps, thus the header must be either fixed or 
automatic
+        if (dataFormat.isUseMaps() && format.getHeader() == null) {
+            format = format.withHeader();
+        }
+        // If we want to skip the header record it must automatic otherwise 
it's not working
+        if (format.getSkipHeaderRecord() && format.getHeader() == null) {
+            format = format.withHeader();
+        }
+
+        if (dataFormat.isLazyLoad()) {
+            return new StreamCsvUnmarshaller(format, dataFormat);
+        }
+        return new BulkCsvUnmarshaller(format, dataFormat);
+    }
+
+    protected final CSVFormat format;
+    protected final CsvRecordConverter<?> converter;
+
+    private CsvUnmarshaller(CSVFormat format, CsvDataFormat dataFormat) {
+        this.format = format;
+        this.converter = extractConverter(dataFormat);
+    }
+
+    /**
+     * Unmarshal the CSV
+     *
+     * @param exchange    Exchange (used for accessing type converter)
+     * @param inputStream Input CSV stream
+     * @return Unmarshalled CSV
+     * @throws IOException if the stream cannot be read properly
+     */
+    public abstract Object unmarshal(Exchange exchange, InputStream 
inputStream) throws IOException;
+
+    private static CsvRecordConverter<?> extractConverter(CsvDataFormat 
dataFormat) {
+        if (dataFormat.getRecordConverter() != null) {
+            return dataFormat.getRecordConverter();
+        } else if (dataFormat.isUseMaps()) {
+            return CsvRecordConverters.mapConverter();
+        } else {
+            return CsvRecordConverters.listConverter();
+        }
+    }
+
+    //region Implementations
+
+    /**
+     * This class reads all the CSV into one big list.
+     */
+    private static final class BulkCsvUnmarshaller extends CsvUnmarshaller {
+        private BulkCsvUnmarshaller(CSVFormat format, CsvDataFormat 
dataFormat) {
+            super(format, dataFormat);
+        }
+
+        public Object unmarshal(Exchange exchange, InputStream inputStream) 
throws IOException {
+            try (CSVParser parser = new CSVParser(new 
InputStreamReader(inputStream, IOHelper.getCharsetName(exchange)), format)) {
+                return asList(parser.iterator(), converter);
+            }
+        }
+
+        private <T> List<T> asList(Iterator<CSVRecord> iterator, 
CsvRecordConverter<T> converter) {
+            List<T> answer = new ArrayList<>();
+            while (iterator.hasNext()) {
+                answer.add(converter.convertRecord(iterator.next()));
+            }
+            return answer;
+        }
+    }
+
+    /**
+     * This class streams the content of the CSV
+     */
+    private static final class StreamCsvUnmarshaller extends CsvUnmarshaller {
+        private StreamCsvUnmarshaller(CSVFormat format, CsvDataFormat 
dataFormat) {
+            super(format, dataFormat);
+        }
+
+        @Override
+        public Object unmarshal(Exchange exchange, InputStream inputStream) 
throws IOException {
+            Reader reader = null;
+            try {
+                reader = new InputStreamReader(inputStream, 
IOHelper.getCharsetName(exchange));
+                CSVParser parser = new CSVParser(reader, format);
+                return new CsvIterator<>(parser.iterator(), converter);
+            } catch (Exception e) {
+                IOHelper.close(reader);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * This class converts the CSV iterator into the proper result type.
+     *
+     * @param <T> Converted type
+     */
+    private static final class CsvIterator<T> implements Iterator<T> {
+        private final Iterator<CSVRecord> iterator;
+        private final CsvRecordConverter<T> converter;
+
+        private CsvIterator(Iterator<CSVRecord> iterator, 
CsvRecordConverter<T> converter) {
+            this.iterator = iterator;
+            this.converter = converter;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public T next() {
+            return converter.convertRecord(iterator.next());
+        }
+
+        @Override
+        public void remove() {
+            iterator.remove();
+        }
+    }
+    //endregion
+}

Reply via email to