This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new 19e1b35  [SPARK-51656] Support `time` for `SparkSession`
19e1b35 is described below

commit 19e1b3587451a658583f0969d4825a3360a193bf
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Mar 28 10:42:17 2025 -0700

    [SPARK-51656] Support `time` for `SparkSession`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `time` for `SparkSession`.
    
    ### Why are the changes needed?
    
    For feature parity with the Scala client.
    
    ```swift
    try await spark.time(spark.range(1000).count)
    try await spark.time(spark.range(1).collect)
    try await spark.time(spark.range(10).show)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a new addition to the unreleased version.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #33 from dongjoon-hyun/SPARK-51656.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/SparkSession.swift         | 15 +++++++++++++++
 Tests/SparkConnectTests/SparkSessionTests.swift | 11 +++++++++++
 2 files changed, 26 insertions(+)

diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index 0d7546b..8a61d86 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -17,6 +17,7 @@
 // under the License.
 //
 
+import Dispatch
 import Foundation
 import GRPCCore
 import GRPCNIOTransportHTTP2
@@ -116,12 +117,26 @@ public actor SparkSession {
     return try await DataFrame(spark: self, sqlText: sqlText)
   }
 
 +  /// Returns a ``DataFrameReader`` that can be used to read non-streaming data in as a
 +  /// `DataFrame`
   var read: DataFrameReader {
     get {
       return DataFrameReader(sparkSession: self)
     }
   }
 
 +  /// Executes some code block and prints to stdout the time taken to execute the block.
+  /// - Parameter f: A function to execute.
+  /// - Returns: The result of the executed code.
+  public func time<T: Sendable>(_ f: () async throws -> T) async throws -> T {
+    let start = DispatchTime.now()
+    let ret = try await f()
+    let end = DispatchTime.now()
+    let elapsed = (end.uptimeNanoseconds - start.uptimeNanoseconds) / 1_000_000
+    print("Time taken: \(elapsed) ms")
+    return ret
+  }
+
   /// This is defined as the return type of `SparkSession.sparkContext` method.
   /// This is an empty `Struct` type because `sparkContext` method is designed to throw
   /// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.
diff --git a/Tests/SparkConnectTests/SparkSessionTests.swift b/Tests/SparkConnectTests/SparkSessionTests.swift
index 4a2a549..f302349 100644
--- a/Tests/SparkConnectTests/SparkSessionTests.swift
+++ b/Tests/SparkConnectTests/SparkSessionTests.swift
@@ -74,4 +74,15 @@ struct SparkSessionTests {
     #expect(try await spark.range(0, 100, 2).count() == 50)
     await spark.stop()
   }
+
+  @Test
+  func time() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.time(spark.range(1000).count) == 1000)
+#if !os(Linux)
+    #expect(try await spark.time(spark.range(1).collect) == [["0"]])
+    try await spark.time(spark.range(10).show)
+#endif
+    await spark.stop()
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to