This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push: new 7ab1e45 [SPARK-51799] Support user-specified schema in `DataFrameReader` 7ab1e45 is described below commit 7ab1e45630fa650abd3dfe3f54d9ef24c966f2af Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Tue Apr 15 12:34:30 2025 +0900 [SPARK-51799] Support user-specified schema in `DataFrameReader` ### What changes were proposed in this pull request? This PR aims to support user-specified schema in `DataFrameReader`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. This is a new addition. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #58 from dongjoon-hyun/SPARK-51799. Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- Sources/SparkConnect/DataFrameReader.swift | 21 ++++++++++++++++++++ Tests/SparkConnectTests/DataFrameReaderTests.swift | 23 ++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/Sources/SparkConnect/DataFrameReader.swift b/Sources/SparkConnect/DataFrameReader.swift index ca00b07..626ff77 100644 --- a/Sources/SparkConnect/DataFrameReader.swift +++ b/Sources/SparkConnect/DataFrameReader.swift @@ -34,6 +34,8 @@ public actor DataFrameReader: Sendable { var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary([:]) + var userSpecifiedSchemaDDL: String? = nil + let sparkSession: SparkSession init(sparkSession: SparkSession) { @@ -85,6 +87,22 @@ public actor DataFrameReader: Sendable { return self } + /// Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema + /// automatically from data. By specifying the schema here, the underlying data source can skip + /// the schema inference step, and thus speed up data loading. + /// - Parameter schema: A DDL schema string. + /// - Returns: A `DataFrameReader`. + public func schema(_ schema: String) async throws -> DataFrameReader { + // Validate by parsing. + do { + _ = try await sparkSession.client.ddlParse(schema) + } catch { + throw SparkConnectError.InvalidTypeException + } + self.userSpecifiedSchemaDDL = schema + return self + } + /// Loads input in as a `DataFrame`, for data sources that don't require a path (e.g. external /// key-value stores). /// - Returns: A `DataFrame`. @@ -111,6 +129,9 @@ public actor DataFrameReader: Sendable { dataSource.format = self.source dataSource.paths = self.paths dataSource.options = self.extraOptions.toStringDictionary() + if let userSpecifiedSchemaDDL = self.userSpecifiedSchemaDDL { + dataSource.schema = userSpecifiedSchemaDDL + } var read = Read() read.dataSource = dataSource diff --git a/Tests/SparkConnectTests/DataFrameReaderTests.swift b/Tests/SparkConnectTests/DataFrameReaderTests.swift index 781282e..78968ec 100644 --- a/Tests/SparkConnectTests/DataFrameReaderTests.swift +++ b/Tests/SparkConnectTests/DataFrameReaderTests.swift @@ -85,4 +85,27 @@ struct DataFrameReaderTests { }) await spark.stop() } + + @Test + func schema() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let path = "../examples/src/main/resources/people.json" + #expect(try await spark.read.schema("age SHORT").json(path).dtypes.count == 1) + #expect(try await spark.read.schema("age SHORT").json(path).dtypes[0] == ("age", "smallint")) + #expect(try await spark.read.schema("age SHORT, name STRING").json(path).dtypes[0] == ("age", "smallint")) + #expect(try await spark.read.schema("age SHORT, name STRING").json(path).dtypes[1] == ("name", "string")) + await spark.stop() + } + + @Test + func invalidSchema() async throws { + let spark = try await SparkSession.builder.getOrCreate() + await #expect(throws: SparkConnectError.InvalidTypeException) { + _ = try await spark.read.schema("invalid-name SHORT") + } + await #expect(throws: SparkConnectError.InvalidTypeException) { + _ = try await spark.read.schema("age UNKNOWN_TYPE") + } + await spark.stop() + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org