This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f6038302dd6 [SPARK-45793][CORE] Improve the built-in compression codecs
f6038302dd6 is described below
commit f6038302dd615f4bf9bed9c4af3d04426f7e5c5e
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Nov 6 20:06:39 2023 +0800
[SPARK-45793][CORE] Improve the built-in compression codecs
### What changes were proposed in this pull request?
Currently, Spark supports many built-in compression codecs for I/O and storage.
Their short names are duplicated as magic strings throughout the code base, so
developers must keep every copy consistent by hand, which invites mistakes and
reduces development efficiency.
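A minimal sketch of the pattern this change applies: define each short codec name once as a constant and reference it everywhere else. The object names below are illustrative only; the real constants are added to `org.apache.spark.io.CompressionCodec` in the diff below.
```scala
// Sketch only: in Spark the constants live in org.apache.spark.io.CompressionCodec.
object CodecNames {
  // Single source of truth for the short codec names.
  val LZ4 = "lz4"
  val LZF = "lzf"
  val SNAPPY = "snappy"
  val ZSTD = "zstd"
}

object Usage {
  // Before: a magic string that can silently drift out of sync.
  val before: Option[String] = Some("zstd")
  // After: every call site references the shared constant.
  val after: Option[String] = Some(CodecNames.ZSTD)
}
```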
### Why are the changes needed?
Improve the code around the built-in storage compression codecs by removing the duplicated magic strings.
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
N/A
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #43659 from beliefer/improve_storage_code.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
---
.../deploy/history/HistoryServerMemoryManager.scala | 3 ++-
.../org/apache/spark/internal/config/package.scala | 7 ++++---
.../org/apache/spark/io/CompressionCodec.scala | 21 ++++++++++++---------
.../deploy/history/EventLogFileWritersSuite.scala | 6 +++---
.../deploy/history/FsHistoryProviderSuite.scala | 5 +++--
.../org/apache/spark/io/CompressionCodecSuite.scala | 8 ++++----
.../apache/spark/storage/FallbackStorageSuite.scala | 3 ++-
.../collection/ExternalAppendOnlyMapSuite.scala | 3 +--
.../k8s/integrationtest/BasicTestsSuite.scala | 3 ++-
.../org/apache/spark/sql/internal/SQLConf.scala | 3 ++-
.../spark/sql/execution/streaming/OffsetSeq.scala | 3 ++-
.../streaming/state/RocksDBFileManager.scala | 2 +-
12 files changed, 38 insertions(+), 29 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
index 00e58cbdc57..b95f1ed24f3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.History._
+import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.Utils
/**
@@ -75,7 +76,7 @@ private class HistoryServerMemoryManager(
private def approximateMemoryUsage(eventLogSize: Long, codec: Option[String]): Long = {
codec match {
- case Some("zstd") =>
+ case Some(CompressionCodec.ZSTD) =>
eventLogSize * 10
case Some(_) =>
eventLogSize * 4
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 93a42eec832..bbadf91fc41 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -21,6 +21,7 @@ import java.util.Locale
import java.util.concurrent.TimeUnit
import org.apache.spark.SparkContext
+import org.apache.spark.io.CompressionCodec
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.metrics.GarbageCollectionMetrics
import org.apache.spark.network.shuffle.Constants
@@ -1530,7 +1531,7 @@ package object config {
"use fully qualified class names to specify the codec.")
.version("3.0.0")
.stringConf
- .createWithDefault("zstd")
+ .createWithDefault(CompressionCodec.ZSTD)
private[spark] val SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD =
ConfigBuilder("spark.shuffle.spill.initialMemoryThreshold")
@@ -1871,7 +1872,7 @@ package object config {
"the codec")
.version("0.8.0")
.stringConf
- .createWithDefaultString("lz4")
+ .createWithDefaultString(CompressionCodec.LZ4)
private[spark] val IO_COMPRESSION_ZSTD_BUFFERSIZE =
ConfigBuilder("spark.io.compression.zstd.bufferSize")
@@ -1914,7 +1915,7 @@ package object config {
"the codec.")
.version("3.0.0")
.stringConf
- .createWithDefault("zstd")
+ .createWithDefault(CompressionCodec.ZSTD)
private[spark] val BUFFER_SIZE =
ConfigBuilder("spark.buffer.size")
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0bb392deb39..a6a5b1f67c6 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -58,18 +58,21 @@ trait CompressionCodec {
private[spark] object CompressionCodec {
- private val configKey = IO_COMPRESSION_CODEC.key
-
private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
(codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
|| codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec])
}
- private val shortCompressionCodecNames = Map(
- "lz4" -> classOf[LZ4CompressionCodec].getName,
- "lzf" -> classOf[LZFCompressionCodec].getName,
- "snappy" -> classOf[SnappyCompressionCodec].getName,
- "zstd" -> classOf[ZStdCompressionCodec].getName)
+ val LZ4 = "lz4"
+ val LZF = "lzf"
+ val SNAPPY = "snappy"
+ val ZSTD = "zstd"
+
+ private[spark] val shortCompressionCodecNames = Map(
+ LZ4 -> classOf[LZ4CompressionCodec].getName,
+ LZF -> classOf[LZFCompressionCodec].getName,
+ SNAPPY -> classOf[SnappyCompressionCodec].getName,
+ ZSTD -> classOf[ZStdCompressionCodec].getName)
def getCodecName(conf: SparkConf): String = {
conf.get(IO_COMPRESSION_CODEC)
@@ -93,7 +96,7 @@ private[spark] object CompressionCodec {
errorClass = "CODEC_NOT_AVAILABLE",
messageParameters = Map(
"codecName" -> codecName,
- "configKey" -> toConf(configKey),
+ "configKey" -> toConf(IO_COMPRESSION_CODEC.key),
"configVal" -> toConfVal(FALLBACK_COMPRESSION_CODEC))))
}
@@ -113,7 +116,7 @@ private[spark] object CompressionCodec {
}
}
- val FALLBACK_COMPRESSION_CODEC = "snappy"
+ val FALLBACK_COMPRESSION_CODEC = SNAPPY
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}
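For context, a hedged sketch of how a short codec name can resolve through a map like `shortCompressionCodecNames` above. This is simplified: per the config docs in this diff, Spark also accepts fully qualified class names, and the real `createCodec` instantiates the resolved class and raises CODEC_NOT_AVAILABLE on failure.
```scala
import java.util.Locale

object ResolveCodecSketch {
  // Mirrors the map in the diff above; class names written out so the
  // example is self-contained.
  private val shortNames = Map(
    "lz4" -> "org.apache.spark.io.LZ4CompressionCodec",
    "lzf" -> "org.apache.spark.io.LZFCompressionCodec",
    "snappy" -> "org.apache.spark.io.SnappyCompressionCodec",
    "zstd" -> "org.apache.spark.io.ZStdCompressionCodec")

  // Short names hit the map; anything else is assumed to already be a
  // fully qualified class name.
  def resolve(codecName: String): String =
    shortNames.getOrElse(codecName.toLowerCase(Locale.ROOT), codecName)
}
```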
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index b575cbc080c..349985207e4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -176,7 +176,7 @@ class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite {
baseDirUri, "app1", None, None))
// with compression
assert(s"${baseDirUri.toString}/app1.lzf" ===
- SingleEventLogFileWriter.getLogPath(baseDirUri, "app1", None, Some("lzf")))
+ SingleEventLogFileWriter.getLogPath(baseDirUri, "app1", None, Some(CompressionCodec.LZF)))
// illegal characters in app ID
assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1" ===
SingleEventLogFileWriter.getLogPath(baseDirUri,
@@ -184,7 +184,7 @@ class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite {
// illegal characters in app ID with compression
assert(s"${baseDirUri.toString}/a-fine-mind_dollar_bills__1.lz4" ===
SingleEventLogFileWriter.getLogPath(baseDirUri,
- "a fine:mind$dollar{bills}.1", None, Some("lz4")))
+ "a fine:mind$dollar{bills}.1", None, Some(CompressionCodec.LZ4)))
}
override protected def createWriter(
@@ -239,7 +239,7 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite {
// with compression
assert(s"$logDir/${EVENT_LOG_FILE_NAME_PREFIX}1_${appId}.lzf" ===
RollingEventLogFilesWriter.getEventLogFilePath(logDir, appId, appAttemptId,
- 1, Some("lzf")).toString)
+ 1, Some(CompressionCodec.LZF)).toString)
// illegal characters in app ID
assert(s"${baseDirUri.toString}/${EVENT_LOG_DIR_NAME_PREFIX}a-fine-mind_dollar_bills__1"
===
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index ae8481a852b..d16e904bdcf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -126,8 +126,9 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
// Write a new-style application log.
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
- Some("lzf"))
- writeFile(newAppCompressedComplete, Some(CompressionCodec.createCodec(conf, "lzf")),
+ Some(CompressionCodec.LZF))
+ writeFile(
+ newAppCompressedComplete, Some(CompressionCodec.createCodec(conf, CompressionCodec.LZF)),
SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
1L, "test", None),
SparkListenerApplicationEnd(4L))
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 244c007f539..9c9fac0d483 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -58,7 +58,7 @@ class CompressionCodecSuite extends SparkFunSuite {
}
test("lz4 compression codec short form") {
- val codec = CompressionCodec.createCodec(conf, "lz4")
+ val codec = CompressionCodec.createCodec(conf, CompressionCodec.LZ4)
assert(codec.getClass === classOf[LZ4CompressionCodec])
testCodec(codec)
}
@@ -76,7 +76,7 @@ class CompressionCodecSuite extends SparkFunSuite {
}
test("lzf compression codec short form") {
- val codec = CompressionCodec.createCodec(conf, "lzf")
+ val codec = CompressionCodec.createCodec(conf, CompressionCodec.LZF)
assert(codec.getClass === classOf[LZFCompressionCodec])
testCodec(codec)
}
@@ -94,7 +94,7 @@ class CompressionCodecSuite extends SparkFunSuite {
}
test("snappy compression codec short form") {
- val codec = CompressionCodec.createCodec(conf, "snappy")
+ val codec = CompressionCodec.createCodec(conf, CompressionCodec.SNAPPY)
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}
@@ -115,7 +115,7 @@ class CompressionCodecSuite extends SparkFunSuite {
}
test("zstd compression codec short form") {
- val codec = CompressionCodec.createCodec(conf, "zstd")
+ val codec = CompressionCodec.createCodec(conf, CompressionCodec.ZSTD)
assert(codec.getClass === classOf[ZStdCompressionCodec])
testCodec(codec)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 83c9707bfc2..6c51bd4ff2e 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.LocalSparkContext.withSpark
import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.buffer.ManagedBuffer
@@ -292,7 +293,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}
- Seq("lz4", "lzf", "snappy", "zstd").foreach { codec =>
+ CompressionCodec.shortCompressionCodecNames.keys.foreach { codec =>
test(s"$codec - Newly added executors should access old data from remote
storage") {
sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC,
codec))
withSpark(sc) { sc =>
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 59f6e3f2d35..2a760c39b46 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -37,7 +37,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with Matchers {
import TestUtils.{assertNotSpilled, assertSpilled}
- private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private def createCombiner[T](i: T) = ArrayBuffer[T](i)
private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i
private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] =
@@ -224,7 +223,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
// Keep track of which compression codec we're using to report in test failure messages
var lastCompressionCodec: Option[String] = None
try {
- allCompressionCodecs.foreach { c =>
+ CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c =>
lastCompressionCodec = Some(c)
testSimpleSpilling(Some(c), encrypt)
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 992fe7c97ff..d6911aadfa2 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.time.{Seconds, Span}
import org.apache.spark.{SparkFunSuite, TestUtils}
import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.SPARK_PI_MAIN_CLASS
+import org.apache.spark.io.CompressionCodec
import org.apache.spark.launcher.SparkLauncher
private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
@@ -93,7 +94,7 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
test("Run SparkPi with an argument.", k8sTestTag) {
// This additional configuration with snappy is for SPARK-26995
sparkAppConf
- .set("spark.io.compression.codec", "snappy")
+ .set("spark.io.compression.codec", CompressionCodec.SNAPPY)
runSparkPiAndVerifyCompletion(appArgs = Array("5"))
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1af0b41d0fa..ecc3e6e101f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{ErrorMessageFormat, SparkConf, SparkContext, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
@@ -1997,7 +1998,7 @@ object SQLConf {
"use fully qualified class names to specify the codec. Default codec
is lz4.")
.version("3.1.0")
.stringConf
- .createWithDefault("lz4")
+ .createWithDefault(CompressionCodec.LZ4)
val CHECKPOINT_RENAMEDFILE_CHECK_ENABLED =
buildConf("spark.sql.streaming.checkpoint.renamedFileCheck.enabled")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 913805d1a07..dea75e3ec47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -21,6 +21,7 @@ import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager, SymmetricHashJoinStateManager}
@@ -118,7 +119,7 @@ object OffsetSeqMetadata extends Logging {
StreamingAggregationStateManager.legacyVersion.toString,
STREAMING_JOIN_STATE_FORMAT_VERSION.key -> SymmetricHashJoinStateManager.legacyVersion.toString,
- STATE_STORE_COMPRESSION_CODEC.key -> "lz4",
+ STATE_STORE_COMPRESSION_CODEC.key -> CompressionCodec.LZ4,
STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false"
)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 6a62a6c52f5..046cf69f1fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -126,7 +126,7 @@ class RocksDBFileManager(
dfsRootDir: String,
localTempDir: File,
hadoopConf: Configuration,
- codecName: String = "zstd",
+ codecName: String = CompressionCodec.ZSTD,
loggingId: String = "")
extends Logging {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]