This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3bcd973bb368 [SPARK-50808][CORE] Fix issue in writeAll with mixed 
types not getting written properly
3bcd973bb368 is described below

commit 3bcd973bb368b01b3285ed2680153b227c14d1fb
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Sat Jan 18 11:37:38 2025 -0600

    [SPARK-50808][CORE] Fix issue in writeAll with mixed types not getting written properly
    
    ### What changes were proposed in this pull request?
    
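    Fix a bug in LevelDB/RocksDB's batched write method (`writeAll`), which
    serialized values from the wrong list. When building the serialized values
    for each per-class group, it iterated over the full `values` input instead
    of that group's own entries.
    Fortunately, all existing callers of this API pass values of a single class,
    which avoids this bug in practice.
    This PR fixes the issue so that the API contract also holds for mixed-type
    input, and to avoid problems in the future.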
    
    ### Why are the changes needed?
    Fix existing bug.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a new test, `testMultipleTypesWriteAll`, to both LevelDBSuite and RocksDBSuite; it fails without the proposed changes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #49479 from mridulm/mridulm/fix-kvstore-WriteAll-multiple-types.
    
    Authored-by: Mridul Muralidharan <mridulatgmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 98d9968b9a74201f35b064eec40fec17a360cccb)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../org/apache/spark/util/kvstore/LevelDB.java     |  3 +-
 .../org/apache/spark/util/kvstore/RocksDB.java     |  3 +-
 .../apache/spark/util/kvstore/LevelDBSuite.java    | 40 ++++++++++++++++++++++
 .../apache/spark/util/kvstore/RocksDBSuite.java    | 39 +++++++++++++++++++++
 4 files changed, 83 insertions(+), 2 deletions(-)

diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 7f8d6c58aec7..74843806b3ea 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -177,7 +177,7 @@ public class LevelDB implements KVStore {
 
       // Deserialize outside synchronized block
       List<byte[]> list = new ArrayList<>(entry.getValue().size());
-      for (Object value : values) {
+      for (Object value : entry.getValue()) {
         list.add(serializer.serialize(value));
       }
       serializedValueIter = list.iterator();
@@ -191,6 +191,7 @@ public class LevelDB implements KVStore {
 
         try (WriteBatch batch = db().createWriteBatch()) {
           while (valueIter.hasNext()) {
+            assert serializedValueIter.hasNext();
             updateBatch(batch, valueIter.next(), serializedValueIter.next(), 
klass,
               naturalIndex, indices);
           }
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
index 4bc2b233fe12..8c9ac5a23200 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
@@ -209,7 +209,7 @@ public class RocksDB implements KVStore {
 
       // Deserialize outside synchronized block
       List<byte[]> list = new ArrayList<>(entry.getValue().size());
-      for (Object value : values) {
+      for (Object value : entry.getValue()) {
         list.add(serializer.serialize(value));
       }
       serializedValueIter = list.iterator();
@@ -223,6 +223,7 @@ public class RocksDB implements KVStore {
 
         try (WriteBatch writeBatch = new WriteBatch()) {
           while (valueIter.hasNext()) {
+            assert serializedValueIter.hasNext();
             updateBatch(writeBatch, valueIter.next(), 
serializedValueIter.next(), klass,
                 naturalIndex, indices);
           }
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index c22aea821af3..040ccce70b5a 100644
--- 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++ 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.kvstore;
 import java.io.File;
 import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -422,6 +423,37 @@ public class LevelDBSuite {
     }
   }
 
+  @Test
+  public void testMultipleTypesWriteAll() throws Exception {
+
+    List<CustomType1> type1List = Arrays.asList(
+      createCustomType1(1),
+      createCustomType1(2),
+      createCustomType1(3),
+      createCustomType1(4)
+    );
+
+    List<CustomType2> type2List = Arrays.asList(
+      createCustomType2(10),
+      createCustomType2(11),
+      createCustomType2(12),
+      createCustomType2(13)
+    );
+
+    List fullList = new ArrayList();
+    fullList.addAll(type1List);
+    fullList.addAll(type2List);
+
+    db.writeAll(fullList);
+    for (CustomType1 value : type1List) {
+      assertEquals(value, db.read(value.getClass(), value.key));
+    }
+    for (CustomType2 value : type2List) {
+      assertEquals(value, db.read(value.getClass(), value.key));
+    }
+  }
+
+
   private CustomType1 createCustomType1(int i) {
     CustomType1 t = new CustomType1();
     t.key = "key" + i;
@@ -432,6 +464,14 @@ public class LevelDBSuite {
     return t;
   }
 
+  private CustomType2 createCustomType2(int i) {
+    CustomType2 t = new CustomType2();
+    t.key = "key" + i;
+    t.id = "id" + i;
+    t.parentId = "parent_id" + (i / 2);
+    return t;
+  }
+
   private int countKeys(Class<?> type) throws Exception {
     byte[] prefix = db.getTypeInfo(type).keyPrefix();
     int count = 0;
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
index 61f18a9a26de..34a12d8fddec 100644
--- 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
+++ 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.kvstore;
 import java.io.File;
 import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -420,6 +421,36 @@ public class RocksDBSuite {
     }
   }
 
+  @Test
+  public void testMultipleTypesWriteAll() throws Exception {
+
+    List<CustomType1> type1List = Arrays.asList(
+      createCustomType1(1),
+      createCustomType1(2),
+      createCustomType1(3),
+      createCustomType1(4)
+    );
+
+    List<CustomType2> type2List = Arrays.asList(
+      createCustomType2(10),
+      createCustomType2(11),
+      createCustomType2(12),
+      createCustomType2(13)
+    );
+
+    List fullList = new ArrayList();
+    fullList.addAll(type1List);
+    fullList.addAll(type2List);
+
+    db.writeAll(fullList);
+    for (CustomType1 value : type1List) {
+      assertEquals(value, db.read(value.getClass(), value.key));
+    }
+    for (CustomType2 value : type2List) {
+      assertEquals(value, db.read(value.getClass(), value.key));
+    }
+  }
+
   private CustomType1 createCustomType1(int i) {
     CustomType1 t = new CustomType1();
     t.key = "key" + i;
@@ -430,6 +461,14 @@ public class RocksDBSuite {
     return t;
   }
 
+  private CustomType2 createCustomType2(int i) {
+    CustomType2 t = new CustomType2();
+    t.key = "key" + i;
+    t.id = "id" + i;
+    t.parentId = "parent_id" + (i / 2);
+    return t;
+  }
+
   private int countKeys(Class<?> type) throws Exception {
     byte[] prefix = db.getTypeInfo(type).keyPrefix();
     int count = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to