This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 30b93af3fc6f07e0dcdf0ccc2c843d7ea0b217ee Author: Jan Bednář <m...@janbednar.eu> AuthorDate: Wed Jul 17 00:21:13 2019 +0200 CAMEL-13399: Optimized ZipAggregationStrategy to use ZipFileSystem --- .../aggregate/zipfile/ZipAggregationStrategy.java | 143 ++++++++++----------- .../AggregationStrategyWithFilenameHeaderTest.java | 13 +- .../AggregationStrategyWithPreservationTest.java | 28 ++-- .../ZipAggregationStrategyEmptyFileTest.java | 16 +-- .../zipfile/ZipAggregationStrategyTest.java | 15 +-- 5 files changed, 103 insertions(+), 112 deletions(-) diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java index 71a1117..b6e3454 100644 --- a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java +++ b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java @@ -17,15 +17,17 @@ package org.apache.camel.processor.aggregate.zipfile; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; import java.nio.file.Files; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; -import java.util.zip.ZipOutputStream; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.Map; import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; @@ -35,8 +37,8 @@ import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileMessage; import org.apache.camel.component.file.GenericFileOperationFailedException; import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.ExchangeHelper; import org.apache.camel.util.FileUtil; -import org.apache.camel.util.IOHelper; /** * This aggregation strategy will aggregate all incoming messages into a ZIP file. @@ -53,10 +55,11 @@ public class ZipAggregationStrategy implements AggregationStrategy { private String fileSuffix = ".zip"; private boolean preserveFolderStructure; private boolean useFilenameHeader; + private boolean useTempFile; private File parentDir = new File(System.getProperty("java.io.tmpdir")); public ZipAggregationStrategy() { - this(false, false); + this(false); } /** @@ -66,7 +69,7 @@ public class ZipAggregationStrategy implements AggregationStrategy { public ZipAggregationStrategy(boolean preserveFolderStructure) { this(preserveFolderStructure, false); } - + /** * @param preserveFolderStructure if true, the folder structure is preserved when the source is * a type of {@link GenericFileMessage}. If used with a file, use recursive=true. @@ -74,8 +77,20 @@ public class ZipAggregationStrategy implements AggregationStrategy { * within the ZIP file. */ public ZipAggregationStrategy(boolean preserveFolderStructure, boolean useFilenameHeader) { + this(preserveFolderStructure, useFilenameHeader, false); + } + + /** + * @param preserveFolderStructure if true, the folder structure is preserved when the source is + * a type of {@link GenericFileMessage}. If used with a file, use recursive=true. + * @param useFilenameHeader if true, the filename header will be used to name aggregated byte arrays + * within the ZIP file. + * @param useTempFile if true, the ZipFileSystem will use temporary files for zip manipulations instead of memory. + */ + public ZipAggregationStrategy(boolean preserveFolderStructure, boolean useFilenameHeader, boolean useTempFile) { this.preserveFolderStructure = preserveFolderStructure; this.useFilenameHeader = useFilenameHeader; + this.useTempFile = useTempFile; } /** @@ -142,7 +157,8 @@ public class ZipAggregationStrategy implements AggregationStrategy { if (oldExchange == null) { try { zipFile = FileUtil.createTempFile(this.filePrefix, this.fileSuffix, this.parentDir); - } catch (IOException e) { + newZipFile(zipFile); + } catch (IOException | URISyntaxException e) { throw new GenericFileOperationFailedException(e.getMessage(), e); } answer = newExchange; @@ -150,12 +166,13 @@ public class ZipAggregationStrategy implements AggregationStrategy { } else { zipFile = oldExchange.getIn().getBody(File.class); } - Object body = newExchange.getIn().getBody(); if (body instanceof WrappedFile) { body = ((WrappedFile) body).getFile(); } - + + String charset = ExchangeHelper.getCharsetName(newExchange, true); + if (body instanceof File) { try { File appendFile = (File) body; @@ -163,10 +180,6 @@ public class ZipAggregationStrategy implements AggregationStrategy { if (appendFile.length() > 0) { String entryName = preserveFolderStructure ? newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class) : newExchange.getIn().getMessageId(); addFileToZip(zipFile, appendFile, this.preserveFolderStructure ? entryName : null); - GenericFile<File> genericFile = - FileConsumer.asGenericFile( - zipFile.getParent(), zipFile, Charset.defaultCharset().toString(), false); - genericFile.bindToExchange(answer); } } catch (Exception e) { throw new GenericFileOperationFailedException(e.getMessage(), e); @@ -178,81 +191,59 @@ public class ZipAggregationStrategy implements AggregationStrategy { // do not try to append empty data if (buffer.length > 0) { String entryName = useFilenameHeader ? newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class) : newExchange.getIn().getMessageId(); - addEntryToZip(zipFile, entryName, buffer, buffer.length); - GenericFile<File> genericFile = FileConsumer.asGenericFile( - zipFile.getParent(), zipFile, Charset.defaultCharset().toString(), false); - genericFile.bindToExchange(answer); + addEntryToZip(zipFile, entryName, buffer, charset); } } catch (Exception e) { throw new GenericFileOperationFailedException(e.getMessage(), e); } } - + + GenericFile<File> genericFile = FileConsumer.asGenericFile(zipFile.getParent(), zipFile, charset, false); + genericFile.bindToExchange(answer); + return answer; } - - private static void addFileToZip(File source, File file, String fileName) throws IOException { - File tmpZip = Files.createTempFile(source.getName(), null).toFile(); - tmpZip.delete(); - if (!source.renameTo(tmpZip)) { - throw new IOException("Could not make temp file (" + source.getName() + ")"); - } - byte[] buffer = new byte[8192]; - FileInputStream fis = new FileInputStream(tmpZip); - ZipInputStream zin = new ZipInputStream(fis); - ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); - - try { - InputStream in = new FileInputStream(file); - out.putNextEntry(new ZipEntry(fileName == null ? file.getName() : fileName)); - for (int read = in.read(buffer); read > -1; read = in.read(buffer)) { - out.write(buffer, 0, read); + private static void newZipFile(File zipFile) throws URISyntaxException, IOException { + if (zipFile.exists()) { + if (!zipFile.delete()) { // Delete, because ZipFileSystem needs to create file on its own (with correct END bytes in the file) + throw new IOException("Cannot delete file " + zipFile); } - out.closeEntry(); - IOHelper.close(in); + } + Map<String, String> env = new HashMap<>(); + env.put("create", Boolean.TRUE.toString()); - for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { - out.putNextEntry(ze); - for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { - out.write(buffer, 0, read); - } - out.closeEntry(); - } - } finally { - IOHelper.close(fis, zin, out); + try (FileSystem ignored = FileSystems.newFileSystem(getZipURI(zipFile), env)) { + //noop, just open and close FileSystem to initialize correct headers in file } - tmpZip.delete(); } - - private static void addEntryToZip(File source, String entryName, byte[] buffer, int length) throws IOException { - File tmpZip = Files.createTempFile(source.getName(), null).toFile(); - tmpZip.delete(); - if (!source.renameTo(tmpZip)) { - throw new IOException("Cannot create temp file: " + source.getName()); - } - FileInputStream fis = new FileInputStream(tmpZip); - ZipInputStream zin = new ZipInputStream(fis); - ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); - try { - out.putNextEntry(new ZipEntry(entryName)); - out.write(buffer, 0, length); - out.closeEntry(); + private void addFileToZip(File zipFile, File file, String fileName) throws IOException, URISyntaxException { + String entryName = fileName == null ? file.getName() : fileName; + Map<String, String> env = new HashMap<>(); + env.put("useTempFile", Boolean.toString(this.useTempFile)); + try (FileSystem fs = FileSystems.newFileSystem(getZipURI(zipFile), env)) { + Path dest = fs.getPath("/", entryName); + Files.createDirectories(dest.getParent()); + Files.copy(file.toPath(), dest, StandardCopyOption.REPLACE_EXISTING); + } + } - for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { - out.putNextEntry(ze); - for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { - out.write(buffer, 0, read); - } - out.closeEntry(); - } - } finally { - IOHelper.close(fis, zin, out); + private void addEntryToZip(File zipFile, String entryName, byte[] buffer, String charset) throws IOException, URISyntaxException { + Map<String, String> env = new HashMap<>(); + env.put("encoding", charset); + env.put("useTempFile", Boolean.toString(this.useTempFile)); + try (FileSystem fs = FileSystems.newFileSystem(getZipURI(zipFile), env)) { + Path dest = fs.getPath("/", entryName); + Files.createDirectories(dest.getParent()); + Files.write(dest, buffer, StandardOpenOption.CREATE); } - tmpZip.delete(); } - + + private static URI getZipURI(File zipFile) throws URISyntaxException { + return new URI("jar", zipFile.toURI().toString(), null); + } + /** * This callback class is used to clean up the temporary ZIP file once the exchange has completed. */ diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/AggregationStrategyWithFilenameHeaderTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/AggregationStrategyWithFilenameHeaderTest.java index 7ee1d33..eb40c98 100644 --- a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/AggregationStrategyWithFilenameHeaderTest.java +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/AggregationStrategyWithFilenameHeaderTest.java @@ -33,11 +33,12 @@ import org.junit.Test; public class AggregationStrategyWithFilenameHeaderTest extends CamelTestSupport { private static final List<String> FILE_NAMES = Arrays.asList("foo", "bar"); + private static final String TEST_DIR = "target/out_AggregationStrategyWithFilenameHeaderTest"; @Override @Before public void setUp() throws Exception { - deleteDirectory("target/out"); + deleteDirectory(TEST_DIR); super.setUp(); } @@ -51,11 +52,9 @@ public class AggregationStrategyWithFilenameHeaderTest extends CamelTestSupport template.sendBodyAndHeader("bar", Exchange.FILE_NAME, FILE_NAMES.get(1)); assertMockEndpointsSatisfied(); - Thread.sleep(500); - - File[] files = new File("target/out").listFiles(); - assertTrue(files != null); - assertTrue("Should be a file in target/out directory", files.length > 0); + File[] files = new File(TEST_DIR).listFiles(); + assertNotNull(files); + assertTrue("Should be a file in " + TEST_DIR + " directory", files.length > 0); File resultFile = files[0]; @@ -83,7 +82,7 @@ public class AggregationStrategyWithFilenameHeaderTest extends CamelTestSupport .aggregate(new ZipAggregationStrategy(false, true)) .constant(true) .completionTimeout(50) - .to("file:target/out") + .to("file:" + TEST_DIR) .to("mock:aggregateToZipEntry") .log("Done processing zip file: ${header.CamelFileName}"); } diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/AggregationStrategyWithPreservationTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/AggregationStrategyWithPreservationTest.java index 6ad8157..a923716 100644 --- a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/AggregationStrategyWithPreservationTest.java +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/AggregationStrategyWithPreservationTest.java @@ -33,11 +33,12 @@ import org.junit.Test; public class AggregationStrategyWithPreservationTest extends CamelTestSupport { private static final int EXPECTED_NO_FILES = 5; + private static final String TEST_DIR = "target/out_AggregationStrategyWithPreservationTest"; @Override @Before public void setUp() throws Exception { - deleteDirectory("target/out"); + deleteDirectory(TEST_DIR); super.setUp(); } @@ -45,28 +46,29 @@ public class AggregationStrategyWithPreservationTest extends CamelTestSupport { public void testSplitter() throws Exception { MockEndpoint mock = getMockEndpoint("mock:aggregateToZipEntry"); mock.expectedMessageCount(1); - assertMockEndpointsSatisfied(); - Thread.sleep(500); - - File[] files = new File("target/out").listFiles(); - assertTrue("Should be a file in target/out directory", files.length > 0); + File[] files = new File(TEST_DIR).listFiles(); + assertNotNull(files); + assertTrue("Should be a file in " + TEST_DIR + " directory", files.length > 0); File resultFile = files[0]; - Set<String> expectedZipFiles = new HashSet<>(Arrays.asList("another" + File.separator + "hello.txt", - "other" + File.separator + "greetings.txt", + Set<String> expectedZipFiles = new HashSet<>(Arrays.asList("another/hello.txt", + "other/greetings.txt", "chiau.txt", "hi.txt", "hola.txt")); ZipInputStream zin = new ZipInputStream(new FileInputStream(resultFile)); try { int fileCount = 0; for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { - expectedZipFiles.remove(ze.toString()); - fileCount++; + if (!ze.isDirectory()) { + assertTrue("Found unexpected entry " + ze + " in zipfile", expectedZipFiles.remove(ze.toString())); + fileCount++; + } } - assertTrue("Zip file should contains " + AggregationStrategyWithPreservationTest.EXPECTED_NO_FILES + " files", + + assertTrue(String.format("Zip file should contains %d files, got %d files", AggregationStrategyWithPreservationTest.EXPECTED_NO_FILES, fileCount), fileCount == AggregationStrategyWithPreservationTest.EXPECTED_NO_FILES); - assertEquals("Should have found all of the zip files in the file.", 0, expectedZipFiles.size()); + assertEquals("Should have found all of the zip files in the file. Remaining: " + expectedZipFiles, 0, expectedZipFiles.size()); } finally { IOHelper.close(zin); } @@ -83,7 +85,7 @@ public class AggregationStrategyWithPreservationTest extends CamelTestSupport { .constant(true) .completionFromBatchConsumer() .eagerCheckCompletion() - .to("file:target/out") + .to("file:" + TEST_DIR) .to("mock:aggregateToZipEntry") .log("Done processing zip file: ${header.CamelFileName}"); } diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyEmptyFileTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyEmptyFileTest.java index 5914205..eddada4 100644 --- a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyEmptyFileTest.java +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyEmptyFileTest.java @@ -30,12 +30,14 @@ import org.junit.Test; public class ZipAggregationStrategyEmptyFileTest extends CamelTestSupport { private static final int EXPECTED_NO_FILES = 3; + private static final String TEST_DIR = "target/out_ZipAggregationStrategyEmptyFileTest"; + @Override @Before public void setUp() throws Exception { deleteDirectory("target/foo"); - deleteDirectory("target/out"); + deleteDirectory(TEST_DIR); super.setUp(); } @@ -52,11 +54,9 @@ public class ZipAggregationStrategyEmptyFileTest extends CamelTestSupport { assertMockEndpointsSatisfied(); - Thread.sleep(500); - - File[] files = new File("target/out").listFiles(); - assertTrue(files != null); - assertTrue("Should be a file in target/out directory", files.length > 0); + File[] files = new File(TEST_DIR).listFiles(); + assertNotNull(files); + assertTrue("Should be a file in " + TEST_DIR + " directory", files.length > 0); File resultFile = files[0]; @@ -64,7 +64,7 @@ public class ZipAggregationStrategyEmptyFileTest extends CamelTestSupport { try { int fileCount = 0; for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { - fileCount = fileCount + 1; + fileCount++; } assertEquals("Zip file should contains " + ZipAggregationStrategyEmptyFileTest.EXPECTED_NO_FILES + " files", ZipAggregationStrategyEmptyFileTest.EXPECTED_NO_FILES, fileCount); } finally { @@ -82,7 +82,7 @@ public class ZipAggregationStrategyEmptyFileTest extends CamelTestSupport { .constant(true) .completionSize(4) .eagerCheckCompletion() - .to("file:target/out") + .to("file:" + TEST_DIR) .to("mock:aggregateToZipEntry") .log("Done processing zip file: ${header.CamelFileName}"); } diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java index d55e839..c928e23 100644 --- a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java @@ -30,11 +30,12 @@ import org.junit.Test; public class ZipAggregationStrategyTest extends CamelTestSupport { private static final int EXPECTED_NO_FILES = 3; + private static final String TEST_DIR = "target/out_ZipAggregationStrategyTest"; @Override @Before public void setUp() throws Exception { - deleteDirectory("target/out"); + deleteDirectory(TEST_DIR); super.setUp(); } @@ -46,11 +47,9 @@ public class ZipAggregationStrategyTest extends CamelTestSupport { assertMockEndpointsSatisfied(); - Thread.sleep(500); - - File[] files = new File("target/out").listFiles(); - assertTrue(files != null); - assertTrue("Should be a file in target/out directory", files.length > 0); + File[] files = new File(TEST_DIR).listFiles(); + assertNotNull(files); + assertTrue("Should be a file in " + TEST_DIR + " directory", files.length > 0); File resultFile = files[0]; @@ -58,7 +57,7 @@ public class ZipAggregationStrategyTest extends CamelTestSupport { try { int fileCount = 0; for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { - fileCount = fileCount + 1; + fileCount++; } assertEquals("Zip file should contains " + ZipAggregationStrategyTest.EXPECTED_NO_FILES + " files", ZipAggregationStrategyTest.EXPECTED_NO_FILES, fileCount); } finally { @@ -78,7 +77,7 @@ public class ZipAggregationStrategyTest extends CamelTestSupport { .constant(true) .completionFromBatchConsumer() .eagerCheckCompletion() - .to("file:target/out") + .to("file:" + TEST_DIR) .to("mock:aggregateToZipEntry") .log("Done processing zip file: ${header.CamelFileName}"); }