This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 3417c9e CAMEL-13774: camel-zipfile now supports marshal an iterator to zipfile 3417c9e is described below commit 3417c9ef5fa45f86dfa386c8b38a9ce4bf0bdd30 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Aug 7 12:54:59 2019 +0200 CAMEL-13774: camel-zipfile now supports marshal an iterator to zipfile --- .../dataformat/zipfile/ZipFileDataFormat.java | 20 +++++- .../zipfile/ZipFileIteratorDataFormatTest.java | 67 +++++++++++++++++++ .../camel/support/InputStreamIteratorTest.java | 51 ++++++++++++++ .../apache/camel/support/InputStreamIterator.java | 78 ++++++++++++++++++++++ 4 files changed, 215 insertions(+), 1 deletion(-) diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java index fe889f5..5fd9f4e 100644 --- a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java +++ b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java @@ -16,6 +16,7 @@ */ package org.apache.camel.dataformat.zipfile; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -27,9 +28,12 @@ import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.TypeConverter; import org.apache.camel.spi.DataFormat; import org.apache.camel.spi.DataFormatName; import org.apache.camel.spi.annotations.Dataformat; +import org.apache.camel.support.InputStreamIterator; import org.apache.camel.support.builder.OutputStreamBuilder; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.IOHelper; @@ -71,7 +75,21 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat createZipEntries(zos, filename); } - InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, graph); + Object body = exchange.getIn().getBody(); + TypeConverter converter = exchange.getContext().getTypeConverter(); + // favour using input stream + InputStream is = converter.tryConvertTo(InputStream.class, exchange, body); + if (is == null) { + // okay so try to see if its an iterator which we can wrap as input stream + Iterator it = converter.tryConvertTo(Iterator.class, exchange, body); + if (it != null) { + is = new InputStreamIterator(converter, it); + is = new BufferedInputStream(is); + } + } + if (is == null) { + throw new InvalidPayloadException(exchange, body.getClass()); + } try { IOHelper.copy(is, zos); diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileIteratorDataFormatTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileIteratorDataFormatTest.java new file mode 100644 index 0000000..f576429 --- /dev/null +++ b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileIteratorDataFormatTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dataformat.zipfile; + +import java.io.File; +import java.util.stream.Stream; + +import org.apache.camel.Exchange; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ZipFileIteratorDataFormatTest extends CamelTestSupport { + + @Test + public void testZipFileIterator() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + Stream body = Stream.of("ABC", "DEF", "1234567890"); + template.sendBody("direct:zip", body); + + assertMockEndpointsSatisfied(); + + resetMocks(); + + // unzip the file + getMockEndpoint("mock:unzip").expectedBodiesReceived("ABCDEF1234567890"); + + File file = new File("target/output/report.txt.zip"); + template.sendBody("direct:unzip", file); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:zip") + .setHeader(Exchange.FILE_NAME, constant("report.txt")) + .marshal().zipFile() + .to("file:target/output") + .to("mock:result"); + + from("direct:unzip") + .unmarshal().zipFile() + .to("mock:unzip"); + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/support/InputStreamIteratorTest.java b/core/camel-core/src/test/java/org/apache/camel/support/InputStreamIteratorTest.java new file mode 100644 index 0000000..4b094f0 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/support/InputStreamIteratorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.io.ByteArrayOutputStream; +import java.util.Iterator; +import java.util.stream.Stream; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.util.IOHelper; +import org.junit.Test; + +public class InputStreamIteratorTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testInputStreamIterator() throws Exception { + context.start(); + + Iterator it = Stream.of("ABC", "DEF", "1234567890").iterator(); + InputStreamIterator is = new InputStreamIterator(context.getTypeConverter(), it); + // from the first chunk + assertEquals(3, is.available()); + + // copy input to output + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + IOHelper.copy(is, bos); + IOHelper.close(is, bos); + + // and we have the data back + assertEquals("ABCDEF1234567890", bos.toString()); + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/InputStreamIterator.java b/core/camel-support/src/main/java/org/apache/camel/support/InputStreamIterator.java new file mode 100644 index 0000000..5c80b2e --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/InputStreamIterator.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; + +import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.TypeConverter; + +/** + * An {@link InputStream} that wraps an {@link Iterator} which reads iterations as byte array data. + */ +public final class InputStreamIterator extends InputStream { + private final TypeConverter converter; + private final Iterator it; + private InputStream chunk; + + public InputStreamIterator(TypeConverter converter, Iterator it) { + this.converter = converter; + this.it = it; + } + + @Override + public int read() throws IOException { + if (chunk == null) { + chunk = nextChunk(); + } + if (chunk == null) { + return -1; + } + int data = chunk.read(); + if (data == -1) { + // initialize for next chunk + chunk = null; + return read(); + } + + return data; + } + + @Override + public int available() throws IOException { + if (chunk == null) { + chunk = nextChunk(); + } + return chunk != null ? chunk.available() : 0; + } + + private InputStream nextChunk() throws IOException { + if (it.hasNext()) { + try { + byte[] buf = converter.mandatoryConvertTo(byte[].class, it.next()); + return new ByteArrayInputStream(buf); + } catch (NoTypeConversionAvailableException e) { + throw new IOException(e); + } + } else { + return null; + } + } +}