CAMEL-9905: TarAggregationStragegy should delete temporary files

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0811dc59
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0811dc59
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0811dc59

Branch: refs/heads/camel-2.17.x
Commit: 0811dc59904a1602b4c541595db45f07f5f83019
Parents: 472d2f9
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Apr 22 20:58:55 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Apr 22 20:59:23 2016 +0200

----------------------------------------------------------------------
 .../tarfile/TarAggregationStrategy.java         | 72 ++++++++++++--------
 ...gregationStrategyWithFilenameHeaderTest.java |  6 +-
 ...AggregationStrategyWithPreservationTest.java |  6 +-
 .../tarfile/TarAggregationStrategyTest.java     |  6 +-
 4 files changed, 60 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0811dc59/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
 
b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
index ae4ba2b..5394076 100644
--- 
a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
+++ 
b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
@@ -38,6 +38,8 @@ import 
org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.compress.utils.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This aggregation strategy will aggregate all incoming messages into a TAR 
file.
@@ -54,10 +56,13 @@ import org.apache.commons.compress.utils.IOUtils;
  */
 public class TarAggregationStrategy implements AggregationStrategy {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(TarAggregationStrategy.class);
+
     private String filePrefix;
     private String fileSuffix = ".tar";
     private boolean preserveFolderStructure;
     private boolean useFilenameHeader;
+    private File parentDir = new File(System.getProperty("java.io.tmpdir"));
 
     public TarAggregationStrategy() {
         this(false, false);
@@ -82,38 +87,46 @@ public class TarAggregationStrategy implements 
AggregationStrategy {
         this.useFilenameHeader = useFilenameHeader;
     }
 
-    /**
-     * Gets the prefix used when creating the TAR file name.
-     * @return the prefix
-     */
     public String getFilePrefix() {
         return filePrefix;
     }
 
     /**
      * Sets the prefix that will be used when creating the TAR filename.
-     * @param filePrefix prefix to use on TAR file.
      */
     public void setFilePrefix(String filePrefix) {
         this.filePrefix = filePrefix;
     }
 
-    /**
-     * Gets the suffix used when creating the TAR file name.
-     * @return the suffix
-     */
     public String getFileSuffix() {
         return fileSuffix;
     }
 
     /**
      * Sets the suffix that will be used when creating the ZIP filename.
-     * @param fileSuffix suffix to use on ZIP file.
      */
     public void setFileSuffix(String fileSuffix) {
         this.fileSuffix = fileSuffix;
     }
 
+    public File getParentDir() {
+        return parentDir;
+    }
+
+    /**
+     * Sets the parent directory to use for writing temporary files.
+     */
+    public void setParentDir(File parentDir) {
+        this.parentDir = parentDir;
+    }
+
+    /**
+     * Sets the parent directory to use for writing temporary files.
+     */
+    public void setParentDir(String parentDir) {
+        this.parentDir = new File(parentDir);
+    }
+
     @Override
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         File tarFile;
@@ -127,7 +140,8 @@ public class TarAggregationStrategy implements 
AggregationStrategy {
         // First time for this aggregation
         if (oldExchange == null) {
             try {
-                tarFile = FileUtil.createTempFile(this.filePrefix, 
this.fileSuffix, new File(System.getProperty("java.io.tmpdir")));
+                tarFile = FileUtil.createTempFile(this.filePrefix, 
this.fileSuffix, parentDir);
+                LOG.trace("Created temporary file: {}", tarFile);
             } catch (IOException e) {
                 throw new GenericFileOperationFailedException(e.getMessage(), 
e);
             }
@@ -147,7 +161,8 @@ public class TarAggregationStrategy implements 
AggregationStrategy {
                             FileConsumer.asGenericFile(
                                     tarFile.getParent(),
                                     tarFile,
-                                    null);    // Do not set charset here, that 
will cause the tar file to be handled as ASCII later which breaks it..
+                                    null,      // Do not set charset here, 
that will cause the tar file to be handled as ASCII later which breaks it..
+                                    false);
                     genericFile.bindToExchange(answer);
                 }
             } catch (Exception e) {
@@ -157,11 +172,9 @@ public class TarAggregationStrategy implements 
AggregationStrategy {
             // Handle all other messages
             try {
                 byte[] buffer = 
newExchange.getIn().getMandatoryBody(byte[].class);
-                String entryName = useFilenameHeader ? 
newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class) : 
newExchange.getIn()
-                        .getMessageId();
+                String entryName = useFilenameHeader ? 
newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class) : 
newExchange.getIn().getMessageId();
                 addEntryToTar(tarFile, entryName, buffer, buffer.length);
-                GenericFile<File> genericFile = FileConsumer.asGenericFile(
-                        tarFile.getParent(), tarFile, null);
+                GenericFile<File> genericFile = 
FileConsumer.asGenericFile(tarFile.getParent(), tarFile, null, false);
                 genericFile.bindToExchange(answer);
             } catch (Exception e) {
                 throw new GenericFileOperationFailedException(e.getMessage(), 
e);
@@ -171,14 +184,15 @@ public class TarAggregationStrategy implements 
AggregationStrategy {
         return answer;
     }
 
-    private static void addFileToTar(File source, File file, String fileName) 
throws IOException, ArchiveException {
-        File tmpTar = File.createTempFile(source.getName(), null);
+    private void addFileToTar(File source, File file, String fileName) throws 
IOException, ArchiveException {
+        File tmpTar = File.createTempFile(source.getName(), null, parentDir);
         tmpTar.delete();
         if (!source.renameTo(tmpTar)) {
             throw new IOException("Could not make temp file (" + 
source.getName() + ")");
         }
 
-        TarArchiveInputStream tin = (TarArchiveInputStream) new 
ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, new 
FileInputStream(tmpTar));
+        FileInputStream fis = new FileInputStream(tmpTar);
+        TarArchiveInputStream tin = (TarArchiveInputStream) new 
ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, fis);
         TarArchiveOutputStream tos = new TarArchiveOutputStream(new 
FileOutputStream(source));
         tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
         tos.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
@@ -200,18 +214,20 @@ public class TarAggregationStrategy implements 
AggregationStrategy {
         IOUtils.copy(in, tos);
         tos.closeArchiveEntry();
 
-        IOHelper.close(in);
-        IOHelper.close(tin);
-        IOHelper.close(tos);
+        IOHelper.close(fis, in, tin, tos);
+        LOG.trace("Deleting temporary file: {}", tmpTar);
+        FileUtil.deleteFile(tmpTar);
     }
 
-    private static void addEntryToTar(File source, String entryName, byte[] 
buffer, int length) throws IOException, ArchiveException {
-        File tmpTar = File.createTempFile(source.getName(), null);
+    private void addEntryToTar(File source, String entryName, byte[] buffer, 
int length) throws IOException, ArchiveException {
+        File tmpTar = File.createTempFile(source.getName(), null, parentDir);
         tmpTar.delete();
         if (!source.renameTo(tmpTar)) {
             throw new IOException("Cannot create temp file: " + 
source.getName());
         }
-        TarArchiveInputStream tin = (TarArchiveInputStream) new 
ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, new 
FileInputStream(tmpTar));
+
+        FileInputStream fis = new FileInputStream(tmpTar);
+        TarArchiveInputStream tin = (TarArchiveInputStream) new 
ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.TAR, fis);
         TarArchiveOutputStream tos = new TarArchiveOutputStream(new 
FileOutputStream(source));
         tos.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
         tos.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
@@ -231,8 +247,9 @@ public class TarAggregationStrategy implements 
AggregationStrategy {
         tos.write(buffer, 0, length);
         tos.closeArchiveEntry();
 
-        IOHelper.close(tin);
-        IOHelper.close(tos);
+        IOHelper.close(fis, tin, tos);
+        LOG.trace("Deleting temporary file: {}", tmpTar);
+        FileUtil.deleteFile(tmpTar);
     }
 
     /**
@@ -253,6 +270,7 @@ public class TarAggregationStrategy implements 
AggregationStrategy {
 
         @Override
         public void onComplete(Exchange exchange) {
+            LOG.debug("Deleting tar file on completion: {} ", 
this.fileToDelete);
             FileUtil.deleteFile(this.fileToDelete);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/0811dc59/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java
 
b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java
index 84ed738..65fbfb0 100644
--- 
a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java
+++ 
b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithFilenameHeaderTest.java
@@ -36,8 +36,12 @@ public class AggregationStrategyWithFilenameHeaderTest 
extends CamelTestSupport
 
     private static final List<String> FILE_NAMES = Arrays.asList("foo", "bar");
 
+    private TarAggregationStrategy tar = new TarAggregationStrategy(false, 
true);
+
     @Override
     public void setUp() throws Exception {
+        tar.setParentDir("target/temp");
+        deleteDirectory("target/temp");
         deleteDirectory("target/out");
         super.setUp();
     }
@@ -81,7 +85,7 @@ public class AggregationStrategyWithFilenameHeaderTest 
extends CamelTestSupport
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                        .aggregate(new TarAggregationStrategy(false, true))
+                        .aggregate(tar)
                         .constant(true)
                         .completionTimeout(50)
                             .to("file:target/out")

http://git-wip-us.apache.org/repos/asf/camel/blob/0811dc59/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java
 
b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java
index 603511b..b430f5e 100644
--- 
a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java
+++ 
b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/AggregationStrategyWithPreservationTest.java
@@ -34,8 +34,12 @@ public class AggregationStrategyWithPreservationTest extends 
CamelTestSupport {
 
     private static final int EXPECTED_NO_FILES = 5;
 
+    private TarAggregationStrategy tar = new TarAggregationStrategy(true, 
true);
+
     @Override
     public void setUp() throws Exception {
+        tar.setParentDir("target/temp");
+        deleteDirectory("target/temp");
         deleteDirectory("target/out");
         super.setUp();
     }
@@ -83,7 +87,7 @@ public class AggregationStrategyWithPreservationTest extends 
CamelTestSupport {
             public void configure() throws Exception {
                 // Untar file and Split it according to FileEntry
                 
from("file:src/test/resources/org/apache/camel/aggregate/tarfile/data?consumer.delay=1000&noop=true&recursive=true")
-                        .aggregate(new TarAggregationStrategy(true, true))
+                        .aggregate(tar)
                         .constant(true)
                         .completionFromBatchConsumer()
                         .eagerCheckCompletion()

http://git-wip-us.apache.org/repos/asf/camel/blob/0811dc59/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java
 
b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java
index b0aaec7..f451cec 100644
--- 
a/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java
+++ 
b/components/camel-tarfile/src/test/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategyTest.java
@@ -31,8 +31,12 @@ public class TarAggregationStrategyTest extends 
CamelTestSupport {
 
     private static final int EXPECTED_NO_FILES = 3;
 
+    private TarAggregationStrategy tar = new TarAggregationStrategy();
+
     @Override
     public void setUp() throws Exception {
+        tar.setParentDir("target/temp");
+        deleteDirectory("target/temp");
         deleteDirectory("target/out");
         super.setUp();
     }
@@ -73,7 +77,7 @@ public class TarAggregationStrategyTest extends 
CamelTestSupport {
                 // Untar file and Split it according to FileEntry
                 
from("file:src/test/resources/org/apache/camel/aggregate/tarfile/data?consumer.delay=1000&noop=true")
                     .setHeader("foo", constant("bar"))
-                    .aggregate(new TarAggregationStrategy())
+                    .aggregate(tar)
                         .constant(true)
                         .completionFromBatchConsumer()
                         .eagerCheckCompletion()

Reply via email to