This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 0751f01ff1 prevents creating overlapping encryption streams (#5400)
0751f01ff1 is described below

commit 0751f01ff16fff1f6790629226ac3cc2acf5b575
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Mar 13 13:11:02 2025 -0400

    prevents creating overlapping encryption streams (#5400)
    
    Modified AESCryptoService to detect creation of multiple encryption
    streams at the same time. The implementation in AESCryptoService does
    not support this because it uses a shared Cipher object and shared
    initialization vectors.  The code that uses CryptoService does not seem
    to have a need for creating more than one encryption stream at a time.
    If the usage pattern of the code changes, then the detection added in
    this commit should catch that.
    
    fixes #5386
    
    * switch to atomic bool
---
 .../core/crypto/streams/BlockedOutputStream.java   |  7 ++++++-
 .../accumulo/core/spi/crypto/AESCryptoService.java | 15 ++++++++++++--
 .../accumulo/core/spi/crypto/CryptoService.java    |  3 ++-
 .../accumulo/core/spi/crypto/FileEncrypter.java    |  5 +++--
 .../accumulo/core/crypto/BlockedIOStreamTest.java  |  9 +++++---
 .../apache/accumulo/core/crypto/CryptoTest.java    | 24 ++++++++++++++++++++++
 6 files changed, 54 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedOutputStream.java
 
b/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedOutputStream.java
index 05de9f7373..c4ab873c13 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedOutputStream.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedOutputStream.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Buffers all input in a growing buffer until flush() is called. Then entire 
buffer is written,
@@ -32,8 +33,10 @@ public class BlockedOutputStream extends OutputStream {
   int blockSize;
   DataOutputStream out;
   ByteBuffer bb;
+  private final AtomicBoolean openTracker;
 
-  public BlockedOutputStream(OutputStream out, int blockSize, int bufferSize) {
+  public BlockedOutputStream(OutputStream out, int blockSize, int bufferSize,
+      AtomicBoolean openTracker) {
     if (bufferSize <= 0) {
       throw new IllegalArgumentException("bufferSize must be greater than 0.");
     }
@@ -50,6 +53,7 @@ public class BlockedOutputStream extends OutputStream {
     // some buffer space + bytes to make the buffer evened up with the cipher 
block size - 4 bytes
     // for the size int
     bb = ByteBuffer.allocate(bufferSize + remainder - 4);
+    this.openTracker = openTracker;
   }
 
   @Override
@@ -113,5 +117,6 @@ public class BlockedOutputStream extends OutputStream {
   public void close() throws IOException {
     flush();
     out.close();
+    openTracker.compareAndSet(true, false);
   }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java 
b/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java
index 85895053b3..555103fd98 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.crypto.Cipher;
 import javax.crypto.CipherInputStream;
@@ -351,6 +352,7 @@ public class AESCryptoService implements CryptoService {
       private final byte[] initVector = new byte[GCM_IV_LENGTH_IN_BYTES];
       private final Cipher cipher;
       private final byte[] decryptionParameters;
+      private final AtomicBoolean openTracker = new AtomicBoolean();
 
       AESGCMFileEncrypter() {
         try {
@@ -371,6 +373,10 @@ public class AESCryptoService implements CryptoService {
           throw new CryptoException(
               "Key/IV reuse is forbidden in AESGCMCryptoModule. Too many 
RBlocks.");
         }
+        if (!openTracker.compareAndSet(false, true)) {
+          throw new CryptoException("Attempted to obtain new stream without 
closing previous one.");
+        }
+
         incrementIV(initVector, initVector.length - 1);
         if (Arrays.equals(initVector, firstInitVector)) {
           ivReused = true; // This will allow us to write the final block, 
since the
@@ -397,7 +403,7 @@ public class AESCryptoService implements CryptoService {
         // Without this, when the crypto stream is closed (in order to flush 
its last bytes)
         // the underlying RFile stream will *also* be closed, and that's 
undesirable as the
         // cipher stream is closed for every block written.
-        return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024);
+        return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024, 
openTracker);
       }
 
       /**
@@ -493,6 +499,7 @@ public class AESCryptoService implements CryptoService {
       private final Key fek;
       private final byte[] initVector = new byte[IV_LENGTH_IN_BYTES];
       private final byte[] decryptionParameters;
+      private final AtomicBoolean openTracker = new AtomicBoolean();
 
       AESCBCFileEncrypter() {
         try {
@@ -507,6 +514,10 @@ public class AESCryptoService implements CryptoService {
 
       @Override
       public OutputStream encryptStream(OutputStream outputStream) throws 
CryptoException {
+        if (!openTracker.compareAndSet(false, true)) {
+          throw new CryptoException("Attempted to obtain new stream without 
closing previous one.");
+        }
+
         random.nextBytes(initVector);
         try {
           outputStream.write(initVector);
@@ -521,7 +532,7 @@ public class AESCryptoService implements CryptoService {
         }
 
         CipherOutputStream cos = new CipherOutputStream(outputStream, cipher);
-        return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024);
+        return new BlockedOutputStream(cos, cipher.getBlockSize(), 1024, 
openTracker);
       }
 
       @Override
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java 
b/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java
index a2055f1a89..dfdf63013b 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/CryptoService.java
@@ -37,7 +37,8 @@ public interface CryptoService {
 
   /**
    * Initialize the FileEncrypter for the environment and return. This will 
get called once per
-   * R-File or Write Ahead Log. FileEncrypter implementation must be thread 
safe.
+   * R-File or Write Ahead Log. FileEncrypter implementation is not expected 
to be called by multiple
+   * threads.
    */
   FileEncrypter getFileEncrypter(CryptoEnvironment environment);
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java 
b/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java
index ab0192c2d0..47e7d8c073 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/FileEncrypter.java
@@ -21,13 +21,14 @@ package org.apache.accumulo.core.spi.crypto;
 import java.io.OutputStream;
 
 /**
- * Class implementation that will encrypt a file. Make sure implementation is 
thread safe.
+ * Class implementation that will encrypt a file.
  *
  * @since 2.0
  */
 public interface FileEncrypter {
   /**
-   * Encrypt the OutputStream.
+   * Encrypt the OutputStream. Only one OutputStream is expected to be active 
at a time. Before a
+   * new encryption OutputStream is created the previous one is expected to be 
closed.
    */
   OutputStream encryptStream(OutputStream outputStream) throws 
CryptoService.CryptoException;
 
diff --git 
a/core/src/test/java/org/apache/accumulo/core/crypto/BlockedIOStreamTest.java 
b/core/src/test/java/org/apache/accumulo/core/crypto/BlockedIOStreamTest.java
index 6c49fde2a2..584db67a9e 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/crypto/BlockedIOStreamTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/crypto/BlockedIOStreamTest.java
@@ -28,6 +28,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.security.SecureRandom;
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.crypto.streams.BlockedInputStream;
 import org.apache.accumulo.core.crypto.streams.BlockedOutputStream;
@@ -44,7 +45,8 @@ public class BlockedIOStreamTest {
 
   private void writeRead(int blockSize, int expectedSize) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    BlockedOutputStream blockOut = new BlockedOutputStream(baos, blockSize, 1);
+    BlockedOutputStream blockOut =
+        new BlockedOutputStream(baos, blockSize, 1, new AtomicBoolean(true));
 
     String contentString = "My Blocked Content String";
     byte[] content = contentString.getBytes(UTF_8);
@@ -86,7 +88,7 @@ public class BlockedIOStreamTest {
   public void testSpillingOverOutputStream() throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     // buffer will be size 12
-    BlockedOutputStream blockOut = new BlockedOutputStream(baos, 16, 16);
+    BlockedOutputStream blockOut = new BlockedOutputStream(baos, 16, 16, new 
AtomicBoolean(true));
 
     byte[] undersized = new byte[11];
     byte[] perfectSized = new byte[12];
@@ -129,7 +131,8 @@ public class BlockedIOStreamTest {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     int blockSize = 16;
     // buffer will be size 12
-    BlockedOutputStream blockOut = new BlockedOutputStream(baos, blockSize, 
blockSize);
+    BlockedOutputStream blockOut =
+        new BlockedOutputStream(baos, blockSize, blockSize, new 
AtomicBoolean(true));
 
     int size = 1024 * 1024 * 128;
     byte[] giant = new byte[size];
diff --git a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java 
b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
index 45f2ab22e5..4a93b36043 100644
--- a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
@@ -562,6 +562,30 @@ public class CryptoTest {
     }
   }
 
+  @Test
+  public void testOverlappingWrites() throws Exception {
+    testOverlappingWrites(WAL);
+    testOverlappingWrites(TABLE);
+  }
+
+  private void testOverlappingWrites(Scope scope) throws Exception {
+    AESCryptoService cs = new AESCryptoService();
+    cs.init(getAllCryptoProperties(ConfigMode.CRYPTO_TABLE_ON));
+    CryptoEnvironment encEnv = new CryptoEnvironmentImpl(scope, null, null);
+    FileEncrypter encrypter = cs.getFileEncrypter(encEnv);
+
+    ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+    var es1 = encrypter.encryptStream(out1);
+
+    // try to create a new encryption stream w/o closing the previous one
+    ByteArrayOutputStream out2 = new ByteArrayOutputStream();
+    var ce = assertThrows(CryptoException.class, () -> 
encrypter.encryptStream(out2));
+    assertTrue(ce.getMessage().contains("closing previous"));
+
+    es1.close();
+    var es2 = encrypter.encryptStream(out2);
+  }
+
   private ArrayList<Key> testData() {
     ArrayList<Key> keys = new ArrayList<>();
     keys.add(new Key("a", "cf", "cq"));

Reply via email to