This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
     new 023f450  [SPARK-51857] Support `token/userId/userAgent` parameters in `SparkConnectClient`
023f450 is described below

commit 023f450d7947b9835501531de8d56a5100bf3ea0
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Mon Apr 21 23:24:24 2025 +0900

    [SPARK-51857] Support `token/userId/userAgent` parameters in `SparkConnectClient`

    ### What changes were proposed in this pull request?

    This PR aims to support `token/userId/userAgent` parameters in `SparkConnectClient`.

    After this PR, `SparkConnectClient` understands the following parameters in its `remote` argument.
    ```
    sc://host1:123/;token=abcd;userId=test;userAgent=myagent
    ```

    ### Why are the changes needed?

    For feature parity.

    ### Does this PR introduce _any_ user-facing change?

    No behavior change.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #78 from dongjoon-hyun/SPARK-51857.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/SparkConnectClient.swift      | 37 +++++++++++++++++++---
 Sources/SparkConnect/SparkSession.swift            | 12 ++-----
 Tests/SparkConnectTests/RuntimeConfTests.swift     |  8 ++---
 .../SparkConnectClientTests.swift                  | 23 ++++++++++----
 4 files changed, 56 insertions(+), 24 deletions(-)

diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index 0d2413e..3b3f6de 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -23,7 +23,7 @@ import GRPCProtobuf
 
 /// Conceptually the remote spark session that communicates with the server
 public actor SparkConnectClient {
-  let clientType: String = "swift"
+  var clientType: String = "swift"
   let url: URL
   let host: String
   let port: Int
@@ -36,16 +36,36 @@ public actor SparkConnectClient {
   /// Create a client to use GRPCClient.
   /// - Parameters:
   ///   - remote: A string to connect `Spark Connect` server.
-  ///   - user: A string for the user ID of this connection.
-  init(remote: String, user: String, token: String? = nil) {
+  init(remote: String) {
     self.url = URL(string: remote)!
     self.host = url.host() ?? "localhost"
     self.port = self.url.port ?? 15002
+    var token: String? = nil
+    let processInfo = ProcessInfo.processInfo
+#if os(macOS) || os(Linux)
+    var userName = processInfo.environment["SPARK_USER"] ?? processInfo.userName
+#else
+    var userName = processInfo.environment["SPARK_USER"] ?? ""
+#endif
+    for param in self.url.path.split(separator: ";").dropFirst().filter({ !$0.isEmpty }) {
+      let kv = param.split(separator: "=")
+      switch String(kv[0]) {
+      case URIParams.PARAM_USER_AGENT:
+        clientType = String(kv[1])
+      case URIParams.PARAM_TOKEN:
+        token = String(kv[1])
+      case URIParams.PARAM_USER_ID:
+        userName = String(kv[1])
+      default:
+        // Print warning and ignore
+        print("Unknown parameter: \(param)")
+      }
+    }
     self.token = token ?? ProcessInfo.processInfo.environment["SPARK_CONNECT_AUTHENTICATE_TOKEN"]
     if let token = self.token {
       self.intercepters.append(BearerTokenInterceptor(token: token))
     }
-    self.userContext = user.toUserContext
+    self.userContext = userName.toUserContext
   }
 
   /// Stop the connection. Currently, this API is no-op because we don't reuse the connection yet.
@@ -574,4 +594,13 @@ public actor SparkConnectClient {
       return OneOf_Analyze.isStreaming(isStreaming)
     })
   }
+
+  private enum URIParams {
+    static let PARAM_USER_ID = "userId"
+    static let PARAM_USER_AGENT = "userAgent"
+    static let PARAM_TOKEN = "token"
+    static let PARAM_USE_SSL = "useSsl"
+    static let PARAM_SESSION_ID = "sessionId"
+    static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpcMaxMessageSize"
+  }
 }
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index 3e9bdf1..480f55f 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -17,7 +17,6 @@
 // under the License.
 //
 
-import Dispatch
 import Foundation
 
 /// The entry point to programming Spark with ``DataFrame`` API.
@@ -39,15 +38,8 @@ public actor SparkSession {
   /// Create a session that uses the specified connection string and userID.
   /// - Parameters:
   ///   - connection: a string in a patter, `sc://{host}:{port}`
-  ///   - userID: an optional user ID. If absent, `SPARK_USER` environment or ``ProcessInfo.processInfo.userName`` is used.
-  init(_ connection: String, _ userID: String? = nil) {
-    let processInfo = ProcessInfo.processInfo
-#if os(macOS) || os(Linux)
-    let userName = processInfo.environment["SPARK_USER"] ?? processInfo.userName
-#else
-    let userName = processInfo.environment["SPARK_USER"] ?? ""
-#endif
-    self.client = SparkConnectClient(remote: connection, user: userID ?? userName)
+  init(_ connection: String) {
+    self.client = SparkConnectClient(remote: connection)
     self.conf = RuntimeConf(self.client)
   }
 
diff --git a/Tests/SparkConnectTests/RuntimeConfTests.swift b/Tests/SparkConnectTests/RuntimeConfTests.swift
index ae891a8..4452085 100644
--- a/Tests/SparkConnectTests/RuntimeConfTests.swift
+++ b/Tests/SparkConnectTests/RuntimeConfTests.swift
@@ -27,7 +27,7 @@ import Testing
 struct RuntimeConfTests {
   @Test
   func get() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
     _ = try await client.connect(UUID().uuidString)
     let conf = RuntimeConf(client)
 
@@ -42,7 +42,7 @@
 
   @Test
   func set() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
     _ = try await client.connect(UUID().uuidString)
     let conf = RuntimeConf(client)
     try await conf.set("spark.test.key1", "value1")
@@ -52,7 +52,7 @@
 
   @Test
   func reset() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
     _ = try await client.connect(UUID().uuidString)
     let conf = RuntimeConf(client)
 
@@ -73,7 +73,7 @@
 
   @Test
   func getAll() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
     _ = try await client.connect(UUID().uuidString)
     let conf = RuntimeConf(client)
     let map = try await conf.getAll()
diff --git a/Tests/SparkConnectTests/SparkConnectClientTests.swift b/Tests/SparkConnectTests/SparkConnectClientTests.swift
index 13dbf4d..a1ef083 100644
--- a/Tests/SparkConnectTests/SparkConnectClientTests.swift
+++ b/Tests/SparkConnectTests/SparkConnectClientTests.swift
@@ -27,13 +27,24 @@ struct SparkConnectClientTests {
 
   @Test
   func createAndStop() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
+    await client.stop()
+  }
+
+  @Test
+  func parameters() async throws {
+    let client = SparkConnectClient(remote: "sc://host1:123/;token=abcd;userId=test;userAgent=myagent")
+    #expect(await client.token == "abcd")
+    #expect(await client.userContext.userID == "test")
+    #expect(await client.clientType == "myagent")
+    #expect(await client.host == "host1")
+    #expect(await client.port == 123)
     await client.stop()
   }
 
   @Test
   func connectWithInvalidUUID() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
     try await #require(throws: SparkConnectError.InvalidSessionIDException) {
       let _ = try await client.connect("not-a-uuid-format")
     }
@@ -42,14 +53,14 @@
 
   @Test
   func connect() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
     let _ = try await client.connect(UUID().uuidString)
     await client.stop()
   }
 
   @Test
   func tags() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
     let _ = try await client.connect(UUID().uuidString)
 
     let plan = await client.getPlanRange(0, 1, 1)
@@ -65,7 +76,7 @@
 
   @Test
   func ddlParse() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
     let _ = try await client.connect(UUID().uuidString)
     #expect(try await client.ddlParse("a int").simpleString == "struct<a:int>")
     await client.stop()
@@ -74,7 +85,7 @@
 
 #if !os(Linux) // TODO: Enable this with the offical Spark 4 docker image
   @Test
   func jsonToDdl() async throws {
-    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let client = SparkConnectClient(remote: "sc://localhost")
     let _ = try await client.connect(UUID().uuidString)
     let json = #"{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}"#
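
For readers skimming the diff, here is a minimal standalone sketch of the parsing approach the new `init(remote:)` uses: `token`, `userId`, and `userAgent` ride on the URL path after the first `;`, so they can be pulled out with a simple split. This is an illustration only, not part of the patch; the connection-string values (`host1`, `abcd`, `test`, `myagent`) are just the placeholders from the PR description above, and the snippet mirrors, rather than reuses, the loop added to `SparkConnectClient`.

```swift
import Foundation

// The example remote string from the PR description above.
let url = URL(string: "sc://host1:123/;token=abcd;userId=test;userAgent=myagent")!

// Host and port come from the URL itself, with the same defaults the client uses.
let host = url.host() ?? "localhost"
let port = url.port ?? 15002

// Everything on the path after the first ";" is treated as a key=value pair
// (dropFirst() discards the leading "/" segment), mirroring the loop in
// SparkConnectClient.init(remote:). Assumes well-formed key=value pairs.
var params: [String: String] = [:]
for param in url.path.split(separator: ";").dropFirst().filter({ !$0.isEmpty }) {
  let kv = param.split(separator: "=")
  params[String(kv[0])] = String(kv[1])
}

print(host, port)  // host1 123
print(params["token"] ?? "", params["userId"] ?? "", params["userAgent"] ?? "")  // abcd test myagent
```

Run as a script, this prints `host1 123` and the three parsed values; in the actual client those values feed the bearer-token interceptor, the user context, and `clientType`, as the diff above shows.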