This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c4af64ec0dbb [SPARK-46766][SQL][AVRO] ZSTD Buffer Pool Support For AVRO datasource
c4af64ec0dbb is described below

commit c4af64ec0dbb3cb3bda6debd38009bbe9844638c
Author: Kent Yao <[email protected]>
AuthorDate: Fri Jan 19 17:32:24 2024 +0800

    [SPARK-46766][SQL][AVRO] ZSTD Buffer Pool Support For AVRO datasource
    
    ### What changes were proposed in this pull request?
    
    This PR adds ZSTD buffer pool support for the Avro data source when writing with the zstd compression codec.
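    
    A minimal usage sketch, assuming a running `SparkSession` named `spark` and an existing DataFrame `df` (the output path is illustrative):
    
    ```scala
    // Enable the new zstd-jni buffer pool flag, then write
    // zstd-compressed Avro as usual.
    spark.conf.set("spark.sql.avro.zstandard.bufferPool.enabled", "true")
    df.write
      .format("avro")
      .option("compression", "zstd")
      .save("/tmp/avro-zstd-out") // illustrative path
    ```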
    
    ### Why are the changes needed?
    
    It enables a tuning technique for users: pooling and reusing zstd-jni buffers can reduce allocation overhead when writing zstd-compressed Avro files.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it adds a new configuration: `spark.sql.avro.zstandard.bufferPool.enabled` (default `false`).
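    
    A sketch of the equivalent per-session SQL form:
    
    ```scala
    // Same setting applied via the SQL SET command.
    spark.sql("SET spark.sql.avro.zstandard.bufferPool.enabled=true")
    ```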
    
    ### How was this patch tested?
    
    Passing the existing CI should be sufficient.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #44792 from yaooqinn/SPARK-46766.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../scala/org/apache/spark/sql/avro/AvroUtils.scala   | 19 +++++++++++++------
 docs/sql-data-sources-avro.md                         |  8 ++++++++
 .../scala/org/apache/spark/sql/internal/SQLConf.scala |  6 ++++++
 3 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index d9c88e14d039..f0b70f09aa55 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.avro.Schema
 import org.apache.avro.file.{DataFileReader, FileReader}
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
-import org.apache.avro.mapred.FsInput
+import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
 import org.apache.avro.mapreduce.AvroJob
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
@@ -104,18 +104,25 @@ private[sql] object AvroUtils extends Logging {
 
     parsedOptions.compression.toLowerCase(Locale.ROOT) match {
       case codecName if AvroCompressionCodec.values().exists(c => c.lowerCaseName() == codecName) =>
+        val jobConf = job.getConfiguration
         AvroCompressionCodec.fromString(codecName) match {
           case UNCOMPRESSED =>
-            job.getConfiguration.setBoolean("mapred.output.compress", false)
+            jobConf.setBoolean("mapred.output.compress", false)
           case compressed =>
-            job.getConfiguration.setBoolean("mapred.output.compress", true)
-            job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
+            jobConf.setBoolean("mapred.output.compress", true)
+            jobConf.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
             if (compressed.getSupportCompressionLevel) {
               val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
                 compressed.getDefaultCompressionLevel.toString)
               logInfo(s"Compressing Avro output using the $codecName codec at 
level $level")
-              val s = if (compressed == ZSTANDARD) "zstd" else codecName
-              job.getConfiguration.setInt(s"avro.mapred.$s.level", level.toInt)
+              val s = if (compressed == ZSTANDARD) {
+                val bufferPoolEnabled = sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)
+                jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, bufferPoolEnabled)
+                "zstd"
+              } else {
+                codecName
+              }
+              jobConf.setInt(s"avro.mapred.$s.level", level.toInt)
             } else {
               logInfo(s"Compressing Avro output using the $codecName codec")
             }
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index e4b4963f7b5f..2172cb68fb98 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -380,6 +380,14 @@ Configuration of Avro can be done via `spark.conf.set` or by running `SET key=va
     </td>
     <td>4.0.0</td>
   </tr>
+  <tr>
+    <td>spark.sql.avro.zstandard.bufferPool.enabled</td>
+    <td>false</td>
+    <td>
+      If true, enable the buffer pool of the ZSTD JNI library when writing AVRO files.
+    </td>
+    <td>4.0.0</td>
+  </tr>
   <tr>
     <td>spark.sql.avro.datetimeRebaseModeInRead</td>
     <td><code>EXCEPTION</code></td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 61c7b2457b11..3dd7cf884cbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3643,6 +3643,12 @@ object SQLConf {
     .intConf
     .createOptional
 
+  val AVRO_ZSTANDARD_BUFFER_POOL_ENABLED = buildConf("spark.sql.avro.zstandard.bufferPool.enabled")
+    .doc("If true, enable buffer pool of ZSTD JNI library when writing of AVRO 
files")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
     .internal()
     .doc(s"If it is set to false, or ${ANSI_ENABLED.key} is true, then size of 
null returns " +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
