This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new 19e1b35 [SPARK-51656] Support `time` for `SparkSession`
19e1b35 is described below
commit 19e1b3587451a658583f0969d4825a3360a193bf
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Mar 28 10:42:17 2025 -0700
[SPARK-51656] Support `time` for `SparkSession`
### What changes were proposed in this pull request?
This PR aims to support `time` for `SparkSession`.
### Why are the changes needed?
For feature parity with the Scala client.
```swift
try await spark.time(spark.range(1000).count)
try await spark.time(spark.range(1).collect)
try await spark.time(spark.range(10).show)
```
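Per the implementation in the patch below, each call prints a line of the form `Time taken: <elapsed> ms` to stdout and forwards the wrapped block's return value. A minimal usage sketch (assuming a reachable Spark Connect server, as in the accompanying test):
```swift
// Minimal sketch; assumes SparkSession.builder.getOrCreate() can reach a
// Spark Connect server, as the test added in this patch does.
let spark = try await SparkSession.builder.getOrCreate()

// `time` returns the result of the wrapped block, so the count is still usable.
let count = try await spark.time(spark.range(1000).count)  // prints "Time taken: <elapsed> ms"
print(count)  // 1000

await spark.stop()
```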
### Does this PR introduce _any_ user-facing change?
No, this is a new addition to the unreleased version.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #33 from dongjoon-hyun/SPARK-51656.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Sources/SparkConnect/SparkSession.swift | 15 +++++++++++++++
Tests/SparkConnectTests/SparkSessionTests.swift | 11 +++++++++++
2 files changed, 26 insertions(+)
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index 0d7546b..8a61d86 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -17,6 +17,7 @@
// under the License.
//
+import Dispatch
import Foundation
import GRPCCore
import GRPCNIOTransportHTTP2
@@ -116,12 +117,26 @@ public actor SparkSession {
return try await DataFrame(spark: self, sqlText: sqlText)
}
+ /// Returns a ``DataFrameReader`` that can be used to read non-streaming data in as a
+ /// `DataFrame`
var read: DataFrameReader {
get {
return DataFrameReader(sparkSession: self)
}
}
+ /// Executes some code block and prints to stdout the time taken to execute the block.
+ /// - Parameter f: A function to execute.
+ /// - Returns: The result of the executed code.
+ public func time<T: Sendable>(_ f: () async throws -> T) async throws -> T {
+ let start = DispatchTime.now()
+ let ret = try await f()
+ let end = DispatchTime.now()
+ let elapsed = (end.uptimeNanoseconds - start.uptimeNanoseconds) / 1_000_000
+ print("Time taken: \(elapsed) ms")
+ return ret
+ }
+
/// This is defined as the return type of `SparkSession.sparkContext` method.
/// This is an empty `Struct` type because `sparkContext` method is designed to throw
/// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.
diff --git a/Tests/SparkConnectTests/SparkSessionTests.swift b/Tests/SparkConnectTests/SparkSessionTests.swift
index 4a2a549..f302349 100644
--- a/Tests/SparkConnectTests/SparkSessionTests.swift
+++ b/Tests/SparkConnectTests/SparkSessionTests.swift
@@ -74,4 +74,15 @@ struct SparkSessionTests {
#expect(try await spark.range(0, 100, 2).count() == 50)
await spark.stop()
}
+
+ @Test
+ func time() async throws {
+ let spark = try await SparkSession.builder.getOrCreate()
+ #expect(try await spark.time(spark.range(1000).count) == 1000)
+#if !os(Linux)
+ #expect(try await spark.time(spark.range(1).collect) == [["0"]])
+ try await spark.time(spark.range(10).show)
+#endif
+ await spark.stop()
+ }
}
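For reference, the elapsed-time calculation in the patch relies on `DispatchTime`'s monotonic `uptimeNanoseconds`, with integer division by 1,000,000 truncating to whole milliseconds. A standalone sketch of the same pattern (the `measureMillis` helper name is illustrative, not part of this change):
```swift
import Dispatch

/// Illustrative helper (not part of this patch) showing the timing pattern
/// used by `SparkSession.time`: monotonic uptime nanoseconds converted to ms.
func measureMillis<T>(_ body: () async throws -> T) async throws -> (result: T, millis: UInt64) {
  let start = DispatchTime.now()
  let result = try await body()
  let end = DispatchTime.now()
  return (result, (end.uptimeNanoseconds - start.uptimeNanoseconds) / 1_000_000)
}
```
Using `DispatchTime` rather than a wall-clock `Date` keeps the measurement monotonic, so clock adjustments cannot skew the reported duration.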
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]