This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f9cb80a5947b [SPARK-50616][SQL] Add File Extension Option to CSV 
DataSource Writer
f9cb80a5947b is described below

commit f9cb80a5947bbbf909b953fdbed13daa6f6c924f
Author: Jim Baugh <[email protected]>
AuthorDate: Fri Jan 10 08:45:11 2025 -0800

    [SPARK-50616][SQL] Add File Extension Option to CSV DataSource Writer
    
    ### What changes were proposed in this pull request?
    The existing CSV DataSource allows one to set the delimiter/separator but 
does not allow the changing of the file extension. This means that a file can 
have values separated by tabs but me marked as a ".csv" file. This change 
allows one to change the file extension to match the delimiter/separator (e.g. 
".tsv" for a tab separated value file).
    
    ### Why are the changes needed?
    This PR adds an additional option to set the fileExtension. The end result 
is that when a separator is set that is not a comma that the output file has a 
file extension that matches the separator (e.g. file.tsv, file.psv, etc...).
    
    Notes on Previous Pull Request https://github.com/apache/spark/pull/17973
    A pull request adding this option was discussed 7 years ago. One reason it 
wasn't added was:
    "I would like to suggest to leave this out if there is no better reason for 
now. Downside of this is, it looks this allows arbitrary name and it does not 
gurantee the extention is, say, tsv when the delmiter is a tab. It is purely up 
to the user."
    
    I don't believe this is a good reason to not let the user set the 
extension. If we let them set the delimiter/separator to an arbitrary 
string/char then why not let the user also set the file extension to specify 
the separator that the file uses (e.g. tsv, psv, etc...). This addition keeps 
the "csv" file extension as the default and has the benefit of allowing other 
separators to match the file extension.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. This PR adds one row to the options table for the CSV DataSource 
documentation to include the "fileExtension" option.
    
    ### How was this patch tested?
    One unit test was added to validate a file is written with the new 
extension.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #49233 from jabbaugh/jbaugh-add-csv-file-ext.
    
    Authored-by: Jim Baugh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/main/resources/error/error-conditions.json   |  5 +++++
 docs/sql-data-sources-csv.md                         |  6 ++++++
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala   | 11 +++++++++++
 .../spark/sql/errors/QueryExecutionErrors.scala      | 10 ++++++++++
 .../execution/datasources/csv/CSVFileFormat.scala    |  2 +-
 .../sql/execution/datasources/v2/csv/CSVWrite.scala  |  2 +-
 .../sql/execution/datasources/csv/CSVSuite.scala     | 20 +++++++++++++++++++-
 7 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index e9b32b5f9cbe..8b266e9d6ac1 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2959,6 +2959,11 @@
           "Unsupported dtype: <invalidValue>. Valid values: float64, float32."
         ]
       },
+      "EXTENSION" : {
+        "message" : [
+          "Invalid extension: <invalidValue>. Extension is limited to exactly 
3 letters (e.g. csv, tsv, etc...)"
+        ]
+      },
       "INTEGER" : {
         "message" : [
           "expects an integer literal, but got <invalidValue>."
diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md
index 97a7065e0598..8008bc562082 100644
--- a/docs/sql-data-sources-csv.md
+++ b/docs/sql-data-sources-csv.md
@@ -60,6 +60,12 @@ Data source options of CSV can be set via:
     <td>Sets a separator for each field and value. This separator can be one 
or more characters.</td>
     <td>read/write</td>
   </tr>
+  <tr>
+    <td><code>extension</code></td>
+    <td>csv</td>
+    <td>Sets the file extension for the output files. Limited to letters. 
Length must equal 3.</td>
+    <td>write</td>
+  </tr>
   <tr>
     <td><code>encoding</code><br><code>charset</code></td>
     <td>UTF-8</td>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 5a23d6f7a3cc..6c68bc1aa589 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -103,6 +103,16 @@ class CSVOptions(
 
   val delimiter = CSVExprUtils.toDelimiterStr(
     parameters.getOrElse(SEP, parameters.getOrElse(DELIMITER, ",")))
+
+  val extension = {
+    val ext = parameters.getOrElse(EXTENSION, "csv")
+    if (ext.size != 3 && !ext.forall(_.isLetter)) {
+      throw QueryExecutionErrors.invalidFileExtensionError(EXTENSION, ext)
+    }
+
+    ext
+  }
+
   val parseMode: ParseMode =
     parameters.get(MODE).map(ParseMode.fromString).getOrElse(PermissiveMode)
   val charset = parameters.get(ENCODING).orElse(parameters.get(CHARSET))
@@ -385,6 +395,7 @@ object CSVOptions extends DataSourceOptions {
   val NEGATIVE_INF = newOption("negativeInf")
   val TIME_ZONE = newOption("timeZone")
   val UNESCAPED_QUOTE_HANDLING = newOption("unescapedQuoteHandling")
+  val EXTENSION = newOption("extension")
   // Options with alternative
   val ENCODING = "encoding"
   val CHARSET = "charset"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index e500f5e3cbd7..1ae2e5445c0c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2786,6 +2786,16 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
     Map.empty
   )
 
+  def invalidFileExtensionError(functionName: String, extension: String): 
RuntimeException = {
+    new SparkIllegalArgumentException(
+      errorClass = "INVALID_PARAMETER_VALUE.EXTENSION",
+      messageParameters = Map(
+        "functionName" -> toSQLId(functionName),
+        "parameter" -> toSQLId("extension"),
+        "fileExtension" -> toSQLId(extension),
+        "acceptable" -> "Extension is limited to exactly 3 letters (e.g. csv, 
tsv, etc...)"))
+  }
+
   def invalidCharsetError(functionName: String, charset: String): 
RuntimeException = {
     new SparkIllegalArgumentException(
       errorClass = "INVALID_PARAMETER_VALUE.CHARSET",
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 8ef85ee91aa8..b2b99e2d0f4e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -86,7 +86,7 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       }
 
       override def getFileExtension(context: TaskAttemptContext): String = {
-        ".csv" + CodecStreams.getCompressionExtension(context)
+        "." + csvOptions.extension + 
CodecStreams.getCompressionExtension(context)
       }
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala
index f38a1d385a39..7011fea77d88 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala
@@ -58,7 +58,7 @@ case class CSVWrite(
       }
 
       override def getFileExtension(context: TaskAttemptContext): String = {
-        ".csv" + CodecStreams.getCompressionExtension(context)
+        "." + csvOptions.extension + 
CodecStreams.getCompressionExtension(context)
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 7cacd8ea2dc5..850e887ac8e7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -3078,6 +3078,23 @@ abstract class CSVSuite
     }
   }
 
+  test("SPARK-50616: We can write with a tsv file extension") {
+    withTempPath { path =>
+      val input = Seq(
+        "1423-11-12T23:41:00",
+        "1765-03-28",
+        "2016-01-28T20:00:00"
+      ).toDF().repartition(1)
+      input.write.option("extension", "tsv").csv(path.getAbsolutePath)
+
+      val files = Files.list(path.toPath)
+        .iterator().asScala.map(x => x.getFileName.toString)
+        .toList.filter(x => x.takeRight(3).equals("tsv"))
+
+      assert(files.size == 1)
+    }
+  }
+
   test("SPARK-39904: Parse incorrect timestamp values") {
     withTempPath { path =>
       Seq(
@@ -3308,7 +3325,7 @@ abstract class CSVSuite
   }
 
   test("SPARK-40667: validate CSV Options") {
-    assert(CSVOptions.getAllOptions.size == 39)
+    assert(CSVOptions.getAllOptions.size == 40)
     // Please add validation on any new CSV options here
     assert(CSVOptions.isValidOption("header"))
     assert(CSVOptions.isValidOption("inferSchema"))
@@ -3347,6 +3364,7 @@ abstract class CSVSuite
     assert(CSVOptions.isValidOption("compression"))
     assert(CSVOptions.isValidOption("codec"))
     assert(CSVOptions.isValidOption("sep"))
+    assert(CSVOptions.isValidOption("extension"))
     assert(CSVOptions.isValidOption("delimiter"))
     assert(CSVOptions.isValidOption("columnPruning"))
     // Please add validation on any new parquet options with alternative here


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to