This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new c1817d7  [SPARK-51693] Support `storageLevel` for `DataFrame`
c1817d7 is described below

commit c1817d794f70c0b0b3d500be3d1dbf3ac0867faf
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Apr 3 06:36:19 2025 +0900

    [SPARK-51693] Support `storageLevel` for `DataFrame`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `DataFrame.storageLevel`.
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a new addition to the unreleased version.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ```
    $ swift test --filter DataFrameTests.storageLevel
    ◇  Suite DataFrameTests started.
    ◇  Test storageLevel() started.
    ✔  Test storageLevel() passed after 0.075 seconds.
    ✔  Suite DataFrameTests passed after 0.075 seconds.
    ✔  Test run with 1 test passed after 0.075 seconds.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #38 from dongjoon-hyun/SPARK-51693.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/DataFrame.swift          | 15 +++++++++++++++
 Sources/SparkConnect/SparkConnectClient.swift | 11 +++++++++++
 Sources/SparkConnect/StorageLevel.swift       | 18 ++++++++++++++++++
 Tests/SparkConnectTests/DataFrameTests.swift  | 18 ++++++++++++++++++
 4 files changed, 62 insertions(+)

diff --git a/Sources/SparkConnect/DataFrame.swift 
b/Sources/SparkConnect/DataFrame.swift
index b96e02d..96c36be 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -342,6 +342,21 @@ public actor DataFrame: Sendable {
     return self
   }
 
+  var storageLevel: StorageLevel {
+    get async throws {
+      try await withGRPCClient(
+        transport: .http2NIOPosix(
+          target: .dns(host: spark.client.host, port: spark.client.port),
+          transportSecurity: .plaintext
+        )
+      ) { client in
+        let service = Spark_Connect_SparkConnectService.Client(wrapping: 
client)
+        return try await service
+          .analyzePlan(spark.client.getStorageLevel(spark.sessionID, 
plan)).getStorageLevel.storageLevel.toStorageLevel
+      }
+    }
+  }
+
   public func explain() async throws {
     try await explain("simple")
   }
diff --git a/Sources/SparkConnect/SparkConnectClient.swift 
b/Sources/SparkConnect/SparkConnectClient.swift
index 9dfde0d..3314c55 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -282,6 +282,17 @@ public actor SparkConnectClient {
       })
   }
 
+  func getStorageLevel(_ sessionID: String, _ plan: Plan) async -> 
AnalyzePlanRequest
+  {
+    return analyze(
+      sessionID,
+      {
+        var level = AnalyzePlanRequest.GetStorageLevel()
+        level.relation = plan.root
+        return OneOf_Analyze.getStorageLevel(level)
+      })
+  }
+
   func getExplain(_ sessionID: String, _ plan: Plan, _ mode: String) async -> 
AnalyzePlanRequest
   {
     return analyze(
diff --git a/Sources/SparkConnect/StorageLevel.swift 
b/Sources/SparkConnect/StorageLevel.swift
index c2e6f04..524b507 100644
--- a/Sources/SparkConnect/StorageLevel.swift
+++ b/Sources/SparkConnect/StorageLevel.swift
@@ -78,6 +78,12 @@ extension StorageLevel {
     level.replication = self.replication
     return level
   }
+
+  public static func == (lhs: StorageLevel, rhs: StorageLevel) -> Bool {
+    return lhs.useDisk == rhs.useDisk && lhs.useMemory == rhs.useMemory
+      && lhs.useOffHeap == rhs.useOffHeap && lhs.deserialized == 
rhs.deserialized
+      && lhs.replication == rhs.replication
+  }
 }
 
 extension StorageLevel: CustomStringConvertible {
@@ -86,3 +92,15 @@ extension StorageLevel: CustomStringConvertible {
       "StorageLevel(useDisk: \(useDisk), useMemory: \(useMemory), useOffHeap: 
\(useOffHeap), deserialized: \(deserialized), replication: \(replication))"
   }
 }
+
+extension Spark_Connect_StorageLevel {
+  var toStorageLevel: StorageLevel {
+    return StorageLevel(
+      useDisk: self.useDisk,
+      useMemory: self.useMemory,
+      useOffHeap: self.useOffHeap,
+      deserialized: self.deserialized,
+      replication: self.replication
+    )
+  }
+}
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift 
b/Tests/SparkConnectTests/DataFrameTests.swift
index f9dd37e..7e903bd 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -308,4 +308,22 @@ struct DataFrameTests {
     await spark.stop()
   }
 #endif
+
+  @Test
+  func storageLevel() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.range(1)
+
+    _ = try await df.unpersist()
+    #expect(try await df.storageLevel == StorageLevel.NONE)
+    _ = try await df.persist()
+    #expect(try await df.storageLevel == StorageLevel.MEMORY_AND_DISK)
+
+    _ = try await df.unpersist()
+    #expect(try await df.storageLevel == StorageLevel.NONE)
+    _ = try await df.persist(storageLevel: StorageLevel.MEMORY_ONLY)
+    #expect(try await df.storageLevel == StorageLevel.MEMORY_ONLY)
+
+    await spark.stop()
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to