This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9a4804e0f3d [SPARK-42138][UI] Handle null string values in JobData/TaskDataWrapper/ExecutorStageSummaryWrapper
9a4804e0f3d is described below
commit 9a4804e0f3dc8029eab1d270b505a3e1e783d139
Author: Gengliang Wang <[email protected]>
AuthorDate: Fri Jan 20 22:35:14 2023 -0800
[SPARK-42138][UI] Handle null string values in JobData/TaskDataWrapper/ExecutorStageSummaryWrapper
### What changes were proposed in this pull request?
Similar to https://github.com/apache/spark/pull/39666, this PR handles null string values in JobData/TaskDataWrapper/ExecutorStageSummaryWrapper.
### Why are the changes needed?
To properly handle null string values in the protobuf serializer.
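For context, both helpers used below come from `org.apache.spark.status.protobuf.Utils` (introduced in https://github.com/apache/spark/pull/39666). A minimal sketch of their shape, inferred from the call sites in this diff rather than copied from the upstream source:

```scala
// Sketch only: signatures inferred from the call sites in this PR,
// not the verbatim upstream implementation.
private[protobuf] object Utils {
  // Skip the builder setter entirely when the value is null; protobuf's
  // generated Java setters reject null with a NullPointerException.
  def setStringField(input: String, f: String => Any): Unit = {
    if (input != null) {
      f(input)
    }
  }

  // On deserialization, map an unset optional field back to null so a
  // null string survives the round trip.
  def getStringField(condition: Boolean, result: () => String): String = {
    if (condition) result() else null
  }
}
```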
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit tests covering both null and non-null string values.
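Condensed, the null round trip the new tests exercise looks like this (illustrative only; `inputWithNullName` stands in for the `JobDataWrapper` built with `name = null` in the suite below):

```scala
val serializer = new KVStoreProtobufSerializer()
val bytes = serializer.serialize(inputWithNullName)
val result = serializer.deserialize(bytes, classOf[JobDataWrapper])
// Serialization no longer throws, and the null name round-trips as null.
assert(result.info.name == null)
```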
Closes #39680 from gengliangwang/handleMoreNull.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 12 +-
.../ExecutorStageSummaryWrapperSerializer.scala | 5 +-
.../status/protobuf/JobDataWrapperSerializer.scala | 7 +-
.../protobuf/TaskDataWrapperSerializer.scala | 19 +-
.../protobuf/KVStoreProtobufSerializerSuite.scala | 340 +++++++++++----------
5 files changed, 198 insertions(+), 185 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 96c78aa001d..b7afa93c04a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -41,7 +41,7 @@ enum JobExecutionStatus {
message JobData {
// All IDs are int64 for extendability, even when they are currently int32 in Spark.
int64 job_id = 1;
- string name = 2;
+ optional string name = 2;
optional string description = 3;
optional int64 submission_time = 4;
optional int64 completion_time = 5;
@@ -83,10 +83,10 @@ message TaskDataWrapper {
int64 launch_time = 5;
int64 result_fetch_start = 6;
int64 duration = 7;
- string executor_id = 8;
- string host = 9;
- string status = 10;
- string task_locality = 11;
+ optional string executor_id = 8;
+ optional string host = 9;
+ optional string status = 10;
+ optional string task_locality = 11;
bool speculative = 12;
repeated AccumulableInfo accumulator_updates = 13;
optional string error_message = 14;
@@ -156,7 +156,7 @@ message ExecutorStageSummary {
message ExecutorStageSummaryWrapper {
int64 stage_id = 1;
int32 stage_attempt_id = 2;
- string executor_id = 3;
+ optional string executor_id = 3;
ExecutorStageSummary info = 4;
}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
index 2ffba9db755..8e41a857057 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.status.protobuf
import org.apache.spark.status.ExecutorStageSummaryWrapper
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
import org.apache.spark.util.Utils.weakIntern
class ExecutorStageSummaryWrapperSerializer
@@ -28,8 +29,8 @@ class ExecutorStageSummaryWrapperSerializer
val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
.setStageId(input.stageId.toLong)
.setStageAttemptId(input.stageAttemptId)
- .setExecutorId(input.executorId)
.setInfo(info)
+ setStringField(input.executorId, builder.setExecutorId)
builder.build().toByteArray
}
@@ -39,7 +40,7 @@ class ExecutorStageSummaryWrapperSerializer
new ExecutorStageSummaryWrapper(
stageId = binary.getStageId.toInt,
stageAttemptId = binary.getStageAttemptId,
- executorId = weakIntern(binary.getExecutorId),
+ executorId = getStringField(binary.hasExecutorId, () => weakIntern(binary.getExecutorId)),
info = info)
}
}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index 10e0f125f6c..55bb4e2549e 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -23,7 +23,7 @@ import collection.JavaConverters._
import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.api.v1.JobData
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
@@ -49,7 +49,6 @@ class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
private def serializeJobData(jobData: JobData): StoreTypes.JobData = {
val jobDataBuilder = StoreTypes.JobData.newBuilder()
jobDataBuilder.setJobId(jobData.jobId.toLong)
- .setName(jobData.name)
.setStatus(JobExecutionStatusSerializer.serialize(jobData.status))
.setNumTasks(jobData.numTasks)
.setNumActiveTasks(jobData.numActiveTasks)
@@ -62,7 +61,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
.setNumCompletedStages(jobData.numCompletedStages)
.setNumSkippedStages(jobData.numSkippedStages)
.setNumFailedStages(jobData.numFailedStages)
-
+ setStringField(jobData.name, jobDataBuilder.setName)
jobData.description.foreach(jobDataBuilder.setDescription)
jobData.submissionTime.foreach { d =>
jobDataBuilder.setSubmissionTime(d.getTime)
@@ -88,7 +87,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
new JobData(
jobId = info.getJobId.toInt,
- name = info.getName,
+ name = getStringField(info.hasName, info.getName),
description = description,
submissionTime = submissionTime,
completionTime = completionTime,
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
index 3c947c79eab..298a1612212 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
@@ -18,7 +18,7 @@
package org.apache.spark.status.protobuf
import org.apache.spark.status.TaskDataWrapper
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
import org.apache.spark.util.Utils.weakIntern
class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
@@ -32,10 +32,6 @@ class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
.setLaunchTime(input.launchTime)
.setResultFetchStart(input.resultFetchStart)
.setDuration(input.duration)
- .setExecutorId(input.executorId)
- .setHost(input.host)
- .setStatus(input.status)
- .setTaskLocality(input.taskLocality)
.setSpeculative(input.speculative)
.setHasMetrics(input.hasMetrics)
.setExecutorDeserializeTime(input.executorDeserializeTime)
@@ -74,6 +70,10 @@ class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
.setShuffleRecordsWritten(input.shuffleRecordsWritten)
.setStageId(input.stageId)
.setStageAttemptId(input.stageAttemptId)
+ setStringField(input.executorId, builder.setExecutorId)
+ setStringField(input.host, builder.setHost)
+ setStringField(input.status, builder.setStatus)
+ setStringField(input.taskLocality, builder.setTaskLocality)
input.errorMessage.foreach(builder.setErrorMessage)
input.accumulatorUpdates.foreach { update =>
builder.addAccumulatorUpdates(AccumulableInfoSerializer.serialize(update))
@@ -92,10 +92,11 @@ class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
launchTime = binary.getLaunchTime,
resultFetchStart = binary.getResultFetchStart,
duration = binary.getDuration,
- executorId = weakIntern(binary.getExecutorId),
- host = weakIntern(binary.getHost),
- status = weakIntern(binary.getStatus),
- taskLocality = weakIntern(binary.getTaskLocality),
+ executorId = getStringField(binary.hasExecutorId, () => weakIntern(binary.getExecutorId)),
+ host = getStringField(binary.hasHost, () => weakIntern(binary.getHost)),
+ status = getStringField(binary.hasStatus, () => weakIntern(binary.getStatus)),
+ taskLocality =
+ getStringField(binary.hasTaskLocality, () => weakIntern(binary.getTaskLocality)),
speculative = binary.getSpeculative,
accumulatorUpdates = accumulatorUpdates,
errorMessage = getOptional(binary.hasErrorMessage, binary.getErrorMessage),
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 0d0d26410ed..774d7abc420 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -32,55 +32,60 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
private val serializer = new KVStoreProtobufSerializer()
test("Job data") {
- val input = new JobDataWrapper(
- new JobData(
- jobId = 1,
- name = "test",
- description = Some("test description"),
- submissionTime = Some(new Date(123456L)),
- completionTime = Some(new Date(654321L)),
- stageIds = Seq(1, 2, 3, 4),
- jobGroup = Some("group"),
- status = JobExecutionStatus.UNKNOWN,
- numTasks = 2,
- numActiveTasks = 3,
- numCompletedTasks = 4,
- numSkippedTasks = 5,
- numFailedTasks = 6,
- numKilledTasks = 7,
- numCompletedIndices = 8,
- numActiveStages = 9,
- numCompletedStages = 10,
- numSkippedStages = 11,
- numFailedStages = 12,
- killedTasksSummary = Map("a" -> 1, "b" -> 2)),
- Set(1, 2),
- Some(999)
- )
+ Seq(
+ ("test", Some("test description"), Some("group")),
+ (null, None, None)
+ ).foreach { case (name, description, jobGroup) =>
+ val input = new JobDataWrapper(
+ new JobData(
+ jobId = 1,
+ name = name,
+ description = description,
+ submissionTime = Some(new Date(123456L)),
+ completionTime = Some(new Date(654321L)),
+ stageIds = Seq(1, 2, 3, 4),
+ jobGroup = jobGroup,
+ status = JobExecutionStatus.UNKNOWN,
+ numTasks = 2,
+ numActiveTasks = 3,
+ numCompletedTasks = 4,
+ numSkippedTasks = 5,
+ numFailedTasks = 6,
+ numKilledTasks = 7,
+ numCompletedIndices = 8,
+ numActiveStages = 9,
+ numCompletedStages = 10,
+ numSkippedStages = 11,
+ numFailedStages = 12,
+ killedTasksSummary = Map("a" -> 1, "b" -> 2)),
+ Set(1, 2),
+ Some(999)
+ )
- val bytes = serializer.serialize(input)
- val result = serializer.deserialize(bytes, classOf[JobDataWrapper])
- assert(result.info.jobId == input.info.jobId)
- assert(result.info.description == input.info.description)
- assert(result.info.submissionTime == input.info.submissionTime)
- assert(result.info.completionTime == input.info.completionTime)
- assert(result.info.stageIds == input.info.stageIds)
- assert(result.info.jobGroup == input.info.jobGroup)
- assert(result.info.status == input.info.status)
- assert(result.info.numTasks == input.info.numTasks)
- assert(result.info.numActiveTasks == input.info.numActiveTasks)
- assert(result.info.numCompletedTasks == input.info.numCompletedTasks)
- assert(result.info.numSkippedTasks == input.info.numSkippedTasks)
- assert(result.info.numFailedTasks == input.info.numFailedTasks)
- assert(result.info.numKilledTasks == input.info.numKilledTasks)
- assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
- assert(result.info.numActiveStages == input.info.numActiveStages)
- assert(result.info.numCompletedStages == input.info.numCompletedStages)
- assert(result.info.numSkippedStages == input.info.numSkippedStages)
- assert(result.info.numFailedStages == input.info.numFailedStages)
- assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
- assert(result.skippedStages == input.skippedStages)
- assert(result.sqlExecutionId == input.sqlExecutionId)
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[JobDataWrapper])
+ assert(result.info.jobId == input.info.jobId)
+ assert(result.info.description == input.info.description)
+ assert(result.info.submissionTime == input.info.submissionTime)
+ assert(result.info.completionTime == input.info.completionTime)
+ assert(result.info.stageIds == input.info.stageIds)
+ assert(result.info.jobGroup == input.info.jobGroup)
+ assert(result.info.status == input.info.status)
+ assert(result.info.numTasks == input.info.numTasks)
+ assert(result.info.numActiveTasks == input.info.numActiveTasks)
+ assert(result.info.numCompletedTasks == input.info.numCompletedTasks)
+ assert(result.info.numSkippedTasks == input.info.numSkippedTasks)
+ assert(result.info.numFailedTasks == input.info.numFailedTasks)
+ assert(result.info.numKilledTasks == input.info.numKilledTasks)
+ assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
+ assert(result.info.numActiveStages == input.info.numActiveStages)
+ assert(result.info.numCompletedStages == input.info.numCompletedStages)
+ assert(result.info.numSkippedStages == input.info.numSkippedStages)
+ assert(result.info.numFailedStages == input.info.numFailedStages)
+ assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
+ assert(result.skippedStages == input.skippedStages)
+ assert(result.sqlExecutionId == input.sqlExecutionId)
+ }
}
test("Task Data") {
@@ -89,112 +94,117 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
new AccumulableInfo(2L, "duration2", None, "value2"),
new AccumulableInfo(-1L, null, None, null)
)
- val input = new TaskDataWrapper(
- taskId = 1,
- index = 2,
- attempt = 3,
- partitionId = 4,
- launchTime = 5L,
- resultFetchStart = 6L,
- duration = 10000L,
- executorId = "executor_id_1",
- host = "host_name",
- status = "SUCCESS",
- taskLocality = "LOCAL",
- speculative = true,
- accumulatorUpdates = accumulatorUpdates,
- errorMessage = Some("error"),
- hasMetrics = true,
- executorDeserializeTime = 7L,
- executorDeserializeCpuTime = 8L,
- executorRunTime = 9L,
- executorCpuTime = 10L,
- resultSize = 11L,
- jvmGcTime = 12L,
- resultSerializationTime = 13L,
- memoryBytesSpilled = 14L,
- diskBytesSpilled = 15L,
- peakExecutionMemory = 16L,
- inputBytesRead = 17L,
- inputRecordsRead = 18L,
- outputBytesWritten = 19L,
- outputRecordsWritten = 20L,
- shuffleRemoteBlocksFetched = 21L,
- shuffleLocalBlocksFetched = 22L,
- shuffleFetchWaitTime = 23L,
- shuffleRemoteBytesRead = 24L,
- shuffleRemoteBytesReadToDisk = 25L,
- shuffleLocalBytesRead = 26L,
- shuffleRecordsRead = 27L,
- shuffleCorruptMergedBlockChunks = 28L,
- shuffleMergedFetchFallbackCount = 29L,
- shuffleMergedRemoteBlocksFetched = 30L,
- shuffleMergedLocalBlocksFetched = 31L,
- shuffleMergedRemoteChunksFetched = 32L,
- shuffleMergedLocalChunksFetched = 33L,
- shuffleMergedRemoteBytesRead = 34L,
- shuffleMergedLocalBytesRead = 35L,
- shuffleRemoteReqsDuration = 36L,
- shuffleMergedRemoteReqDuration = 37L,
- shuffleBytesWritten = 38L,
- shuffleWriteTime = 39L,
- shuffleRecordsWritten = 40L,
- stageId = 41,
- stageAttemptId = 42)
+ Seq(
+ ("executor_id_1", "host_name", "SUCCESS", "LOCAL"),
+ (null, null, null, null)
+ ).foreach { case (executorId, host, status, taskLocality) =>
+ val input = new TaskDataWrapper(
+ taskId = 1,
+ index = 2,
+ attempt = 3,
+ partitionId = 4,
+ launchTime = 5L,
+ resultFetchStart = 6L,
+ duration = 10000L,
+ executorId = executorId,
+ host = host,
+ status = status,
+ taskLocality = taskLocality,
+ speculative = true,
+ accumulatorUpdates = accumulatorUpdates,
+ errorMessage = Some("error"),
+ hasMetrics = true,
+ executorDeserializeTime = 7L,
+ executorDeserializeCpuTime = 8L,
+ executorRunTime = 9L,
+ executorCpuTime = 10L,
+ resultSize = 11L,
+ jvmGcTime = 12L,
+ resultSerializationTime = 13L,
+ memoryBytesSpilled = 14L,
+ diskBytesSpilled = 15L,
+ peakExecutionMemory = 16L,
+ inputBytesRead = 17L,
+ inputRecordsRead = 18L,
+ outputBytesWritten = 19L,
+ outputRecordsWritten = 20L,
+ shuffleRemoteBlocksFetched = 21L,
+ shuffleLocalBlocksFetched = 22L,
+ shuffleFetchWaitTime = 23L,
+ shuffleRemoteBytesRead = 24L,
+ shuffleRemoteBytesReadToDisk = 25L,
+ shuffleLocalBytesRead = 26L,
+ shuffleRecordsRead = 27L,
+ shuffleCorruptMergedBlockChunks = 28L,
+ shuffleMergedFetchFallbackCount = 29L,
+ shuffleMergedRemoteBlocksFetched = 30L,
+ shuffleMergedLocalBlocksFetched = 31L,
+ shuffleMergedRemoteChunksFetched = 32L,
+ shuffleMergedLocalChunksFetched = 33L,
+ shuffleMergedRemoteBytesRead = 34L,
+ shuffleMergedLocalBytesRead = 35L,
+ shuffleRemoteReqsDuration = 36L,
+ shuffleMergedRemoteReqDuration = 37L,
+ shuffleBytesWritten = 38L,
+ shuffleWriteTime = 39L,
+ shuffleRecordsWritten = 40L,
+ stageId = 41,
+ stageAttemptId = 42)
- val bytes = serializer.serialize(input)
- val result = serializer.deserialize(bytes, classOf[TaskDataWrapper])
- checkAnswer(result.accumulatorUpdates, input.accumulatorUpdates)
- assert(result.taskId == input.taskId)
- assert(result.index == input.index)
- assert(result.attempt == input.attempt)
- assert(result.partitionId == input.partitionId)
- assert(result.launchTime == input.launchTime)
- assert(result.resultFetchStart == input.resultFetchStart)
- assert(result.duration == input.duration)
- assert(result.executorId == input.executorId)
- assert(result.host == input.host)
- assert(result.status == input.status)
- assert(result.taskLocality == input.taskLocality)
- assert(result.speculative == input.speculative)
- assert(result.errorMessage == input.errorMessage)
- assert(result.hasMetrics == input.hasMetrics)
- assert(result.executorDeserializeTime == input.executorDeserializeTime)
- assert(result.executorDeserializeCpuTime == input.executorDeserializeCpuTime)
- assert(result.executorRunTime == input.executorRunTime)
- assert(result.executorCpuTime == input.executorCpuTime)
- assert(result.resultSize == input.resultSize)
- assert(result.jvmGcTime == input.jvmGcTime)
- assert(result.resultSerializationTime == input.resultSerializationTime)
- assert(result.memoryBytesSpilled == input.memoryBytesSpilled)
- assert(result.diskBytesSpilled == input.diskBytesSpilled)
- assert(result.peakExecutionMemory == input.peakExecutionMemory)
- assert(result.inputBytesRead == input.inputBytesRead)
- assert(result.inputRecordsRead == input.inputRecordsRead)
- assert(result.outputBytesWritten == input.outputBytesWritten)
- assert(result.outputRecordsWritten == input.outputRecordsWritten)
- assert(result.shuffleRemoteBlocksFetched == input.shuffleRemoteBlocksFetched)
- assert(result.shuffleLocalBlocksFetched == input.shuffleLocalBlocksFetched)
- assert(result.shuffleFetchWaitTime == input.shuffleFetchWaitTime)
- assert(result.shuffleRemoteBytesRead == input.shuffleRemoteBytesRead)
- assert(result.shuffleRemoteBytesReadToDisk == input.shuffleRemoteBytesReadToDisk)
- assert(result.shuffleLocalBytesRead == input.shuffleLocalBytesRead)
- assert(result.shuffleRecordsRead == input.shuffleRecordsRead)
- assert(result.shuffleCorruptMergedBlockChunks == input.shuffleCorruptMergedBlockChunks)
- assert(result.shuffleMergedFetchFallbackCount == input.shuffleMergedFetchFallbackCount)
- assert(result.shuffleMergedRemoteBlocksFetched == input.shuffleMergedRemoteBlocksFetched)
- assert(result.shuffleMergedLocalBlocksFetched == input.shuffleMergedLocalBlocksFetched)
- assert(result.shuffleMergedRemoteChunksFetched == input.shuffleMergedRemoteChunksFetched)
- assert(result.shuffleMergedLocalChunksFetched == input.shuffleMergedLocalChunksFetched)
- assert(result.shuffleMergedRemoteBytesRead == input.shuffleMergedRemoteBytesRead)
- assert(result.shuffleMergedLocalBytesRead == input.shuffleMergedLocalBytesRead)
- assert(result.shuffleRemoteReqsDuration == input.shuffleRemoteReqsDuration)
- assert(result.shuffleMergedRemoteReqDuration == input.shuffleMergedRemoteReqDuration)
- assert(result.shuffleBytesWritten == input.shuffleBytesWritten)
- assert(result.shuffleWriteTime == input.shuffleWriteTime)
- assert(result.shuffleRecordsWritten == input.shuffleRecordsWritten)
- assert(result.stageId == input.stageId)
- assert(result.stageAttemptId == input.stageAttemptId)
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[TaskDataWrapper])
+ checkAnswer(result.accumulatorUpdates, input.accumulatorUpdates)
+ assert(result.taskId == input.taskId)
+ assert(result.index == input.index)
+ assert(result.attempt == input.attempt)
+ assert(result.partitionId == input.partitionId)
+ assert(result.launchTime == input.launchTime)
+ assert(result.resultFetchStart == input.resultFetchStart)
+ assert(result.duration == input.duration)
+ assert(result.executorId == input.executorId)
+ assert(result.host == input.host)
+ assert(result.status == input.status)
+ assert(result.taskLocality == input.taskLocality)
+ assert(result.speculative == input.speculative)
+ assert(result.errorMessage == input.errorMessage)
+ assert(result.hasMetrics == input.hasMetrics)
+ assert(result.executorDeserializeTime == input.executorDeserializeTime)
+ assert(result.executorDeserializeCpuTime == input.executorDeserializeCpuTime)
+ assert(result.executorRunTime == input.executorRunTime)
+ assert(result.executorCpuTime == input.executorCpuTime)
+ assert(result.resultSize == input.resultSize)
+ assert(result.jvmGcTime == input.jvmGcTime)
+ assert(result.resultSerializationTime == input.resultSerializationTime)
+ assert(result.memoryBytesSpilled == input.memoryBytesSpilled)
+ assert(result.diskBytesSpilled == input.diskBytesSpilled)
+ assert(result.peakExecutionMemory == input.peakExecutionMemory)
+ assert(result.inputBytesRead == input.inputBytesRead)
+ assert(result.inputRecordsRead == input.inputRecordsRead)
+ assert(result.outputBytesWritten == input.outputBytesWritten)
+ assert(result.outputRecordsWritten == input.outputRecordsWritten)
+ assert(result.shuffleRemoteBlocksFetched == input.shuffleRemoteBlocksFetched)
+ assert(result.shuffleLocalBlocksFetched == input.shuffleLocalBlocksFetched)
+ assert(result.shuffleFetchWaitTime == input.shuffleFetchWaitTime)
+ assert(result.shuffleRemoteBytesRead == input.shuffleRemoteBytesRead)
+ assert(result.shuffleRemoteBytesReadToDisk == input.shuffleRemoteBytesReadToDisk)
+ assert(result.shuffleLocalBytesRead == input.shuffleLocalBytesRead)
+ assert(result.shuffleRecordsRead == input.shuffleRecordsRead)
+ assert(result.shuffleCorruptMergedBlockChunks == input.shuffleCorruptMergedBlockChunks)
+ assert(result.shuffleMergedFetchFallbackCount == input.shuffleMergedFetchFallbackCount)
+ assert(result.shuffleMergedRemoteBlocksFetched == input.shuffleMergedRemoteBlocksFetched)
+ assert(result.shuffleMergedLocalBlocksFetched == input.shuffleMergedLocalBlocksFetched)
+ assert(result.shuffleMergedRemoteChunksFetched == input.shuffleMergedRemoteChunksFetched)
+ assert(result.shuffleMergedLocalChunksFetched == input.shuffleMergedLocalChunksFetched)
+ assert(result.shuffleMergedRemoteBytesRead == input.shuffleMergedRemoteBytesRead)
+ assert(result.shuffleMergedLocalBytesRead == input.shuffleMergedLocalBytesRead)
+ assert(result.shuffleRemoteReqsDuration == input.shuffleRemoteReqsDuration)
+ assert(result.shuffleMergedRemoteReqDuration == input.shuffleMergedRemoteReqDuration)
+ assert(result.shuffleBytesWritten == input.shuffleBytesWritten)
+ assert(result.shuffleWriteTime == input.shuffleWriteTime)
+ assert(result.shuffleRecordsWritten == input.shuffleRecordsWritten)
+ assert(result.stageId == input.stageId)
+ assert(result.stageAttemptId == input.stageAttemptId)
+ }
}
test("Executor Stage Summary") {
@@ -218,17 +228,19 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
isBlacklistedForStage = true,
peakMemoryMetrics = peakMemoryMetrics,
isExcludedForStage = false)
- val input = new ExecutorStageSummaryWrapper(
- stageId = 1,
- stageAttemptId = 2,
- executorId = "executor_id_1",
- info = info)
- val bytes = serializer.serialize(input)
- val result = serializer.deserialize(bytes, classOf[ExecutorStageSummaryWrapper])
- assert(result.stageId == input.stageId)
- assert(result.stageAttemptId == input.stageAttemptId)
- assert(result.executorId == input.executorId)
- checkAnswer(result.info, input.info)
+ Seq("executor_id_1", null).foreach { executorId =>
+ val input = new ExecutorStageSummaryWrapper(
+ stageId = 1,
+ stageAttemptId = 2,
+ executorId = executorId,
+ info = info)
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[ExecutorStageSummaryWrapper])
+ assert(result.stageId == input.stageId)
+ assert(result.stageAttemptId == input.stageAttemptId)
+ assert(result.executorId == input.executorId)
+ checkAnswer(result.info, input.info)
+ }
}
test("Application Environment Info") {