This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 9037d6112a fixes AES decryption threading issue (#5384)
9037d6112a is described below

commit 9037d6112aced7c312a6cad31299809d83ae16e8
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Mar 7 09:49:06 2025 -0500

    fixes AES decryption threading issue (#5384)
    
    A single Cipher object was shared between multiple input streams when
    decrypting AES data. This caused decryption to fail when multiple
    threads created input streams for decryption.  This could potentially
    cause failure for a single thread also if it interleaves reading from
    multiple streams that use the same Cipher.
    
    Modified the code to create a Cipher per input stream.
---
 .../accumulo/core/spi/crypto/AESCryptoService.java | 22 ++++----
 .../apache/accumulo/core/crypto/CryptoTest.java    | 65 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java b/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java
index 15e5f45ca6..85895053b3 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java
@@ -426,20 +426,21 @@ public class AESCryptoService implements CryptoService {
     }
 
     public class AESGCMFileDecrypter implements FileDecrypter {
-      private final Cipher cipher;
       private final Key fek;
 
       AESGCMFileDecrypter(Key fek) {
+        this.fek = fek;
+      }
+
+      @Override
+      public InputStream decryptStream(InputStream inputStream) throws 
CryptoException {
+        Cipher cipher;
         try {
           cipher = Cipher.getInstance(transformation);
         } catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
           throw new CryptoException("Error obtaining cipher for transform " + 
transformation, e);
         }
-        this.fek = fek;
-      }
 
-      @Override
-      public InputStream decryptStream(InputStream inputStream) throws 
CryptoException {
         byte[] initVector = new byte[GCM_IV_LENGTH_IN_BYTES];
         try {
           IOUtils.readFully(inputStream, initVector);
@@ -531,20 +532,21 @@ public class AESCryptoService implements CryptoService {
 
     @SuppressFBWarnings(value = "CIPHER_INTEGRITY", justification = "CBC is 
provided for WALs")
     public class AESCBCFileDecrypter implements FileDecrypter {
-      private final Cipher cipher;
       private final Key fek;
 
       AESCBCFileDecrypter(Key fek) {
+        this.fek = fek;
+      }
+
+      @Override
+      public InputStream decryptStream(InputStream inputStream) throws 
CryptoException {
+        Cipher cipher;
         try {
           cipher = Cipher.getInstance(transformation);
         } catch (NoSuchAlgorithmException | NoSuchPaddingException e) {
           throw new CryptoException("Error obtaining cipher for transform " + 
transformation, e);
         }
-        this.fek = fek;
-      }
 
-      @Override
-      public InputStream decryptStream(InputStream inputStream) throws 
CryptoException {
         byte[] initVector = new byte[IV_LENGTH_IN_BYTES];
         try {
           IOUtils.readFully(inputStream, initVector);
diff --git a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
index 6027e3975b..45f2ab22e5 100644
--- a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java
@@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -45,7 +46,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import javax.crypto.Cipher;
 import javax.crypto.NoSuchPaddingException;
@@ -497,6 +501,67 @@ public class CryptoTest {
     assertEquals(2, factory.getCount());
   }
 
+  @Test
+  public void testMultipleThreads() throws Exception {
+    testMultipleThreads(WAL);
+    testMultipleThreads(TABLE);
+  }
+
+  private void testMultipleThreads(Scope scope) throws Exception {
+
+    byte[] plainText = new byte[1024 * 1024];
+    for (int i = 0; i < plainText.length; i++) {
+      plainText[i] = (byte) (i % 128);
+    }
+
+    AESCryptoService cs = new AESCryptoService();
+    cs.init(getAllCryptoProperties(ConfigMode.CRYPTO_TABLE_ON));
+    CryptoEnvironment encEnv = new CryptoEnvironmentImpl(scope, null, null);
+    FileEncrypter encrypter = cs.getFileEncrypter(encEnv);
+    byte[] params = encrypter.getDecryptionParameters();
+
+    assertNotNull(params);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(out);
+    OutputStream encrypted = encrypter.encryptStream(dataOut);
+
+    assertNotNull(encrypted);
+    DataOutputStream cipherOut = new DataOutputStream(encrypted);
+
+    cipherOut.write(plainText);
+
+    cipherOut.close();
+    dataOut.close();
+    encrypted.close();
+    out.close();
+    byte[] cipherText = out.toByteArray();
+
+    var executor = Executors.newCachedThreadPool();
+
+    List<Future<Boolean>> verifyFutures = new ArrayList<>();
+
+    FileDecrypter decrypter = cs.getFileDecrypter(new 
CryptoEnvironmentImpl(scope, null, params));
+
+    // verify that each input stream returned by decrypter.decryptStream() is 
independent when used
+    // by multiple threads
+    for (int i = 0; i < 32; i++) {
+      var future = executor.submit(() -> {
+        try (ByteArrayInputStream in = new ByteArrayInputStream(cipherText);
+            DataInputStream decrypted = new 
DataInputStream(decrypter.decryptStream(in))) {
+          byte[] dataRead = new byte[plainText.length];
+          decrypted.readFully(dataRead);
+          return Arrays.equals(plainText, dataRead);
+        }
+      });
+      verifyFutures.add(future);
+    }
+
+    for (var future : verifyFutures) {
+      assertTrue(future.get());
+    }
+  }
+
   private ArrayList<Key> testData() {
     ArrayList<Key> keys = new ArrayList<>();
     keys.add(new Key("a", "cf", "cq"));

Reply via email to