This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 66f6a63b4ae [SPARK-42130][UI] Handle null string values in AccumulableInfo and ProcessSummary
66f6a63b4ae is described below

commit 66f6a63b4ae00607b7a3d44371d379bbe1374569
Author: Gengliang Wang <[email protected]>
AuthorDate: Fri Jan 20 13:48:15 2023 -0800

    [SPARK-42130][UI] Handle null string values in AccumulableInfo and ProcessSummary
    
    ### What changes were proposed in this pull request?
    
    After revisiting https://github.com/apache/spark/pull/39416 and https://github.com/apache/spark/pull/39623, I propose:
    * checking the nullability of all string fields on serialization, to avoid NPEs
    * using `optional string` in the protobuf definition of all string fields; if the deserialized result is None, the string field is set to null (see the sketch after this list)
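
    In outline, the pattern looks like this (a condensed sketch of the `setStringField`/`getStringField` helpers added to `Utils.scala` in this diff, as used by `AccumulableInfoSerializer`):

        // Serialization: only call the generated setter when the value is
        // non-null, since protobuf setters reject null with an NPE.
        setStringField(input.name, builder.setName)

        // Deserialization: `optional string` generates a hasName flag, so an
        // unset field can be mapped back to null (and interned when present).
        val name = getStringField(update.hasName, () => weakIntern(update.getName))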
    
    Take `AccumulableInfo` as an example: its string fields can be null when it is created: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/LiveEntity.scala#L744
    The null value can make a difference in the UI logic: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L791
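
    Roughly, the UI only renders an accumulator row when both name and value are present; a paraphrase of the check at the linked line, not the exact code:

        // Rows with a null name or value are skipped rather than rendered.
        // (renderRow is a stand-in for the actual row-building code.)
        if (acc.name != null && acc.value != null) { renderRow(acc) }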
    
    This PR updates the developer guide and introduces utility methods for serializing/deserializing string fields. It also handles null string values in AccumulableInfo and ProcessSummary as an example.

    ### Why are the changes needed?
    
    * Update the developer guide for better handling of null string values.
    * Add utility methods for future development of string serialization/deserialization.
    * Properly handle null string values in AccumulableInfo and ProcessSummary as an example.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Tests null string input values in the AccumulableInfo and ProcessSummary protobuf serializers.
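
    For example, the updated "Process Summary" test round-trips a wrapper whose id is null (a condensed sketch of the test in the diff below; `serializer` and the java.util.Date import are as in the suite):

        val input = new ProcessSummaryWrapper(info = new ProcessSummary(
          id = null,        // previously an NPE in the protobuf builder
          hostPort = "",    // hostPort itself can't be null (see the test comment)
          isActive = true, totalCores = 4,
          addTime = new Date(1234567L), removeTime = Some(new Date(1234568L)),
          processLogs = Map("log1" -> "log/log1")))
        val bytes = serializer.serialize(input)
        val result = serializer.deserialize(bytes, classOf[ProcessSummaryWrapper])
        assert(result.info.id == null)  // the null survives the round trip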
    
    Closes #39666 from gengliangwang/fixAcc.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../apache/spark/status/protobuf/store_types.proto | 15 ++++--
 .../protobuf/AccumulableInfoSerializer.scala       | 10 ++--
 .../protobuf/ProcessSummaryWrapperSerializer.scala | 10 ++--
 .../org/apache/spark/status/protobuf/Utils.scala   | 14 ++++++
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 53 ++++++++++++----------
 5 files changed, 64 insertions(+), 38 deletions(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 551de95c157..96c78aa001d 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -22,7 +22,12 @@ package org.apache.spark.status.protobuf;
  * Developer guides:
  *   - Coding style: https://developers.google.com/protocol-buffers/docs/style
  *   - Use int64 for job/stage IDs, in case of future extension in Spark core.
- *   - Use `weakIntern` on string values in create new objects during deserialization.
+ *   - For string fields:
+ *     - use `optional string` for protobuf definition
+ *     - on serialization, check if the string is null to avoid NPE
+ *     - on deserialization, set string fields to null if they are not set. Also, use `weakIntern` on
+ *       string values when creating new objects during deserialization.
+ *     - add tests with null string inputs
  */
 
 enum JobExecutionStatus {
@@ -65,9 +70,9 @@ message JobDataWrapper {
 
 message AccumulableInfo {
   int64 id = 1;
-  string name = 2;
+  optional string name = 2;
   optional string update = 3;
-  string value = 4;
+  optional string value = 4;
 }
 
 message TaskDataWrapper {
@@ -334,8 +339,8 @@ message SpeculationStageSummaryWrapper {
 }
 
 message ProcessSummary {
-  string id = 1;
-  string host_port = 2;
+  optional string id = 1;
+  optional string host_port = 2;
   bool is_active = 3;
   int32 total_cores = 4;
   int64 add_time = 5;
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
index 18f937cecdb..a696203bc52 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
@@ -22,7 +22,7 @@ import java.util.{List => JList}
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.status.api.v1.AccumulableInfo
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
 import org.apache.spark.util.Utils.weakIntern
 
 private[protobuf] object AccumulableInfoSerializer {
@@ -30,8 +30,8 @@ private[protobuf] object AccumulableInfoSerializer {
   def serialize(input: AccumulableInfo): StoreTypes.AccumulableInfo = {
     val builder = StoreTypes.AccumulableInfo.newBuilder()
       .setId(input.id)
-      .setName(input.name)
-      .setValue(input.value)
+    setStringField(input.name, builder.setName)
+    setStringField(input.value, builder.setValue)
     input.update.foreach(builder.setUpdate)
     builder.build()
   }
@@ -41,9 +41,9 @@ private[protobuf] object AccumulableInfoSerializer {
     updates.forEach { update =>
       accumulatorUpdates.append(new AccumulableInfo(
         id = update.getId,
-        name = weakIntern(update.getName),
+        name = getStringField(update.hasName, () => weakIntern(update.getName)),
         update = getOptional(update.hasUpdate, update.getUpdate),
-        value = update.getValue))
+        value = getStringField(update.hasValue, update.getValue)))
     }
     accumulatorUpdates
   }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
index a3d13ddd31f..3a5d224f41b 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.status.ProcessSummaryWrapper
 import org.apache.spark.status.api.v1.ProcessSummary
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
 
 class ProcessSummaryWrapperSerializer extends ProtobufSerDe[ProcessSummaryWrapper] {
 
@@ -42,8 +42,8 @@ class ProcessSummaryWrapperSerializer extends ProtobufSerDe[ProcessSummaryWrappe
 
   private def serializeProcessSummary(info: ProcessSummary): StoreTypes.ProcessSummary = {
     val builder = StoreTypes.ProcessSummary.newBuilder()
-    builder.setId(info.id)
-    builder.setHostPort(info.hostPort)
+    setStringField(info.id, builder.setId)
+    setStringField(info.hostPort, builder.setHostPort)
     builder.setIsActive(info.isActive)
     builder.setTotalCores(info.totalCores)
     builder.setAddTime(info.addTime.getTime)
@@ -59,8 +59,8 @@ class ProcessSummaryWrapperSerializer extends ProtobufSerDe[ProcessSummaryWrappe
   private def deserializeProcessSummary(info: StoreTypes.ProcessSummary): ProcessSummary = {
     val removeTime = getOptional(info.hasRemoveTime, () => new Date(info.getRemoveTime))
     new ProcessSummary(
-      id = info.getId,
-      hostPort = info.getHostPort,
+      id = getStringField(info.hasId, info.getId),
+      hostPort = getStringField(info.hasHostPort, info.getHostPort),
       isActive = info.getIsActive,
       totalCores = info.getTotalCores,
       addTime = new Date(info.getAddTime),
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala b/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
index d1c85d5f5c3..47e280f4ee9 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
@@ -17,10 +17,24 @@
 
 package org.apache.spark.status.protobuf
 
+import com.google.protobuf.MessageOrBuilder
+
 object Utils {
   def getOptional[T](condition: Boolean, result: () => T): Option[T] = if (condition) {
     Some(result())
   } else {
     None
   }
+
+  def setStringField(input: String, f: String => MessageOrBuilder): Unit = {
+    if (input != null) {
+      f(input)
+    }
+  }
+
+  def getStringField(condition: Boolean, result: () => String): String = if (condition) {
+    result()
+  } else {
+    null
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 2c88666332b..0d0d26410ed 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -86,7 +86,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   test("Task Data") {
     val accumulatorUpdates = Seq(
       new AccumulableInfo(1L, "duration", Some("update"), "value1"),
-      new AccumulableInfo(2L, "duration2", None, "value2")
+      new AccumulableInfo(2L, "duration2", None, "value2"),
+      new AccumulableInfo(-1L, null, None, null)
     )
     val input = new TaskDataWrapper(
       taskId = 1,
@@ -757,29 +758,34 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   }
 
   test("Process Summary") {
-    val input = new ProcessSummaryWrapper(
-      info = new ProcessSummary(
-        id = "id_1",
-        hostPort = "localhost:2020",
-        isActive = true,
-        totalCores = 4,
-        addTime = new Date(1234567L),
-        removeTime = Some(new Date(1234568L)),
-        processLogs = Map("log1" -> "log/log1", "log2" -> "logs/log2.log")
+    Seq(
+      ("id_1", "localhost:2020"),
+      (null, "") // hostPort can't be null. Otherwise there will be NPE.
+    ).foreach { case(id, hostPort) =>
+      val input = new ProcessSummaryWrapper(
+        info = new ProcessSummary(
+          id = id,
+          hostPort = hostPort,
+          isActive = true,
+          totalCores = 4,
+          addTime = new Date(1234567L),
+          removeTime = Some(new Date(1234568L)),
+          processLogs = Map("log1" -> "log/log1", "log2" -> "logs/log2.log")
+        )
       )
-    )
-    val bytes = serializer.serialize(input)
-    val result = serializer.deserialize(bytes, classOf[ProcessSummaryWrapper])
-    assert(result.info.id == input.info.id)
-    assert(result.info.hostPort == input.info.hostPort)
-    assert(result.info.isActive == input.info.isActive)
-    assert(result.info.totalCores == input.info.totalCores)
-    assert(result.info.addTime == input.info.addTime)
-    assert(result.info.removeTime == input.info.removeTime)
-    assert(result.info.processLogs.size == input.info.processLogs.size)
-    result.info.processLogs.keys.foreach { k =>
-      assert(input.info.processLogs.contains(k))
-      assert(result.info.processLogs(k) == input.info.processLogs(k))
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[ProcessSummaryWrapper])
+      assert(result.info.id == input.info.id)
+      assert(result.info.hostPort == input.info.hostPort)
+      assert(result.info.isActive == input.info.isActive)
+      assert(result.info.totalCores == input.info.totalCores)
+      assert(result.info.addTime == input.info.addTime)
+      assert(result.info.removeTime == input.info.removeTime)
+      assert(result.info.processLogs.size == input.info.processLogs.size)
+      result.info.processLogs.keys.foreach { k =>
+        assert(input.info.processLogs.contains(k))
+        assert(result.info.processLogs(k) == input.info.processLogs(k))
+      }
     }
   }
 
@@ -1365,6 +1371,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       assert(a1.name == a2.name)
       assert(a1.update.getOrElse("") == a2.update.getOrElse(""))
       assert(a1.update == a2.update)
+      assert(a1.value == a2.value)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
