This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
     new f97822d  [SPARK-51626] Support `DataFrameReader`
f97822d is described below

commit f97822db564c81e47a6ecbdc47d75e0f7c6e5343
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Wed Mar 26 22:26:45 2025 -0700

    [SPARK-51626] Support `DataFrameReader`

    ### What changes were proposed in this pull request?

    This PR aims to support `DataFrameReader`.

    ### Why are the changes needed?

    For feature parity.

    ### Does this PR introduce _any_ user-facing change?

    No, this is a new addition to the unreleased version.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #29 from dongjoon-hyun/SPARK-51626.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/DataFrameReader.swift         | 168 +++++++++++++++++++++
 Sources/SparkConnect/SparkSession.swift            |   6 +
 Sources/SparkConnect/TypeAliases.swift             |   2 +
 Tests/SparkConnectTests/DataFrameReaderTests.swift |  67 ++++++++
 4 files changed, 243 insertions(+)
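For orientation, here is a minimal usage sketch of the new API, assembled from the methods and tests in the diff below. It is a sketch, not part of the commit: the CSV path is a placeholder, and a reachable Spark Connect server with default settings is assumed.

```swift
import SparkConnect

// Assumption: a Spark Connect server is running and reachable.
let spark = try await SparkSession.builder.getOrCreate()

// Generic form: specify a format, then load one or more paths.
let count1 = try await spark.read.format("csv").load("/tmp/people.csv").count()

// Shorthand helpers added by this commit: csv, json, orc, parquet.
let count2 = try await spark.read.csv("/tmp/people.csv").count()

await spark.stop()
```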
diff --git a/Sources/SparkConnect/DataFrameReader.swift b/Sources/SparkConnect/DataFrameReader.swift
new file mode 100644
index 0000000..1041bc3
--- /dev/null
+++ b/Sources/SparkConnect/DataFrameReader.swift
@@ -0,0 +1,168 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Atomics
+import Foundation
+import GRPCCore
+import GRPCNIOTransportHTTP2
+import GRPCProtobuf
+import NIOCore
+import SwiftyTextTable
+import Synchronization
+
+/// An interface used to load a `DataFrame` from external storage systems
+/// (e.g. file systems, key-value stores, etc). Use `SparkSession.read` to access this.
+public actor DataFrameReader: Sendable {
+  var source: String = ""
+
+  var paths: [String] = []
+
+  // TODO: Case-insensitive Map
+  var extraOptions: [String: String] = [:]
+
+  let sparkSession: SparkSession
+
+  init(sparkSession: SparkSession) {
+    self.sparkSession = sparkSession
+  }
+
+  /// Specifies the input data source format.
+  /// - Parameter source: A string.
+  /// - Returns: A `DataFrameReader`.
+  public func format(_ source: String) -> DataFrameReader {
+    self.source = source
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A value string.
+  /// - Returns: A `DataFrameReader`.
+  public func option(_ key: String, _ value: String) -> DataFrameReader {
+    self.extraOptions[key] = value
+    return self
+  }
+
+  /// Loads input as a `DataFrame`, for data sources that don't require a path (e.g. external
+  /// key-value stores).
+  /// - Returns: A `DataFrame`.
+  public func load() -> DataFrame {
+    return load([])
+  }
+
+  /// Loads input as a `DataFrame`, for data sources that require a path (e.g. data backed by a
+  /// local or distributed file system).
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func load(_ path: String) -> DataFrame {
+    return load([path])
+  }
+
+  /// Loads input as a `DataFrame`, for data sources that support multiple paths. Only works if
+  /// the source is a HadoopFsRelationProvider.
+  /// - Parameter paths: An array of path strings.
+  /// - Returns: A `DataFrame`.
+  public func load(_ paths: [String]) -> DataFrame {
+    self.paths = paths
+
+    var dataSource = DataSource()
+    dataSource.format = self.source
+    dataSource.paths = self.paths
+    dataSource.options = self.extraOptions
+
+    var read = Read()
+    read.dataSource = dataSource
+
+    var relation = Relation()
+    relation.read = read
+
+    var plan = Plan()
+    plan.opType = .root(relation)
+
+    return DataFrame(spark: sparkSession, plan: plan)
+  }
+
+  /// Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
+  /// overloaded `csv()` method for more details.
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func csv(_ path: String) -> DataFrame {
+    self.source = "csv"
+    return load(path)
+  }
+
+  /// Loads CSV files and returns the result as a `DataFrame`.
+  /// This function will go through the input once to determine the input schema if `inferSchema`
+  /// is enabled. To avoid going through the entire data once, disable the `inferSchema` option or
+  /// specify the schema explicitly using `schema`.
+  /// - Parameter paths: Path strings.
+  /// - Returns: A `DataFrame`.
+  public func csv(_ paths: String...) -> DataFrame {
+    self.source = "csv"
+    return load(paths)
+  }
+
+  /// Loads a JSON file and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func json(_ path: String) -> DataFrame {
+    self.source = "json"
+    return load(path)
+  }
+
+  /// Loads JSON files and returns the result as a `DataFrame`.
+  /// - Parameter paths: Path strings.
+  /// - Returns: A `DataFrame`.
+  public func json(_ paths: String...) -> DataFrame {
+    self.source = "json"
+    return load(paths)
+  }
+
+  /// Loads an ORC file and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func orc(_ path: String) -> DataFrame {
+    self.source = "orc"
+    return load(path)
+  }
+
+  /// Loads ORC files and returns the result as a `DataFrame`.
+  /// - Parameter paths: Path strings.
+  /// - Returns: A `DataFrame`.
+  public func orc(_ paths: String...) -> DataFrame {
+    self.source = "orc"
+    return load(paths)
+  }
+
+  /// Loads a Parquet file and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func parquet(_ path: String) -> DataFrame {
+    self.source = "parquet"
+    return load(path)
+  }
+
+  /// Loads Parquet files and returns the result as a `DataFrame`.
+  /// - Parameter paths: Path strings.
+  /// - Returns: A `DataFrame`.
+  public func parquet(_ paths: String...) -> DataFrame {
+    self.source = "parquet"
+    return load(paths)
+  }
+}
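The tests below do not exercise `option`, so here is a small sketch of the intended chaining style under stated assumptions: `spark` is an active `SparkSession`, the path is a placeholder, and `header`/`inferSchema` are standard Spark CSV option keys rather than anything this commit defines.

```swift
// Assumption: `spark` is an active SparkSession and the path exists.
// Each option is accumulated in `extraOptions` and forwarded to the data source.
let df = await spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/tmp/people.csv")
print(try await df.count())
```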
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index 524f46a..0d7546b 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -116,6 +116,12 @@ public actor SparkSession {
     return try await DataFrame(spark: self, sqlText: sqlText)
   }
 
+  var read: DataFrameReader {
+    get {
+      return DataFrameReader(sparkSession: self)
+    }
+  }
+
   /// This is defined as the return type of `SparkSession.sparkContext` method.
   /// This is an empty `Struct` type because `sparkContext` method is designed to throw
   /// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index 2823e5f..44f48f9 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -19,6 +19,7 @@
 typealias AnalyzePlanRequest = Spark_Connect_AnalyzePlanRequest
 typealias AnalyzePlanResponse = Spark_Connect_AnalyzePlanResponse
 typealias ConfigRequest = Spark_Connect_ConfigRequest
+typealias DataSource = Spark_Connect_Read.DataSource
 typealias DataType = Spark_Connect_DataType
 typealias ExecutePlanRequest = Spark_Connect_ExecutePlanRequest
 typealias ExpressionString = Spark_Connect_Expression.ExpressionString
@@ -29,6 +30,7 @@ typealias OneOf_Analyze = AnalyzePlanRequest.OneOf_Analyze
 typealias Plan = Spark_Connect_Plan
 typealias Project = Spark_Connect_Project
 typealias Range = Spark_Connect_Range
+typealias Read = Spark_Connect_Read
 typealias Relation = Spark_Connect_Relation
 typealias SparkConnectService = Spark_Connect_SparkConnectService
 typealias Sort = Spark_Connect_Sort
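To connect the dots, the two new aliases above (`DataSource`, `Read`) are exactly what `DataFrameReader.load` uses to assemble a Spark Connect plan. A condensed restatement of that construction, taken from the diff above; the `csv` format and path are placeholders, and this fragment assumes it runs inside the SparkConnect module where the aliases are visible:

```swift
// Condensed from DataFrameReader.load(_:) in this commit: a read request
// is a DataSource wrapped in Read -> Relation -> Plan protobuf messages.
var dataSource = DataSource()          // alias for Spark_Connect_Read.DataSource
dataSource.format = "csv"              // e.g. csv/json/orc/parquet
dataSource.paths = ["/tmp/people.csv"]

var read = Read()                      // alias for Spark_Connect_Read
read.dataSource = dataSource

var relation = Relation()
relation.read = read

var plan = Plan()
plan.opType = .root(relation)          // the root relation sent to the server
```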
diff --git a/Tests/SparkConnectTests/DataFrameReaderTests.swift b/Tests/SparkConnectTests/DataFrameReaderTests.swift
new file mode 100644
index 0000000..c159b8f
--- /dev/null
+++ b/Tests/SparkConnectTests/DataFrameReaderTests.swift
@@ -0,0 +1,67 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import Testing
+
+@testable import SparkConnect
+
+/// A test suite for `DataFrameReader`
+struct DataFrameReaderTests {
+
+  @Test
+  func csv() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let path = "../examples/src/main/resources/people.csv"
+    #expect(try await spark.read.format("csv").load(path).count() == 3)
+    #expect(try await spark.read.csv(path).count() == 3)
+    #expect(try await spark.read.csv(path, path).count() == 6)
+    await spark.stop()
+  }
+
+  @Test
+  func json() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let path = "../examples/src/main/resources/people.json"
+    #expect(try await spark.read.format("json").load(path).count() == 3)
+    #expect(try await spark.read.json(path).count() == 3)
+    #expect(try await spark.read.json(path, path).count() == 6)
+    await spark.stop()
+  }
+
+  @Test
+  func orc() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let path = "../examples/src/main/resources/users.orc"
+    #expect(try await spark.read.format("orc").load(path).count() == 2)
+    #expect(try await spark.read.orc(path).count() == 2)
+    #expect(try await spark.read.orc(path, path).count() == 4)
+    await spark.stop()
+  }
+
+  @Test
+  func parquet() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let path = "../examples/src/main/resources/users.parquet"
+    #expect(try await spark.read.format("parquet").load(path).count() == 2)
+    #expect(try await spark.read.parquet(path).count() == 2)
+    #expect(try await spark.read.parquet(path, path).count() == 4)
+    await spark.stop()
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org