Repository: camel Updated Branches: refs/heads/camel-2.17.x fec76be80 -> 3628697f3
CAMEL-10007: camel-zipfile - ZipAggregationStrategy should ignore zero byte files. Also close resources so it works better on Windows. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/71c681bf Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/71c681bf Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/71c681bf Branch: refs/heads/camel-2.17.x Commit: 71c681bf6a6f77a4a7320d645d9531c657469d75 Parents: fec76be Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Jun 1 15:27:50 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Jun 1 15:31:31 2016 +0200 ---------------------------------------------------------------------- .../zipfile/ZipAggregationStrategy.java | 91 ++++++++++++-------- .../ZipAggregationStrategyEmptyFileTest.java | 91 ++++++++++++++++++++ 2 files changed, 145 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/71c681bf/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java index a6b7a0c..6cef428 100644 --- a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java +++ b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java @@ -27,6 +27,7 @@ import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; import org.apache.camel.Exchange; +import org.apache.camel.WrappedFile; import org.apache.camel.component.file.FileConsumer; import 
org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileMessage; @@ -129,18 +130,22 @@ public class ZipAggregationStrategy implements AggregationStrategy { } else { zipFile = oldExchange.getIn().getBody(File.class); } + + Object body = newExchange.getIn().getBody(); + if (body instanceof WrappedFile) { + body = ((WrappedFile) body).getFile(); + } - // Handle GenericFileMessages - if (GenericFileMessage.class.isAssignableFrom(newExchange.getIn().getClass())) { + if (body instanceof File) { try { - File appendFile = newExchange.getIn().getMandatoryBody(File.class); - if (appendFile != null) { - addFileToZip(zipFile, appendFile, this.preserveFolderStructure ? newExchange.getIn().toString() : null); + File appendFile = (File) body; + // do not try to append empty files + if (appendFile.length() > 0) { + String entryName = preserveFolderStructure ? newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class) : newExchange.getIn().getMessageId(); + addFileToZip(zipFile, appendFile, this.preserveFolderStructure ? entryName : null); GenericFile<File> genericFile = FileConsumer.asGenericFile( - zipFile.getParent(), - zipFile, - Charset.defaultCharset().toString()); + zipFile.getParent(), zipFile, Charset.defaultCharset().toString(), false); genericFile.bindToExchange(answer); } } catch (Exception e) { @@ -150,11 +155,14 @@ public class ZipAggregationStrategy implements AggregationStrategy { // Handle all other messages try { byte[] buffer = newExchange.getIn().getMandatoryBody(byte[].class); - String entryName = useFilenameHeader ? 
newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class) : newExchange.getIn().getMessageId(); - addEntryToZip(zipFile, entryName, buffer, buffer.length); - GenericFile<File> genericFile = FileConsumer.asGenericFile( - zipFile.getParent(), zipFile, Charset.defaultCharset().toString()); - genericFile.bindToExchange(answer); + // do not try to append empty data + if (buffer.length > 0) { + String entryName = useFilenameHeader ? newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class) : newExchange.getIn().getMessageId(); + addEntryToZip(zipFile, entryName, buffer, buffer.length); + GenericFile<File> genericFile = FileConsumer.asGenericFile( + zipFile.getParent(), zipFile, Charset.defaultCharset().toString(), false); + genericFile.bindToExchange(answer); + } } catch (Exception e) { throw new GenericFileOperationFailedException(e.getMessage(), e); } @@ -170,25 +178,30 @@ public class ZipAggregationStrategy implements AggregationStrategy { throw new IOException("Could not make temp file (" + source.getName() + ")"); } byte[] buffer = new byte[8192]; - ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip)); - ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); - InputStream in = new FileInputStream(file); - out.putNextEntry(new ZipEntry(fileName == null ? file.getName() : fileName)); - for (int read = in.read(buffer); read > -1; read = in.read(buffer)) { - out.write(buffer, 0, read); - } - out.closeEntry(); - IOHelper.close(in); + FileInputStream fis = new FileInputStream(tmpZip); + ZipInputStream zin = new ZipInputStream(fis); + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); - for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { - out.putNextEntry(ze); - for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { + try { + InputStream in = new FileInputStream(file); + out.putNextEntry(new ZipEntry(fileName == null ? 
file.getName() : fileName)); + for (int read = in.read(buffer); read > -1; read = in.read(buffer)) { out.write(buffer, 0, read); } out.closeEntry(); + IOHelper.close(in); + + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + out.putNextEntry(ze); + for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { + out.write(buffer, 0, read); + } + out.closeEntry(); + } + } finally { + IOHelper.close(fis, zin, out); } - IOHelper.close(zin, out); tmpZip.delete(); } @@ -198,21 +211,25 @@ public class ZipAggregationStrategy implements AggregationStrategy { if (!source.renameTo(tmpZip)) { throw new IOException("Cannot create temp file: " + source.getName()); } - ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip)); + + FileInputStream fis = new FileInputStream(tmpZip); + ZipInputStream zin = new ZipInputStream(fis); ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); - - out.putNextEntry(new ZipEntry(entryName)); - out.write(buffer, 0, length); - out.closeEntry(); + try { + out.putNextEntry(new ZipEntry(entryName)); + out.write(buffer, 0, length); + out.closeEntry(); - for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { - out.putNextEntry(ze); - for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { - out.write(buffer, 0, read); + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + out.putNextEntry(ze); + for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { + out.write(buffer, 0, read); + } + out.closeEntry(); } - out.closeEntry(); + } finally { + IOHelper.close(fis, zin, out); } - IOHelper.close(zin, out); tmpZip.delete(); } http://git-wip-us.apache.org/repos/asf/camel/blob/71c681bf/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyEmptyFileTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyEmptyFileTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyEmptyFileTest.java new file mode 100644 index 0000000..c151c47 --- /dev/null +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyEmptyFileTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregate.zipfile; + +import java.io.File; +import java.io.FileInputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.util.IOHelper; +import org.junit.Test; + +public class ZipAggregationStrategyEmptyFileTest extends CamelTestSupport { + + private static final int EXPECTED_NO_FILES = 3; + + @Override + public void setUp() throws Exception { + deleteDirectory("target/foo"); + deleteDirectory("target/out"); + super.setUp(); + } + + @Test + public void testEmptyFile() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregateToZipEntry"); + mock.expectedMessageCount(1); + + template.sendBody("file:target/foo", "Hello"); + // empty file which is not aggregated + template.sendBody("file:target/foo", ""); + template.sendBody("file:target/foo", "Bye"); + template.sendBody("file:target/foo", "Howdy"); + + assertMockEndpointsSatisfied(); + + Thread.sleep(500); + + File[] files = new File("target/out").listFiles(); + assertTrue(files != null); + assertTrue("Should be a file in target/out directory", files.length > 0); + + File resultFile = files[0]; + + ZipInputStream zin = new ZipInputStream(new FileInputStream(resultFile)); + try { + int fileCount = 0; + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + fileCount = fileCount + 1; + } + assertEquals("Zip file should contains " + ZipAggregationStrategyEmptyFileTest.EXPECTED_NO_FILES + " files", ZipAggregationStrategyEmptyFileTest.EXPECTED_NO_FILES, fileCount); + } finally { + IOHelper.close(zin); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/foo") + .aggregate(new ZipAggregationStrategy()) 
+ .constant(true) + .completionSize(4) + .eagerCheckCompletion() + .to("file:target/out") + .to("mock:aggregateToZipEntry") + .log("Done processing zip file: ${header.CamelFileName}"); + } + }; + + } +}