This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new 52e217f  [SPARK-52748] Support `defineDataset`
52e217f is described below

commit 52e217fee52045f3291af1918872374393b61df4
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Jul 10 06:26:19 2025 -0700

    [SPARK-52748] Support `defineDataset`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support the `defineDataset` API in order to support 
`Declarative Pipelines` (SPARK-51727) of Apache Spark `4.1.0-preview1`.
    
    ### Why are the changes needed?
    
    To support the new feature incrementally.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a new feature.
    
    ### How was this patch tested?
    
    Pass the CIs with the `4.1.0-preview1` test pipeline, which was added in:
    - #210
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #211 from dongjoon-hyun/SPARK-52748.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/Extension.swift               | 12 ++++++++
 Sources/SparkConnect/SparkConnectClient.swift      | 35 ++++++++++++++++++++++
 Sources/SparkConnect/SparkConnectError.swift       |  1 +
 Sources/SparkConnect/TypeAliases.swift             |  1 +
 .../SparkConnectClientTests.swift                  | 22 ++++++++++++++
 5 files changed, 71 insertions(+)

diff --git a/Sources/SparkConnect/Extension.swift 
b/Sources/SparkConnect/Extension.swift
index 4307a94..5ae22e8 100644
--- a/Sources/SparkConnect/Extension.swift
+++ b/Sources/SparkConnect/Extension.swift
@@ -181,6 +181,18 @@ extension String {
     default: .UNRECOGNIZED(-1)
     }
   }
+
+  var toDatasetType: DatasetType {
+    let mode =
+      switch self {
+      case "unspecified": DatasetType.unspecified
+      case "materializedView": DatasetType.materializedView
+      case "table": DatasetType.table
+      case "temporaryView": DatasetType.temporaryView
+      default: DatasetType.UNRECOGNIZED(-1)
+      }
+    return mode
+  }
 }
 
 extension [String: String] {
diff --git a/Sources/SparkConnect/SparkConnectClient.swift 
b/Sources/SparkConnect/SparkConnectClient.swift
index e86e8ba..109ee0d 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -145,6 +145,8 @@ public actor SparkConnectClient {
           throw SparkConnectError.InvalidViewName
         case let m where m.contains("DATA_SOURCE_NOT_FOUND"):
           throw SparkConnectError.DataSourceNotFound
+        case let m where m.contains("DATASET_TYPE_UNSPECIFIED"):
+          throw SparkConnectError.DatasetTypeUnspecified
         default:
           throw error
         }
@@ -1237,6 +1239,39 @@ public actor SparkConnectClient {
     }
   }
 
+  @discardableResult
+  func defineDataset(
+    _ dataflowGraphID: String,
+    _ datasetName: String,
+    _ datasetType: String,
+    _ comment: String? = nil
+  ) async throws -> Bool {
+    try await withGPRC { client in
+      if UUID(uuidString: dataflowGraphID) == nil {
+        throw SparkConnectError.InvalidArgument
+      }
+
+      var defineDataset = Spark_Connect_PipelineCommand.DefineDataset()
+      defineDataset.dataflowGraphID = dataflowGraphID
+      defineDataset.datasetName = datasetName
+      defineDataset.datasetType = datasetType.toDatasetType
+      if let comment {
+        defineDataset.comment = comment
+      }
+
+      var pipelineCommand = Spark_Connect_PipelineCommand()
+      pipelineCommand.commandType = .defineDataset(defineDataset)
+
+      var command = Spark_Connect_Command()
+      command.commandType = .pipelineCommand(pipelineCommand)
+
+      let responses = try await execute(self.sessionID!, command)
+      return responses.contains {
+        $0.responseType == 
.pipelineCommandResult(Spark_Connect_PipelineCommandResult())
+      }
+    }
+  }
+
   private enum URIParams {
     static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
     static let PARAM_SESSION_ID = "session_id"
diff --git a/Sources/SparkConnect/SparkConnectError.swift 
b/Sources/SparkConnect/SparkConnectError.swift
index 9c8d1c6..dde93c3 100644
--- a/Sources/SparkConnect/SparkConnectError.swift
+++ b/Sources/SparkConnect/SparkConnectError.swift
@@ -22,6 +22,7 @@ public enum SparkConnectError: Error {
   case CatalogNotFound
   case ColumnNotFound
   case DataSourceNotFound
+  case DatasetTypeUnspecified
   case InvalidArgument
   case InvalidSessionID
   case InvalidType
diff --git a/Sources/SparkConnect/TypeAliases.swift 
b/Sources/SparkConnect/TypeAliases.swift
index b061f32..c0bacdb 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -23,6 +23,7 @@ typealias AnalyzePlanResponse = 
Spark_Connect_AnalyzePlanResponse
 typealias Command = Spark_Connect_Command
 typealias ConfigRequest = Spark_Connect_ConfigRequest
 typealias DataSource = Spark_Connect_Read.DataSource
+typealias DatasetType = Spark_Connect_DatasetType
 typealias DataType = Spark_Connect_DataType
 typealias DayTimeInterval = Spark_Connect_DataType.DayTimeInterval
 typealias Drop = Spark_Connect_Drop
diff --git a/Tests/SparkConnectTests/SparkConnectClientTests.swift 
b/Tests/SparkConnectTests/SparkConnectClientTests.swift
index 955a9c8..1983be2 100644
--- a/Tests/SparkConnectTests/SparkConnectClientTests.swift
+++ b/Tests/SparkConnectTests/SparkConnectClientTests.swift
@@ -124,4 +124,26 @@ struct SparkConnectClientTests {
     }
     await client.stop()
   }
+
+  @Test
+  func defineDataset() async throws {
+    let client = SparkConnectClient(remote: TEST_REMOTE)
+    let response = try await client.connect(UUID().uuidString)
+
+    try await #require(throws: SparkConnectError.InvalidArgument) {
+      try await client.defineDataset("not-a-uuid-format", "ds1", "table")
+    }
+
+    if response.sparkVersion.version.starts(with: "4.1") {
+      let dataflowGraphID = try await client.createDataflowGraph()
+      #expect(UUID(uuidString: dataflowGraphID) != nil)
+      try await #require(throws: SparkConnectError.DatasetTypeUnspecified) {
+        try await client.defineDataset(dataflowGraphID, "ds1", "unspecified")
+      }
+      #expect(try await client.defineDataset(dataflowGraphID, "ds2", 
"materializedView"))
+      #expect(try await client.defineDataset(dataflowGraphID, "ds3", "table"))
+      #expect(try await client.defineDataset(dataflowGraphID, "ds4", 
"temporaryView"))
+    }
+    await client.stop()
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to