This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3bcd973bb368 [SPARK-50808][CORE] Fix issue in writeAll with mixed
types not getting written properly
3bcd973bb368 is described below
commit 3bcd973bb368b01b3285ed2680153b227c14d1fb
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Sat Jan 18 11:37:38 2025 -0600
[SPARK-50808][CORE] Fix issue in writeAll with mixed types not getting
written properly
### What changes were proposed in this pull request?
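Fix a bug in the batched write method (`writeAll`) of LevelDB and RocksDB, which
serialized values from the wrong list: for each per-class group, the serialized
bytes were built from the full `values` list instead of from that group's own
entries (`entry.getValue()`), so with mixed-class input the serialized bytes could
be paired with the wrong objects.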
Fortunately, existing callers of this API pass objects of only a single class,
which avoids this bug in practice.
This PR fixes the issue so that the API contract also holds for mixed-class input
and future callers are not affected.
### Why are the changes needed?
To fix the existing bug in `writeAll` described above.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a new test, `testMultipleTypesWriteAll`, to both `LevelDBSuite` and
`RocksDBSuite`; it fails without the proposed changes.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49479 from mridulm/mridulm/fix-kvstore-WriteAll-multiple-types.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 98d9968b9a74201f35b064eec40fec17a360cccb)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
.../org/apache/spark/util/kvstore/LevelDB.java | 3 +-
.../org/apache/spark/util/kvstore/RocksDB.java | 3 +-
.../apache/spark/util/kvstore/LevelDBSuite.java | 40 ++++++++++++++++++++++
.../apache/spark/util/kvstore/RocksDBSuite.java | 39 +++++++++++++++++++++
4 files changed, 83 insertions(+), 2 deletions(-)
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 7f8d6c58aec7..74843806b3ea 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -177,7 +177,7 @@ public class LevelDB implements KVStore {
// Deserialize outside synchronized block
List<byte[]> list = new ArrayList<>(entry.getValue().size());
- for (Object value : values) {
+ for (Object value : entry.getValue()) {
list.add(serializer.serialize(value));
}
serializedValueIter = list.iterator();
@@ -191,6 +191,7 @@ public class LevelDB implements KVStore {
try (WriteBatch batch = db().createWriteBatch()) {
while (valueIter.hasNext()) {
+ assert serializedValueIter.hasNext();
updateBatch(batch, valueIter.next(), serializedValueIter.next(),
klass,
naturalIndex, indices);
}
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
index 4bc2b233fe12..8c9ac5a23200 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
@@ -209,7 +209,7 @@ public class RocksDB implements KVStore {
// Deserialize outside synchronized block
List<byte[]> list = new ArrayList<>(entry.getValue().size());
- for (Object value : values) {
+ for (Object value : entry.getValue()) {
list.add(serializer.serialize(value));
}
serializedValueIter = list.iterator();
@@ -223,6 +223,7 @@ public class RocksDB implements KVStore {
try (WriteBatch writeBatch = new WriteBatch()) {
while (valueIter.hasNext()) {
+ assert serializedValueIter.hasNext();
updateBatch(writeBatch, valueIter.next(),
serializedValueIter.next(), klass,
naturalIndex, indices);
}
diff --git
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index c22aea821af3..040ccce70b5a 100644
---
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.kvstore;
import java.io.File;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -422,6 +423,37 @@ public class LevelDBSuite {
}
}
+ @Test
+ public void testMultipleTypesWriteAll() throws Exception {
+
+ List<CustomType1> type1List = Arrays.asList(
+ createCustomType1(1),
+ createCustomType1(2),
+ createCustomType1(3),
+ createCustomType1(4)
+ );
+
+ List<CustomType2> type2List = Arrays.asList(
+ createCustomType2(10),
+ createCustomType2(11),
+ createCustomType2(12),
+ createCustomType2(13)
+ );
+
+ List fullList = new ArrayList();
+ fullList.addAll(type1List);
+ fullList.addAll(type2List);
+
+ db.writeAll(fullList);
+ for (CustomType1 value : type1List) {
+ assertEquals(value, db.read(value.getClass(), value.key));
+ }
+ for (CustomType2 value : type2List) {
+ assertEquals(value, db.read(value.getClass(), value.key));
+ }
+ }
+
+
private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
@@ -432,6 +464,14 @@ public class LevelDBSuite {
return t;
}
+ private CustomType2 createCustomType2(int i) {
+ CustomType2 t = new CustomType2();
+ t.key = "key" + i;
+ t.id = "id" + i;
+ t.parentId = "parent_id" + (i / 2);
+ return t;
+ }
+
private int countKeys(Class<?> type) throws Exception {
byte[] prefix = db.getTypeInfo(type).keyPrefix();
int count = 0;
diff --git
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
index 61f18a9a26de..34a12d8fddec 100644
---
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
+++
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.kvstore;
import java.io.File;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -420,6 +421,36 @@ public class RocksDBSuite {
}
}
+ @Test
+ public void testMultipleTypesWriteAll() throws Exception {
+
+ List<CustomType1> type1List = Arrays.asList(
+ createCustomType1(1),
+ createCustomType1(2),
+ createCustomType1(3),
+ createCustomType1(4)
+ );
+
+ List<CustomType2> type2List = Arrays.asList(
+ createCustomType2(10),
+ createCustomType2(11),
+ createCustomType2(12),
+ createCustomType2(13)
+ );
+
+ List fullList = new ArrayList();
+ fullList.addAll(type1List);
+ fullList.addAll(type2List);
+
+ db.writeAll(fullList);
+ for (CustomType1 value : type1List) {
+ assertEquals(value, db.read(value.getClass(), value.key));
+ }
+ for (CustomType2 value : type2List) {
+ assertEquals(value, db.read(value.getClass(), value.key));
+ }
+ }
+
private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
@@ -430,6 +461,14 @@ public class RocksDBSuite {
return t;
}
+ private CustomType2 createCustomType2(int i) {
+ CustomType2 t = new CustomType2();
+ t.key = "key" + i;
+ t.id = "id" + i;
+ t.parentId = "parent_id" + (i / 2);
+ return t;
+ }
+
private int countKeys(Class<?> type) throws Exception {
byte[] prefix = db.getTypeInfo(type).keyPrefix();
int count = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]