This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 7edd162 CAMEL-15532: Stream Cache file not deleted (#4224) 7edd162 is described below commit 7edd162fa997b2040ec0f7f58405d57b011db884 Author: forsthofer <forstho...@users.noreply.github.com> AuthorDate: Tue Sep 15 07:29:54 2020 +0200 CAMEL-15532: Stream Cache file not deleted (#4224) Co-authored-by: Franz Forsthofer <franz.forstho...@sap.com> --- .../MulticastParallelTimeoutStreamCachingTest.java | 111 +++++++++++++++++++++ .../converter/stream/FileInputStreamCache.java | 20 ++-- 2 files changed, 124 insertions(+), 7 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java new file mode 100644 index 0000000..b18f86b --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FilterInputStream; +import java.nio.charset.StandardCharsets; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * + */ +public class MulticastParallelTimeoutStreamCachingTest extends ContextTestSupport { + + private static final String TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE + = "target/MulticastParallelTimeoutStreamCachingTestCache"; + private static final String bodyString = "message body"; + private static final byte[] BODY = bodyString.getBytes(StandardCharsets.UTF_8); + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + + deleteDirectory(new File(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE)); + } + + public static void deleteDirectory(File file) { + if (file.isDirectory()) { + File[] files = file.listFiles(); + for (File child : files) { + deleteDirectory(child); + } + } + + file.delete(); + } + + @Test + public void testSendingAMessageUsingMulticastConvertsToReReadable() throws Exception { + getMockEndpoint("mock:x").expectedBodiesReceived(bodyString); + + template.sendBody("direct:a", "testMessage"); + assertMockEndpointsSatisfied(); + + File f = new File(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE); + assertTrue(f.isDirectory()); + Thread.sleep(500l); // deletion happens asynchron + File[] files = f.listFiles(); + assertEquals(0, files.length); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + final Processor processor1 = new Processor() { + public void process(Exchange exchange) { + try { + // sleep for one second so that the stream cache is built after the main exchange has finished due to timeout on the multicast + Thread.sleep(1000l); + } catch (InterruptedException e) { + throw new IllegalStateException("Unexpected exception", e); + } + Message in = exchange.getIn(); + // use FilterInputStream to trigger streamcaching + in.setBody(new FilterInputStream(new ByteArrayInputStream(BODY)) { + + }); + } + }; + + return new RouteBuilder() { + public void configure() { + // enable stream caching + context.getStreamCachingStrategy() + .setSpoolDirectory(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE); + context.getStreamCachingStrategy().setEnabled(true); + context.getStreamCachingStrategy().setRemoveSpoolDirectoryWhenStopping(false); + context.getStreamCachingStrategy().setSpoolThreshold(1l); + context.setStreamCaching(true); + + from("direct:a").multicast().timeout(500l).parallelProcessing().to("direct:x"); + + from("direct:x").process(processor1).to("mock:x"); + } + }; + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index 7dd3754..dbcdb74 100644 --- a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -153,15 +153,19 @@ public final class FileInputStreamCache extends InputStream implements StreamCac /** * Manages the temporary file for the file input stream caches. * - * Collects all FileInputStreamCache instances of the temporary file. Counts the number of exchanges which have a - * FileInputStreamCache instance of the temporary file. Deletes the temporary file, if all exchanges are done. + * Collects all FileInputStreamCache instances of the temporary file. Counts + * the number of exchanges which have a FileInputStreamCache instance of the + * temporary file. Deletes the temporary file, if all exchanges are done. * * @see CachedOutputStream */ static class TempFileManager { private static final Logger LOG = LoggerFactory.getLogger(TempFileManager.class); - /** Indicator whether the file input stream caches are closed on completion of the exchanges. */ + /** + * Indicator whether the file input stream caches are closed on + * completion of the exchanges. + */ private final boolean closedOnCompletion; private AtomicInteger exchangeCounter = new AtomicInteger(); private File tempFile; @@ -214,9 +218,7 @@ public final class FileInputStreamCache extends InputStream implements StreamCac try { cleanUpTempFile(); } catch (Exception e) { - LOG.warn("Error deleting temporary cache file: " + tempFile - + ". This exception will be ignored.", - e); + LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e); } } } @@ -227,11 +229,15 @@ public final class FileInputStreamCache extends InputStream implements StreamCac } }; UnitOfWork streamCacheUnitOfWork = exchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, UnitOfWork.class); - if (streamCacheUnitOfWork != null) { + if (streamCacheUnitOfWork != null && streamCacheUnitOfWork.getRoute() != null) { // The stream cache must sometimes not be closed when the exchange is deleted. This is for example the // case in the splitter and multi-cast case with AggregationStrategy where the result of the sub-routes // are aggregated later in the main route. Here, the cached streams of the sub-routes must be closed with // the Unit of Work of the main route. + // streamCacheUnitOfWork.getRoute() != null means that the unit of work is still active and the done method + // was not yet called: It can happen that streamCacheUnitOfWork.getRoute() == null in the split or + // multi-cast case when there is a timeout on the main route and an exchange of the sub-route is added after + // the timeout. This we have to avoid because the stream cache would never be closed then. streamCacheUnitOfWork.addSynchronization(onCompletion); } else { // add on completion so we can cleanup after the exchange is done such as deleting temporary files