This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 878ac2fe488 [SPARK-41759][CORE] Use `weakIntern` on string values in
create new objects during deserialization
878ac2fe488 is described below
commit 878ac2fe4880472f03e6d9907a68fde30479b281
Author: panbingkun <[email protected]>
AuthorDate: Thu Jan 19 13:53:34 2023 -0800
[SPARK-41759][CORE] Use `weakIntern` on string values in create new objects
during deserialization
### What changes were proposed in this pull request?
The PR aims to use weakIntern on string values when creating new objects during
deserialization.
### Why are the changes needed?
Following the guidance in: https://github.com/apache/spark/pull/39270.
<img width="587" alt="image"
src="https://user-images.githubusercontent.com/15246973/209924939-779db183-f0a6-4f3b-aebf-cf00b6c95cf8.png">
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GA.
Closes #39275 from panbingkun/SPARK-41759.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/status/protobuf/AccumulableInfoSerializer.scala | 3 ++-
.../protobuf/ExecutorStageSummaryWrapperSerializer.scala | 3 ++-
.../status/protobuf/ExecutorSummaryWrapperSerializer.scala | 9 +++++----
.../apache/spark/status/protobuf/PoolDataSerializer.scala | 3 +--
.../spark/status/protobuf/StageDataWrapperSerializer.scala | 12 +++++-------
.../spark/status/protobuf/StreamBlockDataSerializer.scala | 7 ++++---
6 files changed, 19 insertions(+), 18 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
index 8d5046923e9..18f937cecdb 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.status.api.v1.AccumulableInfo
import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.util.Utils.weakIntern
private[protobuf] object AccumulableInfoSerializer {
@@ -40,7 +41,7 @@ private[protobuf] object AccumulableInfoSerializer {
updates.forEach { update =>
accumulatorUpdates.append(new AccumulableInfo(
id = update.getId,
- name = update.getName,
+ name = weakIntern(update.getName),
update = getOptional(update.hasUpdate, update.getUpdate),
value = update.getValue))
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
index 4d9d045ed5e..2ffba9db755 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.status.protobuf
import org.apache.spark.status.ExecutorStageSummaryWrapper
+import org.apache.spark.util.Utils.weakIntern
class ExecutorStageSummaryWrapperSerializer
extends ProtobufSerDe[ExecutorStageSummaryWrapper] {
@@ -38,7 +39,7 @@ class ExecutorStageSummaryWrapperSerializer
new ExecutorStageSummaryWrapper(
stageId = binary.getStageId.toInt,
stageAttemptId = binary.getStageAttemptId,
- executorId = binary.getExecutorId,
+ executorId = weakIntern(binary.getExecutorId),
info = info)
}
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
index b008c98e562..7092a78b00a 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -25,6 +25,7 @@ import org.apache.spark.resource.ResourceInformation
import org.apache.spark.status.ExecutorSummaryWrapper
import org.apache.spark.status.api.v1.{ExecutorSummary, MemoryMetrics}
import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.util.Utils.weakIntern
class ExecutorSummaryWrapperSerializer extends
ProtobufSerDe[ExecutorSummaryWrapper] {
@@ -109,8 +110,8 @@ class ExecutorSummaryWrapperSerializer extends
ProtobufSerDe[ExecutorSummaryWrap
getOptional(binary.hasMemoryMetrics,
() => deserializeMemoryMetrics(binary.getMemoryMetrics))
new ExecutorSummary(
- id = binary.getId,
- hostPort = binary.getHostPort,
+ id = weakIntern(binary.getId),
+ hostPort = weakIntern(binary.getHostPort),
isActive = binary.getIsActive,
rddBlocks = binary.getRddBlocks,
memoryUsed = binary.getMemoryUsed,
@@ -171,7 +172,7 @@ class ExecutorSummaryWrapperSerializer extends
ProtobufSerDe[ExecutorSummaryWrap
private def deserializeResourceInformation(binary:
StoreTypes.ResourceInformation):
ResourceInformation = {
new ResourceInformation(
- name = binary.getName,
- addresses = binary.getAddressesList.asScala.toArray)
+ name = weakIntern(binary.getName),
+ addresses = binary.getAddressesList.asScala.map(weakIntern).toArray)
}
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala
index a9c356ef13c..d1129d90339 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/PoolDataSerializer.scala
@@ -20,7 +20,6 @@ package org.apache.spark.status.protobuf
import scala.collection.JavaConverters._
import org.apache.spark.status.PoolData
-import org.apache.spark.util.Utils.weakIntern
class PoolDataSerializer extends ProtobufSerDe[PoolData] {
@@ -34,7 +33,7 @@ class PoolDataSerializer extends ProtobufSerDe[PoolData] {
override def deserialize(bytes: Array[Byte]): PoolData = {
val poolData = StoreTypes.PoolData.parseFrom(bytes)
new PoolData(
- name = weakIntern(poolData.getName),
+ name = poolData.getName,
stageIds = poolData.getStageIdsList.asScala.map(_.toInt).toSet
)
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
index eda4422405e..dc72c3ed467 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
@@ -393,10 +393,8 @@ class StageDataWrapperSerializer extends
ProtobufSerDe[StageDataWrapper] {
getOptional(binary.hasFirstTaskLaunchedTime, () => new
Date(binary.getFirstTaskLaunchedTime))
val completionTime =
getOptional(binary.hasCompletionTime, () => new
Date(binary.getCompletionTime))
- val failureReason =
- getOptional(binary.hasFailureReason, () =>
weakIntern(binary.getFailureReason))
- val description =
- getOptional(binary.hasDescription, () =>
weakIntern(binary.getDescription))
+ val failureReason = getOptional(binary.hasFailureReason,
binary.getFailureReason)
+ val description = getOptional(binary.hasDescription, binary.getDescription)
val accumulatorUpdates =
AccumulableInfoSerializer.deserialize(binary.getAccumulatorUpdatesList)
val tasks = if (MapUtils.isNotEmpty(binary.getTasksMap)) {
Some(binary.getTasksMap.asScala.map(
@@ -467,9 +465,9 @@ class StageDataWrapperSerializer extends
ProtobufSerDe[StageDataWrapper] {
shuffleWriteBytes = binary.getShuffleWriteBytes,
shuffleWriteTime = binary.getShuffleWriteTime,
shuffleWriteRecords = binary.getShuffleWriteRecords,
- name = weakIntern(binary.getName),
+ name = binary.getName,
description = description,
- details = weakIntern(binary.getDetails),
+ details = binary.getDetails,
schedulingPool = weakIntern(binary.getSchedulingPool),
rddIds = binary.getRddIdsList.asScala.map(_.toInt),
accumulatorUpdates = accumulatorUpdates,
@@ -644,7 +642,7 @@ class StageDataWrapperSerializer extends
ProtobufSerDe[StageDataWrapper] {
taskLocality = weakIntern(binary.getTaskLocality),
speculative = binary.getSpeculative,
accumulatorUpdates = accumulatorUpdates,
- errorMessage = getOptional(binary.hasErrorMessage, () =>
weakIntern(binary.getErrorMessage)),
+ errorMessage = getOptional(binary.hasErrorMessage,
binary.getErrorMessage),
taskMetrics = taskMetrics,
executorLogs = binary.getExecutorLogsMap.asScala.toMap,
schedulerDelay = binary.getSchedulerDelay,
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
index f450bbbfd0c..5dac03bb337 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.status.protobuf
import org.apache.spark.status.StreamBlockData
+import org.apache.spark.util.Utils.weakIntern
class StreamBlockDataSerializer extends ProtobufSerDe[StreamBlockData] {
@@ -39,9 +40,9 @@ class StreamBlockDataSerializer extends
ProtobufSerDe[StreamBlockData] {
val binary = StoreTypes.StreamBlockData.parseFrom(bytes)
new StreamBlockData(
name = binary.getName,
- executorId = binary.getExecutorId,
- hostPort = binary.getHostPort,
- storageLevel = binary.getStorageLevel,
+ executorId = weakIntern(binary.getExecutorId),
+ hostPort = weakIntern(binary.getHostPort),
+ storageLevel = weakIntern(binary.getStorageLevel),
useMemory = binary.getUseMemory,
useDisk = binary.getUseDisk,
deserialized = binary.getDeserialized,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]