This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.25.x by this push:
     new 76e85ef  CAMEL-15557: Stream Cache file not deleted (#4262)
76e85ef is described below

commit 76e85efb1a951d1a1e6852fb77efe93cabeb75be
Author: forsthofer <forstho...@users.noreply.github.com>
AuthorDate: Mon Sep 21 06:38:49 2020 +0200

    CAMEL-15557: Stream Cache file not deleted (#4262)
    
    If a route uses a Multicast with parallel processing and a timeout,
    and a sub-route of that multicast creates an OutputStreamCache before
    the timeout but writes to it only after the timeout has expired, then
    the temporary file that is created is never deleted from the file
    system.
    
    Co-authored-by: Franz Forsthofer <franz.forstho...@sap.com>
---
 .../converter/stream/FileInputStreamCache.java     | 11 ++++++
 .../MulticastParallelTimeoutStreamCachingTest.java | 40 +++++++++++++++++++++-
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git 
a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
 
b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
index 74eacfe..f50fe0b 100644
--- 
a/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
@@ -239,6 +239,17 @@ public final class FileInputStreamCache extends 
InputStream implements StreamCac
             if (tempFile != null) {
                 throw new IllegalStateException("The method 
'createOutputStream' can only be called once!");
             }
+            if (closedOnCompletion && exchangeCounter.get() == 0) {
+                // exchange was already stopped -> in this case the tempFile 
would never be deleted.
+                // This can happen with a Splitter or Multicast in parallel 
processing: the CachedOutputStream is created while the main unit 
of work
+                // is still active, but the FileOutputStream is only created 
after the timeout has stopped the unit of work.
+                // All we can do here is throw an exception to inform the user 
that the processing took longer than the configured timeout.
+                String error = "Cannot create a FileOutputStream for Stream 
Caching, because this FileOutputStream would never be removed from the file 
system."
+                        + " This situation can happen with a Splitter or Multi 
Cast in parallel processing if there is a timeout set on the Splitter or Multi 
Cast, "
+                        + " and the processing in a sub-branch takes longer 
than the timeout. Consider to increase the timeout.";
+                LOG.error(error);
+                throw new IOException(error);
+            }
             tempFile = FileUtil.createTempFile("cos", ".tmp", 
strategy.getSpoolDirectory());
 
             LOG.trace("Creating temporary stream cache file: {}", tempFile);
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
index 747775a..cf690f0 100644
--- 
a/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FilterInputStream;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
 import org.apache.camel.ContextTestSupport;
@@ -28,6 +29,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.CachedOutputStream;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -39,7 +41,9 @@ public class MulticastParallelTimeoutStreamCachingTest 
extends ContextTestSuppor
 
     protected Endpoint startEndpoint;
     protected MockEndpoint x;
-
+    protected MockEndpoint exception;
+    protected MockEndpoint y;
+    
     public static void deleteDirectory(File file) {
         if (file.isDirectory()) {
             File[] files = file.listFiles();
@@ -64,6 +68,15 @@ public class MulticastParallelTimeoutStreamCachingTest 
extends ContextTestSuppor
         File[] files = f.listFiles();
         assertEquals(0, files.length);
     }
+    
+    @Test
+    public void 
testCreateOutputStreamCacheBeforeTimeoutButWriteToOutputStreamCacheAfterTimeout()
 throws Exception {
+        exception.expectedMessageCount(1);
+        y.expectedMessageCount(0);
+
+        template.sendBody("direct:b", "testMessage");
+        assertMockEndpointsSatisfied();
+    }
 
     @Before
     @Override
@@ -72,6 +85,8 @@ public class MulticastParallelTimeoutStreamCachingTest 
extends ContextTestSuppor
 
         deleteDirectory(new 
File(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE));
         x = getMockEndpoint("mock:x");
+        y = getMockEndpoint("mock:y");
+        exception = getMockEndpoint("mock:exception");
     }
 
     protected RouteBuilder createRouteBuilder() {
@@ -90,6 +105,23 @@ public class MulticastParallelTimeoutStreamCachingTest 
extends ContextTestSuppor
                 });
             }
         };
+        
+        final Processor processor2 = new Processor() {
+            public void process(Exchange exchange) throws IOException {
+                // create first the OutputStreamCache and then sleep
+                CachedOutputStream outputStream = new 
CachedOutputStream(exchange);
+                try {
+                    // sleep for one second so that the write to the 
CachedOutputStream happens after the main exchange has finished due to timeout 
on the multicast
+                    Thread.sleep(1000l);
+                } catch (InterruptedException e) {
+                    throw new IllegalStateException("Unexpected exception", e);
+                }
+                outputStream.write(BODY);
+                Message in = exchange.getIn();
+                // set the input stream obtained from the CachedOutputStream as the message body
+                in.setBody(outputStream.getInputStream());
+            }
+        };
 
         return new RouteBuilder() {
             public void configure() {
@@ -99,10 +131,16 @@ public class MulticastParallelTimeoutStreamCachingTest 
extends ContextTestSuppor
                 
context.getStreamCachingStrategy().setRemoveSpoolDirectoryWhenStopping(false);
                 context.getStreamCachingStrategy().setSpoolThreshold(1l);
                 context.setStreamCaching(true);
+                
+                onException(IOException.class).to("mock:exception");
 
                 
from("direct:a").multicast().timeout(500l).parallelProcessing().to("direct:x");
 
                 from("direct:x").process(processor1).to("mock:x");
+                
+                
from("direct:b").multicast().timeout(500l).parallelProcessing().to("direct:y");
+
+                from("direct:y").process(processor2).to("mock:y");
             }
         };
     }

Reply via email to