This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 50afb735dbc9 [SPARK-51649][SQL] Dynamic writes/reads of TIME partitions
50afb735dbc9 is described below
commit 50afb735dbc9ca7a04be15f8b36469771b6b1efd
Author: Max Gekk <[email protected]>
AuthorDate: Fri Mar 28 22:29:32 2025 +0300
[SPARK-51649][SQL] Dynamic writes/reads of TIME partitions
### What changes were proposed in this pull request?
In the PR, I propose to support partition values of the TIME type. In
particular, if the desired type is `TimeType(n)`, cast the partition value
string to that type using the `Cast` expression.
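As an illustration (not part of the commit), here is a minimal sketch of that
conversion using the catalyst `Cast` and `Literal` expressions; the sample
value `12:00:00` and precision 6 are arbitrary:
```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.TimeType

// Cast the raw partition string to the desired TIME type, e.g. TimeType(6).
// eval() folds the constant expression and returns the time value in its
// internal representation (a Long).
val internalTime = Cast(Literal("12:00:00"), TimeType(6)).eval()
```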
### Why are the changes needed?
To fix the following failure when reading TIME partitions:
```scala
scala> spark.read.schema("t TIME, id INT").parquet("/Users/maxim.gekk/tmp/time_parquet").show(false)
org.apache.spark.SparkRuntimeException: [INVALID_PARTITION_VALUE] Failed to cast value '12%3A00%3A00' to data type "TIME(6)" for partition column `t`. Ensure the value matches the expected data type for this partition column. SQLSTATE: 42846
```
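For context: `12%3A00%3A00` is the path-escaped form of `12:00:00`, since `:`
must be percent-encoded in partition directory names. A small sketch of the
unescaping step the fix relies on, assuming catalyst internals are on the
classpath:
```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName

// ':' is percent-escaped in partition directories such as tt=12%3A00%3A00,
// so the raw path fragment must be unescaped before casting to TIME.
val raw = "12%3A00%3A00"
assert(unescapePathName(raw) == "12:00:00")
```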
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running the new test:
```
$ build/sbt "test:testOnly *PartitionedWriteSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #50442 from MaxGekk/read-time-partition.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../execution/datasources/PartitioningUtils.scala | 1 +
.../spark/sql/sources/PartitionedWriteSuite.scala | 27 ++++++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 402b70065d8e..6f39636b5f5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -534,6 +534,7 @@ object PartitioningUtils extends SQLConfHelper {
     case _: DecimalType => Literal(new JBigDecimal(value)).value
     case DateType =>
       Cast(Literal(value), DateType, Some(zoneId.getId)).eval()
+    case tt: TimeType => Cast(Literal(unescapePathName(value)), tt).eval()
     // Timestamp types
     case dt if AnyTimestampType.acceptsType(dt) =>
       Try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index f3849fe34ec2..b18d8f816e30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -223,6 +223,33 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("Dynamic writes/reads of TIME partitions") {
+    Seq(
+      "00:00:00" -> TimeType(0),
+      "00:00:00.00109" -> TimeType(5),
+      "00:01:02.999999" -> TimeType(6),
+      "12:00:00" -> TimeType(1),
+      "23:59:59.000001" -> TimeType(),
+      "23:59:59.999999" -> TimeType(6)
+    ).foreach { case (timeStr, timeType) =>
+      withTempPath { f =>
+        val df = sql(s"select 0 AS id, cast('$timeStr' as ${timeType.sql}) AS tt")
+        assert(df.schema("tt").dataType === timeType)
+        df.write
+          .partitionBy("tt")
+          .format("parquet")
+          .save(f.getAbsolutePath)
+        val files = TestUtils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+        assert(files.length == 1)
+        checkPartitionValues(files.head, timeStr)
+        val schema = new StructType()
+          .add("id", IntegerType)
+          .add("tt", timeType)
+        checkAnswer(spark.read.schema(schema).format("parquet").load(f.getAbsolutePath), df)
+      }
+    }
+  }
 }

 /**
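For completeness, a round-trip sketch of what this commit enables; the path
below is a placeholder, and a Spark build with TIME support is assumed:
```scala
// Write a DataFrame partitioned by a TIME(6) column, then read it back with
// an explicit schema; the partition value round-trips after this fix.
val path = "/tmp/time_parquet"  // placeholder path
val df = spark.sql("SELECT 0 AS id, CAST('12:00:00' AS TIME(6)) AS t")
df.write.partitionBy("t").format("parquet").save(path)
spark.read.schema("id INT, t TIME(6)").format("parquet").load(path).show(false)
```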
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]