This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 0751f01ff1 prevents creating overlapping encryption streams (#5400) 0751f01ff1 is described below commit 0751f01ff16fff1f6790629226ac3cc2acf5b575 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Mar 13 13:11:02 2025 -0400 prevents creating overlapping encryption streams (#5400) Modified AESCryptoService to detect creation of multiple encryptions streams at the same time. The implementation in AESCryptoService does not support this because it uses shared Cipher object and shrared intialization vectors. The code that uses CryptoService does not seem to have a need for creating more that one encryption stream at a time. If the usage pattern of the code change, then the detection added in this commit should catch that. fixes #5386 * switch to atomic bool --- .../core/crypto/streams/BlockedOutputStream.java | 7 ++++++- .../accumulo/core/spi/crypto/AESCryptoService.java | 15 ++++++++++++-- .../accumulo/core/spi/crypto/CryptoService.java | 3 ++- .../accumulo/core/spi/crypto/FileEncrypter.java | 5 +++-- .../accumulo/core/crypto/BlockedIOStreamTest.java | 9 +++++--- .../apache/accumulo/core/crypto/CryptoTest.java | 24 ++++++++++++++++++++++ 6 files changed, 54 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedOutputStream.java index 05de9f7373..c4ab873c13 100644 --- a/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedOutputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedOutputStream.java @@ -22,6 +22,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; /** * Buffers all input in a growing buffer until flush() is called. Then entire buffer is written, @@ -32,8 +33,10 @@ public class BlockedOutputStream extends OutputStream { int blockSize; DataOutputStream out; ByteBuffer bb; + private final AtomicBoolean openTracker; - public BlockedOutputStream(OutputStream out, int blockSize, int bufferSize) { + public BlockedOutputStream(OutputStream out, int blockSize, int bufferSize, + AtomicBoolean openTracker) { if (bufferSize <= 0) { throw new IllegalArgumentException("bufferSize must be greater than 0."); } @@ -50,6 +53,7 @@ public class BlockedOutputStream extends OutputStream { // some buffer space + bytes to make the buffer evened up with the cipher block size - 4 bytes // for the size int bb = ByteBuffer.allocate(bufferSize + remainder - 4); + this.openTracker = openTracker; } @Override @@ -113,5 +117,6 @@ public class BlockedOutputStream extends OutputStream { public void close() throws IOException { flush(); out.close(); + openTracker.compareAndSet(true, false); } } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java b/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java index 85895053b3..555103fd98 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import javax.crypto.Cipher; import javax.crypto.CipherInputStream; @@ -351,6 +352,7 @@ public class AESCryptoService implements CryptoService { private final byte[] initVector = new byte[GCM_IV_LENGTH_IN_BYTES]; private final Cipher cipher; private final byte[] decryptionParameters; + private final AtomicBoolean openTracker = new AtomicBoolean(); AESGCMFileEncrypter() { try { @@ -371,6 +373,10 @@ public class AESCryptoService implements CryptoService { throw new CryptoException( "Key/IV reuse is forbidden in AESGCMCryptoModule. Too many RBlocks."); } + if (!openTracker.compareAndSet(false, true)) { + throw new CryptoException("Attempted to obtain new stream without closing previous one."); + } + incrementIV(initVector, initVector.length - 1); if (Arrays.equals(initVector, firstInitVector)) { ivReused = true; // This will allow us to write the final block, since the @@ -397,7 +403,7 @@ public class AESCryptoService implements CryptoService { // Without this, when the crypto stream is closed (in order to flush its last bytes) // the underlying RFile stream will *also* be closed, and that's undesirable as the // cipher stream is closed for every block written. - return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024); + return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024, openTracker); } /** @@ -493,6 +499,7 @@ public class AESCryptoService implements CryptoService { private final Key fek; private final byte[] initVector = new byte[IV_LENGTH_IN_BYTES]; private final byte[] decryptionParameters; + private final AtomicBoolean openTracker = new AtomicBoolean(); AESCBCFileEncrypter() { try { @@ -507,6 +514,10 @@ public class AESCryptoService implements CryptoService { @Override public OutputStream encryptStream(OutputStream outputStream) throws CryptoException { + if (!openTracker.compareAndSet(false, true)) { + throw new CryptoException("Attempted to obtain new stream without closing previous one."); + } + random.nextBytes(initVector); try { outputStream.write(initVector); @@ -521,7 +532,7 @@ public class AESCryptoService implements CryptoService { } CipherOutputStream cos = new CipherOutputStream(outputStream, cipher); - return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024); + return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024, openTracker); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java b/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java index a2055f1a89..dfdf63013b 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java @@ -37,7 +37,8 @@ public interface CryptoService { /** * Initialize the FileEncrypter for the environment and return. This will get called once per - * R-File or Write Ahead Log. FileEncrypter implementation must be thread safe. + * R-File or Write Ahead Log. FileEncrypter implementation is not expected be called by multiple + * threads. */ FileEncrypter getFileEncrypter(CryptoEnvironment environment); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java b/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java index ab0192c2d0..47e7d8c073 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java @@ -21,13 +21,14 @@ package org.apache.accumulo.core.spi.crypto; import java.io.OutputStream; /** - * Class implementation that will encrypt a file. Make sure implementation is thread safe. + * Class implementation that will encrypt a file. * * @since 2.0 */ public interface FileEncrypter { /** - * Encrypt the OutputStream. + * Encrypt the OutputStream. Only one OutputStream is expected to be active at a time. Before a + * new encryption OutputStream is created the previous one is expected to be closed. */ OutputStream encryptStream(OutputStream outputStream) throws CryptoService.CryptoException; diff --git a/core/src/test/java/org/apache/accumulo/core/crypto/BlockedIOStreamTest.java b/core/src/test/java/org/apache/accumulo/core/crypto/BlockedIOStreamTest.java index 6c49fde2a2..584db67a9e 100644 --- a/core/src/test/java/org/apache/accumulo/core/crypto/BlockedIOStreamTest.java +++ b/core/src/test/java/org/apache/accumulo/core/crypto/BlockedIOStreamTest.java @@ -28,6 +28,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.security.SecureRandom; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.crypto.streams.BlockedInputStream; import org.apache.accumulo.core.crypto.streams.BlockedOutputStream; @@ -44,7 +45,8 @@ public class BlockedIOStreamTest { private void writeRead(int blockSize, int expectedSize) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BlockedOutputStream blockOut = new BlockedOutputStream(baos, blockSize, 1); + BlockedOutputStream blockOut = + new BlockedOutputStream(baos, blockSize, 1, new AtomicBoolean(true)); String contentString = "My Blocked Content String"; byte[] content = contentString.getBytes(UTF_8); @@ -86,7 +88,7 @@ public class BlockedIOStreamTest { public void testSpillingOverOutputStream() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); // buffer will be size 12 - BlockedOutputStream blockOut = new BlockedOutputStream(baos, 16, 16); + BlockedOutputStream blockOut = new BlockedOutputStream(baos, 16, 16, new AtomicBoolean(true)); byte[] undersized = new byte[11]; byte[] perfectSized = new byte[12]; @@ -129,7 +131,8 @@ public class BlockedIOStreamTest { ByteArrayOutputStream baos = new ByteArrayOutputStream(); int blockSize = 16; // buffer will be size 12 - BlockedOutputStream blockOut = new BlockedOutputStream(baos, blockSize, blockSize); + BlockedOutputStream blockOut = + new BlockedOutputStream(baos, blockSize, blockSize, new AtomicBoolean(true)); int size = 1024 * 1024 * 128; byte[] giant = new byte[size]; diff --git a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java index 45f2ab22e5..4a93b36043 100644 --- a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java +++ b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java @@ -562,6 +562,30 @@ public class CryptoTest { } } + @Test + public void testOverlappingWrites() throws Exception { + testOverlappingWrites(WAL); + testOverlappingWrites(TABLE); + } + + private void testOverlappingWrites(Scope scope) throws Exception { + AESCryptoService cs = new AESCryptoService(); + cs.init(getAllCryptoProperties(ConfigMode.CRYPTO_TABLE_ON)); + CryptoEnvironment encEnv = new CryptoEnvironmentImpl(scope, null, null); + FileEncrypter encrypter = cs.getFileEncrypter(encEnv); + + ByteArrayOutputStream out1 = new ByteArrayOutputStream(); + var es1 = encrypter.encryptStream(out1); + + // try to create a new encryption stream w/o closing the previous one + ByteArrayOutputStream out2 = new ByteArrayOutputStream(); + var ce = assertThrows(CryptoException.class, () -> encrypter.encryptStream(out2)); + assertTrue(ce.getMessage().contains("closing previous")); + + es1.close(); + var es2 = encrypter.encryptStream(out2); + } + private ArrayList<Key> testData() { ArrayList<Key> keys = new ArrayList<>(); keys.add(new Key("a", "cf", "cq"));