This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new da8f3ab Rebuild da8f3ab is described below commit da8f3ab22af8dd628410eeb1847f06247e594863 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Sep 15 07:34:56 2020 +0200 Rebuild --- .../MulticastParallelTimeoutStreamCachingTest.java | 222 ++++++++++----------- .../converter/stream/FileInputStreamCache.java | 12 +- 2 files changed, 117 insertions(+), 117 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java index b18f86b..ce9aac7 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java @@ -1,111 +1,111 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.processor; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FilterInputStream; -import java.nio.charset.StandardCharsets; - -import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.Processor; -import org.apache.camel.builder.RouteBuilder; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * - */ -public class MulticastParallelTimeoutStreamCachingTest extends ContextTestSupport { - - private static final String TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE - = "target/MulticastParallelTimeoutStreamCachingTestCache"; - private static final String bodyString = "message body"; - private static final byte[] BODY = bodyString.getBytes(StandardCharsets.UTF_8); - - @BeforeEach - public void setUp() throws Exception { - super.setUp(); - - deleteDirectory(new File(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE)); - } - - public static void deleteDirectory(File file) { - if (file.isDirectory()) { - File[] files = file.listFiles(); - for (File child : files) { - deleteDirectory(child); - } - } - - file.delete(); - } - - @Test - public void testSendingAMessageUsingMulticastConvertsToReReadable() throws Exception { - getMockEndpoint("mock:x").expectedBodiesReceived(bodyString); - - template.sendBody("direct:a", "testMessage"); - assertMockEndpointsSatisfied(); - - File f = new File(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE); - assertTrue(f.isDirectory()); - Thread.sleep(500l); // deletion happens asynchron - File[] files = f.listFiles(); - assertEquals(0, files.length); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - final Processor processor1 = new Processor() { - public void process(Exchange exchange) { - try { - // sleep for one second so that the stream cache is built after the main exchange has finished due to timeout on the multicast - Thread.sleep(1000l); - } catch (InterruptedException e) { - throw new IllegalStateException("Unexpected exception", e); - } - Message in = exchange.getIn(); - // use FilterInputStream to trigger streamcaching - in.setBody(new FilterInputStream(new ByteArrayInputStream(BODY)) { - - }); - } - }; - - return new RouteBuilder() { - public void configure() { - // enable stream caching - context.getStreamCachingStrategy() - .setSpoolDirectory(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE); - context.getStreamCachingStrategy().setEnabled(true); - context.getStreamCachingStrategy().setRemoveSpoolDirectoryWhenStopping(false); - context.getStreamCachingStrategy().setSpoolThreshold(1l); - context.setStreamCaching(true); - - from("direct:a").multicast().timeout(500l).parallelProcessing().to("direct:x"); - - from("direct:x").process(processor1).to("mock:x"); - } - }; - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FilterInputStream; +import java.nio.charset.StandardCharsets; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * + */ +public class MulticastParallelTimeoutStreamCachingTest extends ContextTestSupport { + + private static final String TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE + = "target/MulticastParallelTimeoutStreamCachingTestCache"; + private static final String bodyString = "message body"; + private static final byte[] BODY = bodyString.getBytes(StandardCharsets.UTF_8); + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + + deleteDirectory(new File(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE)); + } + + public static void deleteDirectory(File file) { + if (file.isDirectory()) { + File[] files = file.listFiles(); + for (File child : files) { + deleteDirectory(child); + } + } + + file.delete(); + } + + @Test + public void testSendingAMessageUsingMulticastConvertsToReReadable() throws Exception { + getMockEndpoint("mock:x").expectedBodiesReceived(bodyString); + + template.sendBody("direct:a", "testMessage"); + assertMockEndpointsSatisfied(); + + File f = new File(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE); + assertTrue(f.isDirectory()); + Thread.sleep(500l); // deletion happens asynchron + File[] files = f.listFiles(); + assertEquals(0, files.length); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + final Processor processor1 = new Processor() { + public void process(Exchange exchange) { + try { + // sleep for one second so that the stream cache is built after the main exchange has finished due to timeout on the multicast + Thread.sleep(1000l); + } catch (InterruptedException e) { + throw new IllegalStateException("Unexpected exception", e); + } + Message in = exchange.getIn(); + // use FilterInputStream to trigger streamcaching + in.setBody(new FilterInputStream(new ByteArrayInputStream(BODY)) { + + }); + } + }; + + return new RouteBuilder() { + public void configure() { + // enable stream caching + context.getStreamCachingStrategy() + .setSpoolDirectory(TARGET_MULTICAST_PARALLEL_TIMEOUT_STREAM_CACHING_TEST_CACHE); + context.getStreamCachingStrategy().setEnabled(true); + context.getStreamCachingStrategy().setRemoveSpoolDirectoryWhenStopping(false); + context.getStreamCachingStrategy().setSpoolThreshold(1l); + context.setStreamCaching(true); + + from("direct:a").multicast().timeout(500l).parallelProcessing().to("direct:x"); + + from("direct:x").process(processor1).to("mock:x"); + } + }; + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index dbcdb74..6e77c91 100644 --- a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -153,9 +153,8 @@ public final class FileInputStreamCache extends InputStream implements StreamCac /** * Manages the temporary file for the file input stream caches. * - * Collects all FileInputStreamCache instances of the temporary file. Counts - * the number of exchanges which have a FileInputStreamCache instance of the - * temporary file. Deletes the temporary file, if all exchanges are done. + * Collects all FileInputStreamCache instances of the temporary file. Counts the number of exchanges which have a + * FileInputStreamCache instance of the temporary file. Deletes the temporary file, if all exchanges are done. * * @see CachedOutputStream */ @@ -163,8 +162,7 @@ public final class FileInputStreamCache extends InputStream implements StreamCac private static final Logger LOG = LoggerFactory.getLogger(TempFileManager.class); /** - * Indicator whether the file input stream caches are closed on - * completion of the exchanges. + * Indicator whether the file input stream caches are closed on completion of the exchanges. */ private final boolean closedOnCompletion; private AtomicInteger exchangeCounter = new AtomicInteger(); @@ -218,7 +216,9 @@ public final class FileInputStreamCache extends InputStream implements StreamCac try { cleanUpTempFile(); } catch (Exception e) { - LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e); + LOG.warn("Error deleting temporary cache file: " + tempFile + + ". This exception will be ignored.", + e); } } }