Repository: camel Updated Branches: refs/heads/camel-2.17.x 472d2f937 -> 0811dc599 refs/heads/master 12530e8e6 -> 41249ba9d
CAMEL-9905: TarAggregationStragegy should delete temporary files Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/41249ba9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/41249ba9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/41249ba9 Branch: refs/heads/master Commit: 41249ba9daa689852ed45c2e954efc28a3f1d730 Parents: 12530e8 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Apr 22 20:58:55 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Apr 22 20:58:55 2016 +0200 ---------------------------------------------------------------------- .../tarfile/TarAggregationStrategy.java | 72 ++++++++++++-------- ...gregationStrategyWithFilenameHeaderTest.java | 6 +- ...AggregationStrategyWithPreservationTest.java | 6 +- .../tarfile/TarAggregationStrategyTest.java | 6 +- 4 files changed, 60 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/41249ba9/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java index ae4ba2b..5394076 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java @@ -38,6 +38,8 @@ import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; import org.apache.commons.compress.utils.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This aggregation strategy will aggregate all incoming messages into a TAR file. @@ -54,10 +56,13 @@ import org.apache.commons.compress.utils.IOUtils; */ public class TarAggregationStrategy implements AggregationStrategy { + private static final Logger LOG = LoggerFactory.getLogger(TarAggregationStrategy.class); + private String filePrefix; private String fileSuffix = ".tar"; private boolean preserveFolderStructure; private boolean useFilenameHeader; + private File parentDir = new File(System.getProperty("java.io.tmpdir")); public TarAggregationStrategy() { this(false, false); @@ -82,38 +87,46 @@ public class TarAggregationStrategy implements AggregationStrategy { this.useFilenameHeader = useFilenameHeader; } - /** - * Gets the prefix used when creating the TAR file name. - * @return the prefix - */ public String getFilePrefix() { return filePrefix; } /** * Sets the prefix that will be used when creating the TAR filename. - * @param filePrefix prefix to use on TAR file. */ public void setFilePrefix(String filePrefix) { this.filePrefix = filePrefix; } - /** - * Gets the suffix used when creating the TAR file name. - * @return the suffix - */ public String getFileSuffix() { return fileSuffix; } /** * Sets the suffix that will be used when creating the ZIP filename. - * @param fileSuffix suffix to use on ZIP file. */ public void setFileSuffix(String fileSuffix) { this.fileSuffix = fileSuffix; } + public File getParentDir() { + return parentDir; + } + + /** + * Sets the parent directory to use for writing temporary files. + */ + public void setParentDir(File parentDir) { + this.parentDir = parentDir; + } + + /** + * Sets the parent directory to use for writing temporary files. + */ + public void setParentDir(String parentDir) { + this.parentDir = new File(parentDir); + } + @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { File tarFile; @@ -127,7 +140,8 @@ public class TarAggregationStrategy implements AggregationStrategy { // First time for this aggregation if (oldExchange == null) { try { - tarFile = FileUtil.createTempFile(this.filePrefix, this.fileSuffix, new File(System.getProperty("java.io.tmpdir"))); + tarFile = FileUtil.createTempFile(this.filePrefix, this.fileSuffix, parentDir); + LOG.trace("Created temporary file: {}", tarFile); } catch (IOException e) { throw new GenericFileOperationFailedException(e.getMessage(), e); } @@ -147,7 +161,8 @@ public class TarAggregationStrategy implements AggregationStrategy { FileConsumer.asGenericFile( tarFile.getParent(), tarFile, - null); // Do not set charset here, that will cause the tar file to be handled as ASCII later which breaks it.. + null, // Do not set charset here, that will cause the tar file to be handled as ASCII later which breaks it.. + false); genericFile.bindToExchange(answer); } } catch (Exception e) { @@ -157,11 +172,9 @@ public class TarAggregationStrategy implements AggregationStrategy { // Handle all other messages try { byte[] buffer = newExchange.getIn().getMandatoryBody(byte[].class); - String entryName = useFilenameHeader ? newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class) : newExchange.getIn() - .getMessageId(); + String entryName = useFilenameHeader ? newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class) : newExchange.getIn().getMessageId(); addEntryToTar(tarFile, entryName, buffer, buffer.length); - GenericFile<File> genericFile = FileConsumer.asGenericFile( - tarFile.getParent(), tarFile, null); + GenericFile<File> genericFile = FileConsumer.asGenericFile(tarFile.getParent(), tarFile, null, false); genericFile.bindToExchange(answer); } catch (Exception e) { throw new GenericFileOperationFailedException(e.getMessage(), e); @@ -171,14 +184,15 @@ public class TarAggregationStrategy implements AggregationStrategy { return answer; } - private static void addFileToTar(File source, File file, String fileName) throws IOException, ArchiveException { - File tmpTar = File.createTempFile(source.getName(), null); + private void addFileToTar(File source, File file, String fileName) throws IOException, ArchiveException { + File tmpTar = File.createTempFile(source.getName(), null, parentDir); tmpTar.delete(); if (!source.renameTo(tmpTar)) { throw new IOException("Could not make temp file (" + source.getName() + ")"); } - TarArchiveInputStream tin = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, new FileInputStream(tmpTar)); + FileInputStream fis = new FileInputStream(tmpTar); + TarArchiveInputStream tin = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, fis); TarArchiveOutputStream tos = new TarArchiveOutputStream(new FileOutputStream(source)); tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); tos.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX); @@ -200,18 +214,20 @@ public class TarAggregationStrategy implements AggregationStrategy { IOUtils.copy(in, tos); tos.closeArchiveEntry(); - IOHelper.close(in); - IOHelper.close(tin); - IOHelper.close(tos); + IOHelper.close(fis, in, tin, tos); + LOG.trace("Deleting temporary file: {}", tmpTar); + FileUtil.deleteFile(tmpTar); } - private static void addEntryToTar(File source, String entryName, byte[] buffer, int length) throws IOException, ArchiveException { - File tmpTar = File.createTempFile(source.getName(), null); + private void addEntryToTar(File source, String entryName, byte[] buffer, int length) throws IOException, ArchiveException { + File tmpTar = File.createTempFile(source.getName(), null, parentDir); tmpTar.delete(); if (!source.renameTo(tmpTar)) { throw new IOException("Cannot create temp file: " + source.getName()); } - TarArchiveInputStream tin = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, new FileInputStream(tmpTar)); + + FileInputStream fis = new FileInputStream(tmpTar); + TarArchiveInputStream tin = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, fis); TarArchiveOutputStream tos = new TarArchiveOutputStream(new FileOutputStream(source)); tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); tos.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX); @@ -231,8 +247,9 @@ public class TarAggregationStrategy implements AggregationStrategy { tos.write(buffer, 0, length); tos.closeArchiveEntry(); - IOHelper.close(tin); - IOHelper.close(tos); + IOHelper.close(fis, tin, tos); + LOG.trace("Deleting temporary file: {}", tmpTar); + FileUtil.deleteFile(tmpTar); } /** @@ -253,6 +270,7 @@ public class TarAggregationStrategy implements AggregationStrategy { @Override public void onComplete(Exchange exchange) { + LOG.debug("Deleting tar file on completion: {} ", this.fileToDelete); FileUtil.deleteFile(this.fileToDelete); } } http://git-wip-us.apache.org/repos/asf/camel/blob/41249ba9/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java index 84ed738..65fbfb0 100644 --- a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java +++ b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java @@ -36,8 +36,12 @@ public class AggregationStrategyWithFilenameHeaderTest extends CamelTestSupport private static final List<String> FILE_NAMES = Arrays.asList("foo", "bar"); + private TarAggregationStrategy tar = new TarAggregationStrategy(false, true); + @Override public void setUp() throws Exception { + tar.setParentDir("target/temp"); + deleteDirectory("target/temp"); deleteDirectory("target/out"); super.setUp(); } @@ -81,7 +85,7 @@ public class AggregationStrategyWithFilenameHeaderTest extends CamelTestSupport @Override public void configure() throws Exception { from("direct:start") - .aggregate(new TarAggregationStrategy(false, true)) + .aggregate(tar) .constant(true) .completionTimeout(50) .to("file:target/out") http://git-wip-us.apache.org/repos/asf/camel/blob/41249ba9/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java index 603511b..b430f5e 100644 --- a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java +++ b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java @@ -34,8 +34,12 @@ public class AggregationStrategyWithPreservationTest extends CamelTestSupport { private static final int EXPECTED_NO_FILES = 5; + private TarAggregationStrategy tar = new TarAggregationStrategy(true, true); + @Override public void setUp() throws Exception { + tar.setParentDir("target/temp"); + deleteDirectory("target/temp"); deleteDirectory("target/out"); super.setUp(); } @@ -83,7 +87,7 @@ public class AggregationStrategyWithPreservationTest extends CamelTestSupport { public void configure() throws Exception { // Untar file and Split it according to FileEntry from("file:src/test/resources/org/apache/camel/aggregate/tarfile/data?consumer.delay=1000&noop=true&recursive=true") - .aggregate(new TarAggregationStrategy(true, true)) + .aggregate(tar) .constant(true) .completionFromBatchConsumer() .eagerCheckCompletion() http://git-wip-us.apache.org/repos/asf/camel/blob/41249ba9/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java index b0aaec7..f451cec 100644 --- a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java +++ b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java @@ -31,8 +31,12 @@ public class TarAggregationStrategyTest extends CamelTestSupport { private static final int EXPECTED_NO_FILES = 3; + private TarAggregationStrategy tar = new TarAggregationStrategy(); + @Override public void setUp() throws Exception { + tar.setParentDir("target/temp"); + deleteDirectory("target/temp"); deleteDirectory("target/out"); super.setUp(); } @@ -73,7 +77,7 @@ public class TarAggregationStrategyTest extends CamelTestSupport { // Untar file and Split it according to FileEntry from("file:src/test/resources/org/apache/camel/aggregate/tarfile/data?consumer.delay=1000&noop=true") .setHeader("foo", constant("bar")) - .aggregate(new TarAggregationStrategy()) + .aggregate(tar) .constant(true) .completionFromBatchConsumer() .eagerCheckCompletion()