This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f003453a6117 [SPARK-52882][SQL] Implement the current_time function in Scala
f003453a6117 is described below
commit f003453a6117bd49c4d94a8bd909f862f77ed084
Author: Uros Bojanic <[email protected]>
AuthorDate: Thu Jul 24 08:19:57 2025 +0200
[SPARK-52882][SQL] Implement the current_time function in Scala
### What changes were proposed in this pull request?
Implement the `current_time` function in Scala API.
### Why are the changes needed?
Expand API support for the `CurrentTime` expression.
### Does this PR introduce _any_ user-facing change?
Yes, the new function is now available in Scala API.
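A minimal usage sketch (not part of this commit; assumes a spark-shell style session with an active `SparkSession` named `spark`):
```scala
import org.apache.spark.sql.functions._

val df = spark.range(1)

// Default overload: the result keeps 6 fractional digits of seconds.
df.select(current_time()).show(truncate = false)

// Overload with an explicit precision in [0..6]; the result column is of TimeType(3).
df.select(current_time(3)).show(truncate = false)
```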
### How was this patch tested?
Added appropriate Scala function tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51574 from uros-db/scala-current_time.
Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
python/pyspark/sql/tests/test_functions.py | 3 +-
.../scala/org/apache/spark/sql/functions.scala | 30 ++++++++
.../apache/spark/sql/TimeFunctionsSuiteBase.scala | 79 ++++++++++++++++++++++
3 files changed, 111 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index 3a4dfd8f6e4d..1bdb9dd76455 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -83,9 +83,10 @@ class FunctionsTestsMixin:
# Functions that we expect to be missing in python until they are added to pyspark
expected_missing_in_py = set(
# TODO(SPARK-52888): Implement the make_time function in Python
+ # TODO(SPARK-52889): Implement the current_time function in Python
# TODO(SPARK-52890): Implement the to_time function in Python
# TODO(SPARK-52891): Implement the try_to_time function in Python
- ["make_time", "to_time", "try_to_time"]
+ ["current_time", "make_time", "to_time", "try_to_time"]
)
self.assertEqual(
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index e16554eaddbf..4d8f658ca32d 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1392,6 +1392,36 @@ object functions {
*/
def count_if(e: Column): Column = Column.fn("count_if", e)
+ /**
+ * Returns the current time at the start of query evaluation. Note that the result will contain
+ * 6 fractional digits of seconds.
+ *
+ * @return
+ * A time.
+ *
+ * @group datetime_funcs
+ * @since 4.1.0
+ */
+ def current_time(): Column = {
+ Column.fn("current_time")
+ }
+
+ /**
+ * Returns the current time at the start of query evaluation.
+ *
+ * @param precision
+ * An integer literal in the range [0..6], indicating how many fractional digits of seconds to
+ * include in the result.
+ * @return
+ * A time.
+ *
+ * @group datetime_funcs
+ * @since 4.1.0
+ */
+ def current_time(precision: Int): Column = {
+ Column.fn("current_time", lit(precision))
+ }
+
/**
* Aggregate function: computes a histogram on numeric 'expr' using nb bins. The return value is
* an array of (x,y) pairs representing the centers of the histogram's bins. As the value of
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala
index 7d7c4597ddfe..8506ab4527c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import java.time.LocalTime
+import java.time.temporal.ChronoUnit
import org.apache.spark.{SparkConf, SparkDateTimeException}
import org.apache.spark.sql.functions._
@@ -28,6 +29,84 @@ import org.apache.spark.sql.types._
abstract class TimeFunctionsSuiteBase extends QueryTest with SharedSparkSession {
import testImplicits._
+ // Helper method to assert that two DataFrames with TimeType values are approximately equal.
+ // This method assumes that the two dataframes (df1 and df2) have the same schemas and sizes.
+ // Also, only 1 column is expected in each DataFrame, and that column must be of TimeType.
+ private def assertTwoTimesAreApproximatelyEqual(df1: DataFrame, df2: DataFrame) = {
+ // Check that both DataFrames have the same schema.
+ val schema1 = df1.schema
+ val schema2 = df2.schema
+ require(schema1 == schema2, "Both DataFrames must have the same schema, but got " +
+ s"$schema1 and $schema2 for the two given DataFrames df1 and df2, respectively.")
+ // Check that both DataFrames have the same number of rows.
+ val numRows1 = df1.count()
+ val numRows2 = df2.count()
+ require(numRows1 == numRows2, "Both DataFrames must have the same number of rows, but got" +
+ s"$numRows1 and $numRows2 rows in the two given DataFrames df1 and df2, respectively.")
+ // Check that both DataFrames have only 1 column.
+ val fields1 = schema1.fields.length
+ require(fields1 == 1, s"The first DataFrame must have only one column, but got $fields1.")
+ val fields2 = schema2.fields.length
+ require(fields2 == 1, s"The second DataFrame must have only one column, but got $fields2.")
+ // Check that the column type is TimeType.
+ val columnType1 = schema1.fields.head.dataType
+ require(columnType1.isInstanceOf[TimeType], "The column type of the first DataFrame " +
+ s"must be TimeType, but got $columnType1.")
+ val columnType2 = schema2.fields.head.dataType
+ require(columnType2.isInstanceOf[TimeType], "The column type of the second DataFrame " +
+ s"must be TimeType, but got $columnType2.")
+
+ // Extract the LocalTime values from the input DataFrames.
+ val time1: LocalTime = df1.collect().head.get(0).asInstanceOf[LocalTime]
+ val time2: LocalTime = df2.collect().head.get(0).asInstanceOf[LocalTime]
+
+ // Check that the time difference is within a set number of minutes.
+ val maxTimeDiffInMinutes = 15 // This should be enough time to ensure correctness.
+ val timeDiffInMillis = Math.abs(ChronoUnit.MILLIS.between(time1, time2))
+ assert(
+ timeDiffInMillis <= maxTimeDiffInMinutes * 60 * 1000,
+ s"Time difference exceeds $maxTimeDiffInMinutes minutes:
$timeDiffInMillis ms."
+ )
+ }
+
+ test("SPARK-52882: current_time function with default precision") {
+ // Create a dummy DataFrame with a single row to test the current_time() function.
+ val df = spark.range(1)
+
+ // Test the function using both `selectExpr` and `select`.
+ val result1 = df.selectExpr(
+ "current_time()"
+ )
+ val result2 = df.select(
+ current_time()
+ )
+
+ // Check that both methods produce approximately the same result.
+ assertTwoTimesAreApproximatelyEqual(result1, result2)
+ }
+
+ test("SPARK-52882: current_time function with specified precision") {
+ (0 to 6).foreach { precision: Int =>
+ // Create a dummy DataFrame with a single row to test the current_time(precision) function.
+ val df = spark.range(1)
+
+ // Test the function using both `selectExpr` and `select`.
+ val result1 = df.selectExpr(
+ s"current_time($precision)"
+ )
+ val result2 = df.select(
+ current_time(precision)
+ )
+
+ // Confirm that the precision is correctly set.
+ assert(result1.schema.fields.head.dataType == TimeType(precision))
+ assert(result2.schema.fields.head.dataType == TimeType(precision))
+
+ // Check that both methods produce approximately the same result.
+ assertTwoTimesAreApproximatelyEqual(result1, result2)
+ }
+ }
+
test("SPARK-52881: make_time function") {
// Input data for the function.
val schema = StructType(Seq(