Repository: camel Updated Branches: refs/heads/master 5b44c4fd2 -> aeff910e9
CAMEL-9843: camel-beanio - Add BeanIOSplitter Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aeff910e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aeff910e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aeff910e Branch: refs/heads/master Commit: aeff910e9708306924b3119bc6a4a05e83a32573 Parents: 5b44c4f Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Apr 9 08:41:19 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Apr 9 08:41:19 2016 +0200 ---------------------------------------------------------------------- .../dataformat/beanio/BeanIOConfiguration.java | 90 +++++++++++++ .../dataformat/beanio/BeanIODataFormat.java | 131 ++++++------------ .../dataformat/beanio/BeanIOErrorHandler.java | 70 ++++++++++ .../camel/dataformat/beanio/BeanIOSplitter.java | 135 +++++++++++++++++-- .../dataformat/beanio/BeanIOSplitterTest.java | 11 +- 5 files changed, 326 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/aeff910e/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOConfiguration.java b/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOConfiguration.java new file mode 100644 index 0000000..973badf --- /dev/null +++ b/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOConfiguration.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dataformat.beanio; + +import java.nio.charset.Charset; +import java.util.Properties; + +/** + * To configure the BeanIO data format, or BeanIO splitter. + */ +public class BeanIOConfiguration { + + private String streamName; + private String mapping; + private boolean ignoreUnidentifiedRecords; + private boolean ignoreUnexpectedRecords; + private boolean ignoreInvalidRecords; + private Charset encoding = Charset.defaultCharset(); + private Properties properties; + + public String getMapping() { + return mapping; + } + + public void setMapping(String mapping) { + this.mapping = mapping; + } + + public String getStreamName() { + return streamName; + } + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public boolean isIgnoreUnidentifiedRecords() { + return ignoreUnidentifiedRecords; + } + + public void setIgnoreUnidentifiedRecords(boolean ignoreUnidentifiedRecords) { + this.ignoreUnidentifiedRecords = ignoreUnidentifiedRecords; + } + + public boolean isIgnoreUnexpectedRecords() { + return ignoreUnexpectedRecords; + } + + public void setIgnoreUnexpectedRecords(boolean ignoreUnexpectedRecords) { + this.ignoreUnexpectedRecords = ignoreUnexpectedRecords; + } + + public boolean isIgnoreInvalidRecords() { + return ignoreInvalidRecords; + } + + public void setIgnoreInvalidRecords(boolean ignoreInvalidRecords) { + this.ignoreInvalidRecords = ignoreInvalidRecords; + } + + public Charset getEncoding() { + return encoding; + } + + public void setEncoding(Charset encoding) { + this.encoding = encoding; + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/aeff910e/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIODataFormat.java ---------------------------------------------------------------------- diff --git a/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIODataFormat.java b/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIODataFormat.java index d27df2d..e33d7ff 100644 --- a/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIODataFormat.java +++ b/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIODataFormat.java @@ -38,12 +38,8 @@ import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ResourceHelper; import org.beanio.BeanReader; -import org.beanio.BeanReaderErrorHandlerSupport; import org.beanio.BeanWriter; -import org.beanio.InvalidRecordException; import org.beanio.StreamFactory; -import org.beanio.UnexpectedRecordException; -import org.beanio.UnidentifiedRecordException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,25 +49,18 @@ import org.slf4j.LoggerFactory; */ public class BeanIODataFormat extends ServiceSupport implements DataFormat, DataFormatName, CamelContextAware { - private static final String LOG_PREFIX = "BeanIO: "; private static final Logger LOG = LoggerFactory.getLogger(BeanIODataFormat.class); private transient CamelContext camelContext; private transient StreamFactory factory; - private String streamName; - private String mapping; - private boolean ignoreUnidentifiedRecords; - private boolean ignoreUnexpectedRecords; - private boolean ignoreInvalidRecords; - private Charset encoding = Charset.defaultCharset(); - private Properties properties; + private BeanIOConfiguration configuration = new BeanIOConfiguration(); public BeanIODataFormat() { } public BeanIODataFormat(String mapping, String streamName) { - this.mapping = mapping; - this.streamName = streamName; + setMapping(mapping); + setStreamName(streamName); } @Override @@ -81,16 +70,16 @@ public class BeanIODataFormat extends ServiceSupport implements DataFormat, Data @Override protected void doStart() throws Exception { - ObjectHelper.notNull(streamName, "Stream name not configured."); + ObjectHelper.notNull(getStreamName(), "Stream name not configured."); if (factory == null) { // Create the stream factory that will be used to read/write objects. factory = StreamFactory.newInstance(); // Load the mapping file using the resource helper to ensure it can be loaded in OSGi and other environments - InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), mapping); + InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), getMapping()); try { - if (properties != null) { - factory.load(is, properties); + if (getProperties() != null) { + factory.load(is, getProperties()); } else { factory.load(is); } @@ -140,8 +129,8 @@ public class BeanIODataFormat extends ServiceSupport implements DataFormat, Data } private void writeModels(OutputStream stream, List<Object> models) { - BufferedWriter streamWriter = IOHelper.buffered(new OutputStreamWriter(stream, encoding)); - BeanWriter out = factory.createWriter(streamName, streamWriter); + BufferedWriter streamWriter = IOHelper.buffered(new OutputStreamWriter(stream, getEncoding())); + BeanWriter out = factory.createWriter(getStreamName(), streamWriter); for (Object obj : models) { out.write(obj); @@ -153,12 +142,12 @@ public class BeanIODataFormat extends ServiceSupport implements DataFormat, Data private List<Object> readModels(Exchange exchange, InputStream stream) { List<Object> results = new ArrayList<Object>(); - BufferedReader streamReader = IOHelper.buffered(new InputStreamReader(stream, encoding)); + BufferedReader streamReader = IOHelper.buffered(new InputStreamReader(stream, getEncoding())); - BeanReader in = factory.createReader(streamName, streamReader); + BeanReader in = factory.createReader(getStreamName(), streamReader); try { - registerErrorHandler(in); + in.setErrorHandler(new BeanIOErrorHandler(configuration)); Object readObject; while ((readObject = in.read()) != null) { @@ -174,101 +163,59 @@ public class BeanIODataFormat extends ServiceSupport implements DataFormat, Data return results; } - private void registerErrorHandler(BeanReader in) { - in.setErrorHandler(new BeanReaderErrorHandlerSupport() { - - @Override - public void invalidRecord(InvalidRecordException ex) throws Exception { - String msg = LOG_PREFIX + "InvalidRecord: " + ex.getMessage() + ": " + ex.getRecordContext().getRecordText(); - if (ignoreInvalidRecords) { - LOG.debug(msg); - } else { - LOG.warn(msg); - throw ex; - } - } - - @Override - public void unexpectedRecord(UnexpectedRecordException ex) throws Exception { - String msg = LOG_PREFIX + "UnexpectedRecord: " + ex.getMessage() + ": " + ex.getRecordContext().getRecordText(); - if (ignoreUnexpectedRecords) { - LOG.debug(msg); - } else { - LOG.warn(msg); - throw ex; - } - } - - @Override - public void unidentifiedRecord(UnidentifiedRecordException ex) throws Exception { - String msg = LOG_PREFIX + "UnidentifiedRecord: " + ex.getMessage() + ": " + ex.getRecordContext().getRecordText(); - if (ignoreUnidentifiedRecords) { - LOG.debug(msg); - } else { - LOG.warn(msg); - throw ex; - } - } - }); - } - - public Charset getEncoding() { - return encoding; + public String getMapping() { + return configuration.getMapping(); } - public void setEncoding(Charset encoding) { - this.encoding = encoding; - } - - public void setEncoding(String encoding) { - this.encoding = Charset.forName(encoding); + public void setIgnoreUnexpectedRecords(boolean ignoreUnexpectedRecords) { + configuration.setIgnoreUnexpectedRecords(ignoreUnexpectedRecords); } - public boolean isIgnoreInvalidRecords() { - return ignoreInvalidRecords; + public void setProperties(Properties properties) { + configuration.setProperties(properties); } - public void setIgnoreInvalidRecords(boolean ignoreInvalidRecords) { - this.ignoreInvalidRecords = ignoreInvalidRecords; + public void setStreamName(String streamName) { + configuration.setStreamName(streamName); } - public boolean isIgnoreUnexpectedRecords() { - return ignoreUnexpectedRecords; + public boolean isIgnoreUnidentifiedRecords() { + return configuration.isIgnoreUnidentifiedRecords(); } - public void setIgnoreUnexpectedRecords(boolean ignoreUnexpectedRecords) { - this.ignoreUnexpectedRecords = ignoreUnexpectedRecords; + public boolean isIgnoreInvalidRecords() { + return configuration.isIgnoreInvalidRecords(); } - public boolean isIgnoreUnidentifiedRecords() { - return ignoreUnidentifiedRecords; + public void setIgnoreInvalidRecords(boolean ignoreInvalidRecords) { + configuration.setIgnoreInvalidRecords(ignoreInvalidRecords); } - public void setIgnoreUnidentifiedRecords(boolean ignoreUnidentifiedRecords) { - this.ignoreUnidentifiedRecords = ignoreUnidentifiedRecords; + public void setEncoding(Charset encoding) { + configuration.setEncoding(encoding); } - public String getMapping() { - return mapping; + public boolean isIgnoreUnexpectedRecords() { + return configuration.isIgnoreUnexpectedRecords(); } - public void setMapping(String mapping) { - this.mapping = mapping; + public Properties getProperties() { + return configuration.getProperties(); } public String getStreamName() { - return streamName; + return configuration.getStreamName(); } - public void setStreamName(String streamName) { - this.streamName = streamName; + public void setMapping(String mapping) { + configuration.setMapping(mapping); } - public Properties getProperties() { - return properties; + public void setIgnoreUnidentifiedRecords(boolean ignoreUnidentifiedRecords) { + configuration.setIgnoreUnidentifiedRecords(ignoreUnidentifiedRecords); } - public void setProperties(Properties properties) { - this.properties = properties; + public Charset getEncoding() { + return configuration.getEncoding(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/aeff910e/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOErrorHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOErrorHandler.java b/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOErrorHandler.java new file mode 100644 index 0000000..26b1529 --- /dev/null +++ b/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOErrorHandler.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dataformat.beanio; + +import org.beanio.BeanReaderErrorHandlerSupport; +import org.beanio.InvalidRecordException; +import org.beanio.UnexpectedRecordException; +import org.beanio.UnidentifiedRecordException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BeanIOErrorHandler extends BeanReaderErrorHandlerSupport { + + private static final String LOG_PREFIX = "BeanIO: "; + private static final Logger LOG = LoggerFactory.getLogger(BeanIOErrorHandler.class); + + private final BeanIOConfiguration configuration; + + public BeanIOErrorHandler(BeanIOConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public void invalidRecord(InvalidRecordException ex) throws Exception { + String msg = LOG_PREFIX + "InvalidRecord: " + ex.getMessage() + ": " + ex.getRecordContext().getRecordText(); + if (configuration.isIgnoreInvalidRecords()) { + LOG.debug(msg); + } else { + LOG.warn(msg); + throw ex; + } + } + + @Override + public void unexpectedRecord(UnexpectedRecordException ex) throws Exception { + String msg = LOG_PREFIX + "UnexpectedRecord: " + ex.getMessage() + ": " + ex.getRecordContext().getRecordText(); + if (configuration.isIgnoreUnexpectedRecords()) { + LOG.debug(msg); + } else { + LOG.warn(msg); + throw ex; + } + } + + @Override + public void unidentifiedRecord(UnidentifiedRecordException ex) throws Exception { + String msg = LOG_PREFIX + "UnidentifiedRecord: " + ex.getMessage() + ": " + ex.getRecordContext().getRecordText(); + if (configuration.isIgnoreUnidentifiedRecords()) { + LOG.debug(msg); + } else { + LOG.warn(msg); + throw ex; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/aeff910e/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOSplitter.java ---------------------------------------------------------------------- diff --git a/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOSplitter.java b/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOSplitter.java index bb153cc..7eb80da 100644 --- a/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOSplitter.java +++ b/components/camel-beanio/src/main/java/org/apache/camel/dataformat/beanio/BeanIOSplitter.java @@ -17,34 +17,70 @@ package org.apache.camel.dataformat.beanio; import java.io.File; +import java.io.InputStream; import java.io.Reader; +import java.nio.charset.Charset; +import java.util.Properties; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Expression; -import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.apache.camel.WrappedFile; +import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.ResourceHelper; import org.beanio.BeanReader; import org.beanio.StreamFactory; +/** + * You can use {@link BeanIOSplitter} with the Camel Splitter EIP to split big payloads + * using a stream mode to avoid reading the entire content into memory. + */ public class BeanIOSplitter implements Expression { - // TODO: should not reuse dataformat but have its own configuration likely - // TODO: need some way of starting to load the stream factory - private BeanIODataFormat dataFormat; + private BeanIOConfiguration configuration = new BeanIOConfiguration(); + private StreamFactory factory; + + public BeanIOSplitter() throws Exception { + } - public BeanIOSplitter(BeanIODataFormat dataFormat) throws Exception { - this.dataFormat = dataFormat; - ServiceHelper.startService(dataFormat); + public BeanIOSplitter(BeanIOConfiguration configuration) { + this.configuration = configuration; } - public Object evaluate(Exchange exchange) throws InvalidPayloadException { + public BeanIOSplitter(String mapping, String streamName) { + setMapping(mapping); + setStreamName(streamName); + } + + protected StreamFactory createStreamFactory(CamelContext camelContext) throws Exception { + ObjectHelper.notNull(getStreamName(), "Stream name not configured."); + // Create the stream factory that will be used to read/write objects. + StreamFactory answer = StreamFactory.newInstance(); + + // Load the mapping file using the resource helper to ensure it can be loaded in OSGi and other environments + InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, getMapping()); + try { + if (getProperties() != null) { + answer.load(is, getProperties()); + } else { + answer.load(is); + } + } finally { + IOHelper.close(is); + } + + return answer; + } + + public Object evaluate(Exchange exchange) throws Exception { Message msg = exchange.getIn(); Object body = msg.getBody(); - StreamFactory sf = dataFormat.getFactory(); + if (factory == null) { + factory = createStreamFactory(exchange.getContext()); + } BeanReader beanReader = null; if (body instanceof WrappedFile) { @@ -52,13 +88,15 @@ public class BeanIOSplitter implements Expression { } if (body instanceof File) { File file = (File) body; - beanReader = sf.createReader(dataFormat.getStreamName(), file); + beanReader = factory.createReader(getStreamName(), file); } if (beanReader == null) { Reader reader = msg.getMandatoryBody(Reader.class); - beanReader = sf.createReader(dataFormat.getStreamName(), reader); + beanReader = factory.createReader(getStreamName(), reader); } + beanReader.setErrorHandler(new BeanIOErrorHandler(configuration)); + return new BeanIOIterator(beanReader); } @@ -67,9 +105,80 @@ public class BeanIOSplitter implements Expression { try { Object result = evaluate(exchange); return exchange.getContext().getTypeConverter().convertTo(type, exchange, result); - } catch (InvalidPayloadException e) { + } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } } + public BeanIOConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration(BeanIOConfiguration configuration) { + this.configuration = configuration; + } + + public StreamFactory getFactory() { + return factory; + } + + public void setFactory(StreamFactory factory) { + this.factory = factory; + } + + public String getMapping() { + return configuration.getMapping(); + } + + public void setIgnoreUnexpectedRecords(boolean ignoreUnexpectedRecords) { + configuration.setIgnoreUnexpectedRecords(ignoreUnexpectedRecords); + } + + public void setProperties(Properties properties) { + configuration.setProperties(properties); + } + + public void setStreamName(String streamName) { + configuration.setStreamName(streamName); + } + + public boolean isIgnoreUnidentifiedRecords() { + return configuration.isIgnoreUnidentifiedRecords(); + } + + public boolean isIgnoreInvalidRecords() { + return configuration.isIgnoreInvalidRecords(); + } + + public void setIgnoreInvalidRecords(boolean ignoreInvalidRecords) { + configuration.setIgnoreInvalidRecords(ignoreInvalidRecords); + } + + public void setEncoding(Charset encoding) { + configuration.setEncoding(encoding); + } + + public boolean isIgnoreUnexpectedRecords() { + return configuration.isIgnoreUnexpectedRecords(); + } + + public Properties getProperties() { + return configuration.getProperties(); + } + + public String getStreamName() { + return configuration.getStreamName(); + } + + public void setMapping(String mapping) { + configuration.setMapping(mapping); + } + + public void setIgnoreUnidentifiedRecords(boolean ignoreUnidentifiedRecords) { + configuration.setIgnoreUnidentifiedRecords(ignoreUnidentifiedRecords); + } + + public Charset getEncoding() { + return configuration.getEncoding(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/aeff910e/components/camel-beanio/src/test/java/org/apache/camel/dataformat/beanio/BeanIOSplitterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-beanio/src/test/java/org/apache/camel/dataformat/beanio/BeanIOSplitterTest.java b/components/camel-beanio/src/test/java/org/apache/camel/dataformat/beanio/BeanIOSplitterTest.java index 2ecb4c4..e8d3033 100644 --- a/components/camel-beanio/src/test/java/org/apache/camel/dataformat/beanio/BeanIOSplitterTest.java +++ b/components/camel-beanio/src/test/java/org/apache/camel/dataformat/beanio/BeanIOSplitterTest.java @@ -53,17 +53,16 @@ public class BeanIOSplitterTest extends CamelTestSupport { @Override public void configure() throws Exception { // START SNIPPET: e1 - // setup beanio data format using the mapping file, loaded from the classpath - BeanIODataFormat format = new BeanIODataFormat( - "org/apache/camel/dataformat/beanio/mappings.xml", - "employeeFile"); - format.setCamelContext(context); + // setup beanio splitter using the mapping file, loaded from the classpath + BeanIOSplitter splitter = new BeanIOSplitter(); + splitter.setMapping("org/apache/camel/dataformat/beanio/mappings.xml"); + splitter.setStreamName("employeeFile"); // a route which uses the bean io data format to format a CSV data // to java objects from("direct:unmarshal") // and then split the message body so we get a message for each row - .split(new BeanIOSplitter(format)).streaming() + .split(splitter).streaming() .to("log:line") .to("mock:beanio-unmarshal"); // END SNIPPET: e1