This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c4af64ec0dbb [SPARK-46766][SQL][AVRO] ZSTD Buffer Pool Support For AVRO datasource
c4af64ec0dbb is described below
commit c4af64ec0dbb3cb3bda6debd38009bbe9844638c
Author: Kent Yao <[email protected]>
AuthorDate: Fri Jan 19 17:32:24 2024 +0800
[SPARK-46766][SQL][AVRO] ZSTD Buffer Pool Support For AVRO datasource
### What changes were proposed in this pull request?
This PR adds ZSTD buffer pool support to the AVRO datasource when writing with the zstd compression codec.
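A minimal usage sketch for review purposes (it assumes an active `SparkSession` named `spark` with the spark-avro module on the classpath; the output path is a placeholder):

```scala
// Opt in to the ZSTD JNI buffer pool, then write zstd-compressed Avro.
spark.conf.set("spark.sql.avro.zstandard.bufferPool.enabled", "true")

spark.range(1000).toDF("id")
  .write
  .format("avro")
  .option("compression", "zstandard") // selects the zstd codec for this write
  .save("/tmp/avro_zstd_demo")        // placeholder path
```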
### Why are the changes needed?
It enables an additional tuning option for users.
### Does this PR introduce _any_ user-facing change?
Yes, it adds a new configuration, `spark.sql.avro.zstandard.bufferPool.enabled`.
### How was this patch tested?
Passing the existing CI should be sufficient.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44792 from yaooqinn/SPARK-46766.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../scala/org/apache/spark/sql/avro/AvroUtils.scala | 19 +++++++++++++------
docs/sql-data-sources-avro.md | 8 ++++++++
.../scala/org/apache/spark/sql/internal/SQLConf.scala | 6 ++++++
3 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index d9c88e14d039..f0b70f09aa55 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.avro.Schema
 import org.apache.avro.file.{DataFileReader, FileReader}
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
-import org.apache.avro.mapred.FsInput
+import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
 import org.apache.avro.mapreduce.AvroJob
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
@@ -104,18 +104,25 @@ private[sql] object AvroUtils extends Logging {
     parsedOptions.compression.toLowerCase(Locale.ROOT) match {
       case codecName if AvroCompressionCodec.values().exists(c =>
           c.lowerCaseName() == codecName) =>
+        val jobConf = job.getConfiguration
         AvroCompressionCodec.fromString(codecName) match {
           case UNCOMPRESSED =>
-            job.getConfiguration.setBoolean("mapred.output.compress", false)
+            jobConf.setBoolean("mapred.output.compress", false)
           case compressed =>
-            job.getConfiguration.setBoolean("mapred.output.compress", true)
-            job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
+            jobConf.setBoolean("mapred.output.compress", true)
+            jobConf.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
             if (compressed.getSupportCompressionLevel) {
               val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
                 compressed.getDefaultCompressionLevel.toString)
               logInfo(s"Compressing Avro output using the $codecName codec at level $level")
-              val s = if (compressed == ZSTANDARD) "zstd" else codecName
-              job.getConfiguration.setInt(s"avro.mapred.$s.level", level.toInt)
+              val s = if (compressed == ZSTANDARD) {
+                val bufferPoolEnabled = sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)
+                jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, bufferPoolEnabled)
+                "zstd"
+              } else {
+                codecName
+              }
+              jobConf.setInt(s"avro.mapred.$s.level", level.toInt)
             } else {
               logInfo(s"Compressing Avro output using the $codecName codec")
             }
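For orientation, the hunk above boils down to four Hadoop job-configuration writes when zstd is selected. A distilled sketch (the key constants are taken from the diff itself; the codec-name string "zstandard" and level 3 are illustrative assumptions, not values asserted by this patch):

```scala
import org.apache.avro.mapred.AvroOutputFormat
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration

val jobConf = new Configuration()
jobConf.setBoolean("mapred.output.compress", true)             // turn on output compression
jobConf.set(AvroJob.CONF_OUTPUT_CODEC, "zstandard")            // assumed codec-name string
jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, true) // the new buffer-pool toggle
jobConf.setInt("avro.mapred.zstd.level", 3)                    // compression level (example)
```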
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index e4b4963f7b5f..2172cb68fb98 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -380,6 +380,14 @@ Configuration of Avro can be done via `spark.conf.set` or by running `SET key=va
     </td>
     <td>4.0.0</td>
   </tr>
+  <tr>
+    <td>spark.sql.avro.zstandard.bufferPool.enabled</td>
+    <td>false</td>
+    <td>
+      If true, enable the buffer pool of the ZSTD JNI library when writing AVRO files.
+    </td>
+    <td>4.0.0</td>
+  </tr>
   <tr>
     <td>spark.sql.avro.datetimeRebaseModeInRead</td>
     <td><code>EXCEPTION</code></td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 61c7b2457b11..3dd7cf884cbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3643,6 +3643,12 @@ object SQLConf {
     .intConf
     .createOptional
 
+  val AVRO_ZSTANDARD_BUFFER_POOL_ENABLED = buildConf("spark.sql.avro.zstandard.bufferPool.enabled")
+    .doc("If true, enable the buffer pool of the ZSTD JNI library when writing AVRO files")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
     .internal()
     .doc(s"If it is set to false, or ${ANSI_ENABLED.key} is true, then size of null returns " +
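As the docs hunk notes, the entry can also be toggled with a `SET` command; a small sketch of that path plus reading the value back (it assumes an active session; the key and default come from the hunk above):

```scala
// Equivalent SQL form of spark.conf.set, per the docs page.
spark.sql("SET spark.sql.avro.zstandard.bufferPool.enabled=true")

// Reading it back; absent any explicit setting this should return the
// registered default, "false".
println(spark.conf.get("spark.sql.avro.zstandard.bufferPool.enabled"))
```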
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]