This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 330b7963f97 [SPARK-42140][CORE] Handle null string values in ApplicationEnvironmentInfoWrapper/ApplicationInfoWrapper
330b7963f97 is described below
commit 330b7963f97f06c5bb3da358c61a1ac1089666e7
Author: yangjie01 <[email protected]>
AuthorDate: Sat Jan 21 15:33:48 2023 -0800
[SPARK-42140][CORE] Handle null string values in ApplicationEnvironmentInfoWrapper/ApplicationInfoWrapper
### What changes were proposed in this pull request?
Similar to #39666, this PR handles null string values in ApplicationEnvironmentInfoWrapper/ApplicationInfoWrapper
### Why are the changes needed?
Properly handles null string values in the protobuf serializer.
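For readers unfamiliar with the pattern: the serializers below route every nullable string through the `setStringField`/`getStringField` helpers in `org.apache.spark.status.protobuf.Utils`. Their implementations are not part of this diff, so the following is only a minimal sketch of what they are assumed to look like, with signatures inferred from the call sites:

```scala
// Sketch only: the real helpers live in org.apache.spark.status.protobuf.Utils
// and are not shown in this diff; signatures are inferred from the call sites.
object Utils {
  // Invoke the protobuf builder setter only for non-null input, so a null
  // Scala string simply leaves the `optional` field unset.
  def setStringField(input: String, f: String => Any): Unit = {
    if (input != null) f(input)
  }

  // Read an optional string back: return the value when the field is present,
  // otherwise null, mirroring the original (possibly null) Scala value.
  def getStringField(condition: Boolean, result: () => String): String = {
    if (condition) result() else null
  }
}
```

With the proto fields declared as `optional string`, this lets null strings round-trip losslessly: a null `sparkUser`, for example, serializes to an unset `spark_user` field and deserializes back to null.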
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
New UTs
Closes #39684 from LuciferYang/SPARK-42140.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 22 ++++++-------
...plicationEnvironmentInfoWrapperSerializer.scala | 34 ++++++++++++--------
.../ApplicationInfoWrapperSerializer.scala | 18 +++++------
.../protobuf/KVStoreProtobufSerializerSuite.scala | 37 ++++++++++++++++------
4 files changed, 67 insertions(+), 44 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 598800da9f5..49076790321 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -161,14 +161,14 @@ message ExecutorStageSummaryWrapper {
}
message ExecutorResourceRequest {
- string resource_name = 1;
+ optional string resource_name = 1;
int64 amount = 2;
- string discoveryScript = 3;
- string vendor = 4;
+ optional string discoveryScript = 3;
+ optional string vendor = 4;
}
message TaskResourceRequest {
- string resource_name = 1;
+ optional string resource_name = 1;
double amount = 2;
}
@@ -179,9 +179,9 @@ message ResourceProfileInfo {
}
message RuntimeInfo {
- string java_version = 1;
- string java_home = 2;
- string scala_version = 3;
+ optional string java_version = 1;
+ optional string java_home = 2;
+ optional string scala_version = 3;
}
message PairStrings {
@@ -209,14 +209,14 @@ message ApplicationAttemptInfo {
int64 end_time = 3;
int64 last_updated = 4;
int64 duration = 5;
- string spark_user = 6;
+ optional string spark_user = 6;
bool completed = 7;
- string app_spark_version = 8;
+ optional string app_spark_version = 8;
}
message ApplicationInfo {
- string id = 1;
- string name = 2;
+ optional string id = 1;
+ optional string name = 2;
optional int32 cores_granted = 3;
optional int32 max_cores = 4;
optional int32 cores_per_executor = 5;
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index b7cf01382e2..fbbc55387b8 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -22,6 +22,7 @@ import collection.JavaConverters._
import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
import org.apache.spark.status.ApplicationEnvironmentInfoWrapper
import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo,
ResourceProfileInfo, RuntimeInfo}
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
class ApplicationEnvironmentInfoWrapperSerializer
extends ProtobufSerDe[ApplicationEnvironmentInfoWrapper] {
@@ -43,9 +44,10 @@ class ApplicationEnvironmentInfoWrapperSerializer
StoreTypes.ApplicationEnvironmentInfo = {
val runtimeBuilder = StoreTypes.RuntimeInfo.newBuilder()
- runtimeBuilder.setJavaVersion(info.runtime.javaVersion)
- runtimeBuilder.setJavaHome(info.runtime.javaHome)
- runtimeBuilder.setScalaVersion(info.runtime.scalaVersion)
+ val runtime = info.runtime
+ setStringField(runtime.javaHome, runtimeBuilder.setJavaHome)
+ setStringField(runtime.javaVersion, runtimeBuilder.setJavaVersion)
+ setStringField(runtime.scalaVersion, runtimeBuilder.setScalaVersion)
val builder = StoreTypes.ApplicationEnvironmentInfo.newBuilder()
builder.setRuntime(runtimeBuilder.build())
@@ -72,10 +74,11 @@ class ApplicationEnvironmentInfoWrapperSerializer
private def deserializeApplicationEnvironmentInfo(info: StoreTypes.ApplicationEnvironmentInfo):
ApplicationEnvironmentInfo = {
+ val rt = info.getRuntime
val runtime = new RuntimeInfo (
- javaVersion = info.getRuntime.getJavaVersion,
- javaHome = info.getRuntime.getJavaHome,
- scalaVersion = info.getRuntime.getScalaVersion
+ javaVersion = getStringField(rt.hasJavaVersion, () => rt.getJavaVersion),
+ javaHome = getStringField(rt.hasJavaHome, () => rt.getJavaHome),
+ scalaVersion = getStringField(rt.hasScalaVersion, () => rt.getScalaVersion)
)
val pairSSToTuple = (pair: StoreTypes.PairStrings) => {
(pair.getValue1, pair.getValue2)
@@ -105,15 +108,15 @@ class ApplicationEnvironmentInfoWrapperSerializer
builder.setId(info.id)
info.executorResources.foreach{case (k, resource) =>
val requestBuilder = StoreTypes.ExecutorResourceRequest.newBuilder()
- requestBuilder.setResourceName(resource.resourceName)
+ setStringField(resource.resourceName, requestBuilder.setResourceName)
requestBuilder.setAmount(resource.amount)
- requestBuilder.setDiscoveryScript(resource.discoveryScript)
- requestBuilder.setVendor(resource.vendor)
+ setStringField(resource.discoveryScript, requestBuilder.setDiscoveryScript)
+ setStringField(resource.vendor, requestBuilder.setVendor)
builder.putExecutorResources(k, requestBuilder.build())
}
info.taskResources.foreach { case (k, resource) =>
val requestBuilder = StoreTypes.TaskResourceRequest.newBuilder()
- requestBuilder.setResourceName(resource.resourceName)
+ setStringField(resource.resourceName, requestBuilder.setResourceName)
requestBuilder.setAmount(resource.amount)
builder.putTaskResources(k, requestBuilder.build())
}
@@ -134,15 +137,18 @@ class ApplicationEnvironmentInfoWrapperSerializer
private def deserializeExecutorResourceRequest(info: StoreTypes.ExecutorResourceRequest):
ExecutorResourceRequest = {
new ExecutorResourceRequest(
- resourceName = info.getResourceName,
+ resourceName = getStringField(info.hasResourceName, () => info.getResourceName),
amount = info.getAmount,
- discoveryScript = info.getDiscoveryScript,
- vendor = info.getVendor
+ discoveryScript = getStringField(info.hasDiscoveryScript, () => info.getDiscoveryScript),
+ vendor = getStringField(info.hasVendor, () => info.getVendor)
)
}
private def deserializeTaskResourceRequest(info: StoreTypes.TaskResourceRequest):
TaskResourceRequest = {
- new TaskResourceRequest(resourceName = info.getResourceName, amount = info.getAmount)
+ new TaskResourceRequest(
+ resourceName = getStringField(info.hasResourceName, () => info.getResourceName),
+ amount = info.getAmount
+ )
}
}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
index c56b5302cc1..4b2bcfa1d1f 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
@@ -23,7 +23,7 @@ import collection.JavaConverters._
import org.apache.spark.status.ApplicationInfoWrapper
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils._
class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrapper] {
@@ -44,8 +44,8 @@ class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrap
private def serializeApplicationInfo(info: ApplicationInfo): StoreTypes.ApplicationInfo = {
val builder = StoreTypes.ApplicationInfo.newBuilder()
- builder.setId(info.id)
- .setName(info.name)
+ setStringField(info.id, builder.setId)
+ setStringField(info.name, builder.setName)
info.coresGranted.foreach { c =>
builder.setCoresGranted(c)
}
@@ -71,8 +71,8 @@ class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrap
val memoryPerExecutorMB = getOptional(info.hasMemoryPerExecutorMb,
info.getMemoryPerExecutorMb)
val attempts = info.getAttemptsList.asScala.map(deserializeApplicationAttemptInfo)
ApplicationInfo(
- id = info.getId,
- name = info.getName,
+ id = getStringField(info.hasId, () => info.getId),
+ name = getStringField(info.hasName, () => info.getName),
coresGranted = coresGranted,
maxCores = maxCores,
coresPerExecutor = coresPerExecutor,
@@ -88,9 +88,9 @@ class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrap
.setEndTime(info.endTime.getTime)
.setLastUpdated(info.lastUpdated.getTime)
.setDuration(info.duration)
- .setSparkUser(info.sparkUser)
.setCompleted(info.completed)
- .setAppSparkVersion(info.appSparkVersion)
+ setStringField(info.sparkUser, builder.setSparkUser)
+ setStringField(info.appSparkVersion, builder.setAppSparkVersion)
info.attemptId.foreach{ id =>
builder.setAttemptId(id)
}
@@ -107,9 +107,9 @@ class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrap
endTime = new Date(info.getEndTime),
lastUpdated = new Date(info.getLastUpdated),
duration = info.getDuration,
- sparkUser = info.getSparkUser,
+ sparkUser = getStringField(info.hasSparkUser, () => info.getSparkUser),
completed = info.getCompleted,
- appSparkVersion = info.getAppSparkVersion
+ appSparkVersion = getStringField(info.hasAppSparkVersion, () => info.getAppSparkVersion)
)
}
}
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index e9dc9e00f67..14dd2cd601d 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -244,12 +244,21 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
}
test("Application Environment Info") {
+ testApplicationEnvironmentInfoWrapperSerDe("1.8", "/tmp/java", "2.13")
+ }
+
+ test("Application Environment Info with nulls") {
+ testApplicationEnvironmentInfoWrapperSerDe(null, null, null)
+ }
+
+ private def testApplicationEnvironmentInfoWrapperSerDe(
+ javaVersion: String, javaHome: String, scalaVersion: String): Unit = {
val input = new ApplicationEnvironmentInfoWrapper(
new ApplicationEnvironmentInfo(
runtime = new RuntimeInfo(
- javaVersion = "1.8",
- javaHome = "/tmp/java",
- scalaVersion = "2.13"),
+ javaVersion = javaVersion,
+ javaHome = javaHome,
+ scalaVersion = scalaVersion),
sparkProperties = Seq(("spark.conf.1", "1"), ("spark.conf.2", "2")),
hadoopProperties = Seq(("hadoop.conf.conf1", "1"), ("hadoop.conf2", "val2")),
systemProperties = Seq(("sys.prop.1", "value1"), ("sys.prop.2", "value2")),
@@ -264,10 +273,10 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
discoveryScript = "script0",
vendor = "apache"),
"1" -> new ExecutorResourceRequest(
- resourceName = "exec2",
+ resourceName = null,
amount = 1,
- discoveryScript = "script1",
- vendor = "apache")
+ discoveryScript = null,
+ vendor = null)
),
taskResources = Map(
"0" -> new TaskResourceRequest(resourceName = "exec1", amount = 1),
@@ -323,6 +332,14 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
}
test("Application Info") {
+ testApplicationInfoWrapperSerDe("2", "app_2")
+ }
+
+ test("Application Info with nulls") {
+ testApplicationInfoWrapperSerDe(null, null)
+ }
+
+ private def testApplicationInfoWrapperSerDe(id: String, name: String): Unit = {
val attempts: Seq[ApplicationAttemptInfo] = Seq(
ApplicationAttemptInfo(
attemptId = Some("001"),
@@ -340,14 +357,14 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
endTime = new Date(17L),
lastUpdated = new Date(18L),
duration = 100,
- sparkUser = "user",
+ sparkUser = null,
completed = true,
- appSparkVersion = "3.4.0"
+ appSparkVersion = null
))
val input = new ApplicationInfoWrapper(
ApplicationInfo(
- id = "2",
- name = "app_2",
+ id = id,
+ name = name,
coresGranted = Some(1),
maxCores = Some(2),
coresPerExecutor = Some(3),