CAMEL-8246: The camel-zipfile zip aggregation strategy should preserve headers/properties, as the others do.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/524a5b31 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/524a5b31 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/524a5b31 Branch: refs/heads/camel-2.14.x Commit: 524a5b3174c5f762818523b7d9205c62311e6fa9 Parents: adbb193 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jan 16 08:44:44 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jan 16 08:45:06 2015 +0100 ---------------------------------------------------------------------- .../zipfile/ZipAggregationStrategy.java | 34 ++++++++------------ .../zipfile/ZipAggregationStrategyTest.java | 8 +++-- 2 files changed, 18 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/524a5b31/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java index 208f727..d8fe877 100644 --- a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java +++ b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java @@ -31,10 +31,10 @@ import org.apache.camel.component.file.FileConsumer; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileMessage; import org.apache.camel.component.file.GenericFileOperationFailedException; -import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.processor.aggregate.AggregationStrategy; import 
org.apache.camel.spi.Synchronization; import org.apache.camel.util.FileUtil; +import org.apache.camel.util.IOHelper; /** * This aggregation strategy will aggregate all incoming messages into a ZIP file. @@ -43,7 +43,6 @@ import org.apache.camel.util.FileUtil; * array and the ZIP entry will be named using the message id.</p> * <p><b>Note:</b> Please note that this aggregation strategy requires eager * completion check to work properly.</p> - * */ public class ZipAggregationStrategy implements AggregationStrategy { @@ -111,8 +110,7 @@ public class ZipAggregationStrategy implements AggregationStrategy { } catch (IOException e) { throw new GenericFileOperationFailedException(e.getMessage(), e); } - DefaultEndpoint endpoint = (DefaultEndpoint) newExchange.getFromEndpoint(); - answer = endpoint.createExchange(); + answer = newExchange; answer.addOnCompletion(new DeleteZipFileOnCompletion(zipFile)); } else { zipFile = oldExchange.getIn().getBody(File.class); @@ -121,7 +119,7 @@ public class ZipAggregationStrategy implements AggregationStrategy { // Handle GenericFileMessages if (GenericFileMessage.class.isAssignableFrom(newExchange.getIn().getClass())) { try { - File appendFile = newExchange.getIn().getBody(File.class); + File appendFile = newExchange.getIn().getMandatoryBody(File.class); if (appendFile != null) { addFileToZip(zipFile, appendFile, this.preserveFolderStructure ? 
newExchange.getIn().toString() : null); GenericFile<File> genericFile = @@ -130,21 +128,19 @@ public class ZipAggregationStrategy implements AggregationStrategy { zipFile, Charset.defaultCharset().toString()); genericFile.bindToExchange(answer); - } else { - throw new GenericFileOperationFailedException("Could not get body as file."); } - } catch (IOException e) { + } catch (Exception e) { throw new GenericFileOperationFailedException(e.getMessage(), e); } } else { // Handle all other messages - byte[] buffer = newExchange.getIn().getBody(byte[].class); try { + byte[] buffer = newExchange.getIn().getMandatoryBody(byte[].class); addEntryToZip(zipFile, newExchange.getIn().getMessageId(), buffer, buffer.length); GenericFile<File> genericFile = FileConsumer.asGenericFile( zipFile.getParent(), zipFile, Charset.defaultCharset().toString()); genericFile.bindToExchange(answer); - } catch (IOException e) { + } catch (Exception e) { throw new GenericFileOperationFailedException(e.getMessage(), e); } } @@ -158,7 +154,7 @@ public class ZipAggregationStrategy implements AggregationStrategy { if (!source.renameTo(tmpZip)) { throw new IOException("Could not make temp file (" + source.getName() + ")"); } - byte[] buffer = new byte[1024]; + byte[] buffer = new byte[8192]; ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip)); ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); @@ -168,7 +164,7 @@ public class ZipAggregationStrategy implements AggregationStrategy { out.write(buffer, 0, read); } out.closeEntry(); - in.close(); + IOHelper.close(in); for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { out.putNextEntry(ze); @@ -177,17 +173,15 @@ public class ZipAggregationStrategy implements AggregationStrategy { } out.closeEntry(); } - zin.close(); - out.close(); + IOHelper.close(zin, out); tmpZip.delete(); } private static void addEntryToZip(File source, String entryName, byte[] buffer, int length) throws IOException { - 
File tmpZip = File.createTempFile(source.getName(), null); tmpZip.delete(); if (!source.renameTo(tmpZip)) { - throw new IOException("Could not make temp file (" + source.getName() + ")"); + throw new IOException("Cannot create temp file: " + source.getName()); } ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip)); ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); @@ -203,18 +197,16 @@ public class ZipAggregationStrategy implements AggregationStrategy { } out.closeEntry(); } - zin.close(); - out.close(); + IOHelper.close(zin, out); tmpZip.delete(); } /** * This callback class is used to clean up the temporary ZIP file once the exchange has completed. - * */ private class DeleteZipFileOnCompletion implements Synchronization { - private File fileToDelete; + private final File fileToDelete; public DeleteZipFileOnCompletion(File fileToDelete) { this.fileToDelete = fileToDelete; @@ -222,7 +214,7 @@ public class ZipAggregationStrategy implements AggregationStrategy { @Override public void onFailure(Exchange exchange) { - // Keep the file if somthing gone a miss. + // Keep the file if something gone a miss. 
} @Override http://git-wip-us.apache.org/repos/asf/camel/blob/524a5b31/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java index 9141984..088db7f 100644 --- a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java @@ -41,12 +41,14 @@ public class ZipAggregationStrategyTest extends CamelTestSupport { public void testSplitter() throws Exception { MockEndpoint mock = getMockEndpoint("mock:aggregateToZipEntry"); mock.expectedMessageCount(1); + mock.expectedHeaderReceived("foo", "bar"); assertMockEndpointsSatisfied(); Thread.sleep(500); File[] files = new File("target/out").listFiles(); + assertTrue(files != null); assertTrue("Should be a file in target/out directory", files.length > 0); File resultFile = files[0]; @@ -55,10 +57,9 @@ public class ZipAggregationStrategyTest extends CamelTestSupport { try { int fileCount = 0; for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { - fileCount++; + fileCount = fileCount + 1; } - assertTrue("Zip file should contains " + ZipAggregationStrategyTest.EXPECTED_NO_FILES + " files", - fileCount == ZipAggregationStrategyTest.EXPECTED_NO_FILES); + assertEquals("Zip file should contains " + ZipAggregationStrategyTest.EXPECTED_NO_FILES + " files", ZipAggregationStrategyTest.EXPECTED_NO_FILES, fileCount); } finally { IOHelper.close(zin); } @@ -71,6 +72,7 @@ public class ZipAggregationStrategyTest extends CamelTestSupport { public void configure() throws 
Exception { // Unzip file and Split it according to FileEntry from("file:src/test/resources/org/apache/camel/aggregate/zipfile/data?consumer.delay=1000&noop=true") + .setHeader("foo", constant("bar")) .aggregate(new ZipAggregationStrategy()) .constant(true) .completionFromBatchConsumer()