This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
     new 325c6b4  [SPARK-52758] Support `defineSqlGraphElements`
325c6b4 is described below

commit 325c6b4aa32493974292e697dd3bf5c1b529bd0d
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Jul 10 11:47:21 2025 -0700

[SPARK-52758] Support `defineSqlGraphElements`

### What changes were proposed in this pull request?

This PR aims to support the `defineSqlGraphElements` API in order to support `Declarative Pipelines` (SPARK-51727) of Apache Spark `4.1.0-preview1`.

### Why are the changes needed?

To support the new feature incrementally.

### Does this PR introduce _any_ user-facing change?

No, this is a new feature.

### How was this patch tested?

Pass the CIs with the `4.1.0-preview1` test pipeline.

<img width="1447" height="213" alt="Screenshot 2025-07-10 at 10 40 58" src="https://github.com/user-attachments/assets/e3ce1b44-b6b1-45a6-bc34-f457ccbf0615" />

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #213 from dongjoon-hyun/SPARK-52758.

Authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/SparkConnectClient.swift | 29 ++++++++++++++++++++++
 .../SparkConnectClientTests.swift             | 18 ++++++++++++++
 2 files changed, 47 insertions(+)

diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index 8ea6250..7f033fd 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -1303,6 +1303,35 @@ public actor SparkConnectClient {
     }
   }
 
+  @discardableResult
+  func defineSqlGraphElements(
+    _ dataflowGraphID: String,
+    _ sqlFilePath: String,
+    _ sqlText: String
+  ) async throws -> Bool {
+    try await withGPRC { client in
+      if UUID(uuidString: dataflowGraphID) == nil {
+        throw SparkConnectError.InvalidArgument
+      }
+
+      var elements = Spark_Connect_PipelineCommand.DefineSqlGraphElements()
+      elements.dataflowGraphID = dataflowGraphID
+      elements.sqlFilePath = sqlFilePath
+      elements.sqlText = sqlText
+
+      var pipelineCommand = Spark_Connect_PipelineCommand()
+      pipelineCommand.commandType = .defineSqlGraphElements(elements)
+
+      var command = Spark_Connect_Command()
+      command.commandType = .pipelineCommand(pipelineCommand)
+
+      let responses = try await execute(self.sessionID!, command)
+      return responses.contains {
+        $0.responseType == .pipelineCommandResult(Spark_Connect_PipelineCommandResult())
+      }
+    }
+  }
+
   private enum URIParams {
     static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
     static let PARAM_SESSION_ID = "session_id"
diff --git a/Tests/SparkConnectTests/SparkConnectClientTests.swift b/Tests/SparkConnectTests/SparkConnectClientTests.swift
index 72e31ba..ccdad48 100644
--- a/Tests/SparkConnectTests/SparkConnectClientTests.swift
+++ b/Tests/SparkConnectTests/SparkConnectClientTests.swift
@@ -164,4 +164,22 @@ struct SparkConnectClientTests {
     }
     await client.stop()
   }
+
+  @Test
+  func defineSqlGraphElements() async throws {
+    let client = SparkConnectClient(remote: TEST_REMOTE)
+    let response = try await client.connect(UUID().uuidString)
+
+    try await #require(throws: SparkConnectError.InvalidArgument) {
+      try await client.defineSqlGraphElements("not-a-uuid-format", "path", "sql")
+    }
+
+    if response.sparkVersion.version.starts(with: "4.1") {
+      let dataflowGraphID = try await client.createDataflowGraph()
+      let sqlText = "CREATE MATERIALIZED VIEW mv1 AS SELECT 1"
+      #expect(UUID(uuidString: dataflowGraphID) != nil)
+      #expect(try await client.defineSqlGraphElements(dataflowGraphID, "path", sqlText))
+    }
+    await client.stop()
+  }
 }
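For context, a minimal usage sketch of the new client API (not part of this commit): it mirrors the new test case and assumes an async calling context plus a Spark Connect endpoint running Apache Spark 4.1.0-preview1. The remote address, SQL file path, and SQL text below are illustrative only.

import Foundation

// Sketch only: register SQL graph elements against a newly created dataflow graph.
// The remote address "sc://localhost:15002" and the file path "pipeline.sql" are assumptions.
func definePipelineSql() async throws {
  let client = SparkConnectClient(remote: "sc://localhost:15002")
  let response = try await client.connect(UUID().uuidString)

  // Declarative Pipelines (SPARK-51727) requires Apache Spark 4.1 or later.
  if response.sparkVersion.version.starts(with: "4.1") {
    let dataflowGraphID = try await client.createDataflowGraph()
    // Returns true when the server responds with a PipelineCommandResult.
    let defined = try await client.defineSqlGraphElements(
      dataflowGraphID, "pipeline.sql",
      "CREATE MATERIALIZED VIEW mv1 AS SELECT 1")
    print("SQL graph elements defined: \(defined)")
  }
  await client.stop()
}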
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org