This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new f670b62 CAMEL-16177: multicast parallel processing and encrypted stream cache (#5065) f670b62 is described below commit f670b627fb88774ad235e1f419cd48e20f4f0862 Author: forsthofer <forstho...@users.noreply.github.com> AuthorDate: Thu Feb 11 17:30:45 2021 +0100 CAMEL-16177: multicast parallel processing and encrypted stream cache (#5065) * CAMEL-16177: multicast parallel processing and encrypted stream cache * Update CipherPair.java * CAMEL-16177: correct error message Co-authored-by: Franz Forsthofer <franz.forstho...@sap.com> --- ...ParallelAndStreamCachingWithEncryptionTest.java | 113 +++++++++++++++++++++ .../org/apache/camel/processor/payload10KB.txt | 1 + .../apache/camel/converter/stream/CipherPair.java | 24 +++-- .../converter/stream/FileInputStreamCache.java | 2 +- 4 files changed, 132 insertions(+), 8 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingWithEncryptionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingWithEncryptionTest.java new file mode 100644 index 0000000..952328f --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingWithEncryptionTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.IOHelper; +import org.junit.jupiter.api.Test; + +/** + * Tests the processing of a file stream-cache with encryption by the multi-cast + * processor in the parallel processing mode. + */ +public class MultiCastParallelAndStreamCachingWithEncryptionTest extends ContextTestSupport { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setStreamCaching(true); + context.getStreamCachingStrategy().setEnabled(true); + context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache"); + context.getStreamCachingStrategy().setSpoolThreshold(5000L); + context.getStreamCachingStrategy().setSpoolCipher("AES/CTR/NoPadding"); + + from("direct:start").multicast().parallelProcessing().stopOnException().to("direct:a", "direct:b").end() + .to("mock:result"); + + from("direct:a") // + // read stream + .process(new SimpleProcessor()).to("mock:resulta"); + + from("direct:b") // + // read stream concurrently, because of parallel processing + .process(new SimpleProcessor()).to("mock:resultb"); + + } + }; + } + + private static class SimpleProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + + Object body = exchange.getIn().getBody(); + if (body instanceof InputStream) { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + IOHelper.copy((InputStream) body, output); + exchange.getMessage().setBody(output.toByteArray()); + } else { + throw new RuntimeException("Type " + body.getClass().getName() + " not supported"); + } + + } + } + + /** + * Tests the FileInputStreamCache. The sent input stream is transformed to + * FileInputStreamCache before the multi-cast processor is called. + * + * @throws Exception + */ + @Test + public void testFileInputStreamCache() throws Exception { + + InputStream resultA = getPayload(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + IOHelper.copy(resultA, baos); + IOHelper.close(resultA); + byte[] resultBytes = baos.toByteArray(); + + MockEndpoint mock = getMockEndpoint("mock:resulta"); + mock.expectedBodiesReceived(resultBytes); + mock = getMockEndpoint("mock:resultb"); + mock.expectedBodiesReceived(resultBytes); + + InputStream in = getPayload(); + try { + template.sendBody("direct:start", in); + assertMockEndpointsSatisfied(); + } finally { + in.close(); + } + } + + private InputStream getPayload() { + return MultiCastParallelAndStreamCachingWithEncryptionTest.class.getClassLoader() + .getResourceAsStream("org/apache/camel/processor/payload10KB.txt"); + } + +} diff --git a/core/camel-core/src/test/resources/org/apache/camel/processor/payload10KB.txt b/core/camel-core/src/test/resources/org/apache/camel/processor/payload10KB.txt new file mode 100644 index 0000000..59fde05 --- /dev/null +++ b/core/camel-core/src/test/resources/org/apache/camel/processor/payload10KB.txt @@ -0,0 +1 @@ +payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102payloud102pa [...] \ No newline at end of file diff --git a/core/camel-support/src/main/java/org/apache/camel/converter/stream/CipherPair.java b/core/camel-support/src/main/java/org/apache/camel/converter/stream/CipherPair.java index b72e770..b961292 100644 --- a/core/camel-support/src/main/java/org/apache/camel/converter/stream/CipherPair.java +++ b/core/camel-support/src/main/java/org/apache/camel/converter/stream/CipherPair.java @@ -30,7 +30,8 @@ import javax.crypto.spec.IvParameterSpec; public class CipherPair { private final String transformation; private final Cipher enccipher; - private final Cipher deccipher; + private final Key key; + private final byte[] ivp; public CipherPair(String transformation) throws GeneralSecurityException { this.transformation = transformation; @@ -45,12 +46,10 @@ public class CipherPair { KeyGenerator keygen = KeyGenerator.getInstance(a); keygen.init(new SecureRandom()); - Key key = keygen.generateKey(); + key = keygen.generateKey(); enccipher = Cipher.getInstance(transformation); - deccipher = Cipher.getInstance(transformation); enccipher.init(Cipher.ENCRYPT_MODE, key); - final byte[] ivp = enccipher.getIV(); - deccipher.init(Cipher.DECRYPT_MODE, key, ivp == null ? null : new IvParameterSpec(ivp)); + ivp = enccipher.getIV(); } public String getTransformation() { @@ -61,7 +60,18 @@ public class CipherPair { return enccipher; } - public Cipher getDecryptor() { - return deccipher; + /** + * Create the decryptor every time because the decryptor is not thead safe. For example, if you reuse the decryptor + * instance in the Multi-cast case then you will get errors. + */ + public Cipher createDecryptor() { + try { + Cipher deccipher = Cipher.getInstance(transformation); + deccipher.init(Cipher.DECRYPT_MODE, key, ivp == null ? null : new IvParameterSpec(ivp)); + return deccipher; + } catch (GeneralSecurityException e) { + // should not happen + throw new IllegalStateException("Could not instanciate decryptor", e); + } } } diff --git a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index 2aefc36..6a8a5d6 100644 --- a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -136,7 +136,7 @@ public final class FileInputStreamCache extends InputStream implements StreamCac private InputStream createInputStream(File file) throws IOException { InputStream in = new BufferedInputStream(Files.newInputStream(file.toPath(), StandardOpenOption.READ)); if (ciphers != null) { - in = new CipherInputStream(in, ciphers.getDecryptor()) { + in = new CipherInputStream(in, ciphers.createDecryptor()) { boolean closed; public void close() throws IOException {