This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new f97822d  [SPARK-51626] Support `DataFrameReader`
f97822d is described below

commit f97822db564c81e47a6ecbdc47d75e0f7c6e5343
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Wed Mar 26 22:26:45 2025 -0700

    [SPARK-51626] Support `DataFrameReader`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `DataFrameReader`.
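
    For illustration, a minimal, hedged usage sketch of the new API (the
    file paths and the `header` option value below are hypothetical, not
    part of this patch):

        let spark = try await SparkSession.builder.getOrCreate()

        // Generic form: pick a format, add options, then load one or more paths.
        let df = await spark.read.format("csv").option("header", "true").load("path/to/people.csv")

        // Format-specific shorthands exist for CSV, JSON, ORC, and Parquet.
        let people = await spark.read.json("path/to/people.json")

        await spark.stop()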
    
    ### Why are the changes needed?
    
    For feature parity with Apache Spark's `DataFrameReader` API.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a new addition to the unreleased version.
    
    ### How was this patch tested?
    
    Pass the CIs.
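
    For local verification, a sketch assuming a Spark Connect server is
    reachable (the tests call `SparkSession.builder.getOrCreate()`):

        $ swift test --filter DataFrameReaderTests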
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #29 from dongjoon-hyun/SPARK-51626.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 Sources/SparkConnect/DataFrameReader.swift         | 168 +++++++++++++++++++++
 Sources/SparkConnect/SparkSession.swift            |   6 +
 Sources/SparkConnect/TypeAliases.swift             |   2 +
 Tests/SparkConnectTests/DataFrameReaderTests.swift |  67 ++++++++
 4 files changed, 243 insertions(+)

diff --git a/Sources/SparkConnect/DataFrameReader.swift b/Sources/SparkConnect/DataFrameReader.swift
new file mode 100644
index 0000000..1041bc3
--- /dev/null
+++ b/Sources/SparkConnect/DataFrameReader.swift
@@ -0,0 +1,168 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Atomics
+import Foundation
+import GRPCCore
+import GRPCNIOTransportHTTP2
+import GRPCProtobuf
+import NIOCore
+import SwiftyTextTable
+import Synchronization
+
+/// An interface used to load a `DataFrame` from external storage systems
+/// (e.g. file systems, key-value stores, etc). Use `SparkSession.read` to access this.
+public actor DataFrameReader: Sendable {
+  var source: String = ""
+
+  var paths: [String] = []
+
+  // TODO: Case-insensitive Map
+  var extraOptions: [String: String] = [:]
+
+  let sparkSession: SparkSession
+
+  init(sparkSession: SparkSession) {
+    self.sparkSession = sparkSession
+  }
+
+  /// Specifies the input data source format.
+  /// - Parameter source: A data source name (e.g., `csv`, `json`, `orc`, `parquet`).
+  /// - Returns: A `DataFrameReader`.
+  public func format(_ source: String) -> DataFrameReader {
+    self.source = source
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A value string.
+  /// - Returns: A `DataFrameReader`.
+  public func option(_ key: String, _ value: String) -> DataFrameReader {
+    self.extraOptions[key] = value
+    return self
+  }
+
+  /// Loads input in as a `DataFrame`, for data sources that don't require a path (e.g. external
+  /// key-value stores).
+  /// - Returns: A `DataFrame`.
+  public func load() -> DataFrame {
+    return load([])
+  }
+
+  /// Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by a
+  /// local or distributed file system).
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func load(_ path: String) -> DataFrame {
+    return load([path])
+  }
+
+  /// Loads input in as a `DataFrame`, for data sources that support multiple paths. Only works if
+  /// the source is a HadoopFsRelationProvider.
+  /// - Parameter paths: An array of path strings.
+  /// - Returns: A `DataFrame`.
+  public func load(_ paths: [String]) -> DataFrame {
+    self.paths = paths
+
+    var dataSource = DataSource()
+    dataSource.format = self.source
+    dataSource.paths = self.paths
+    dataSource.options = self.extraOptions
+
+    var read = Read()
+    read.dataSource = dataSource
+
+    var relation = Relation()
+    relation.read = read
+
+    var plan = Plan()
+    plan.opType = .root(relation)
+
+    return DataFrame(spark: sparkSession, plan: plan)
+  }
+
+  /// Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
+  /// overloaded `csv()` method for more details.
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func csv(_ path: String) -> DataFrame {
+    self.source = "csv"
+    return load(path)
+  }
+
+  /// Loads CSV files and returns the result as a `DataFrame`.
+  /// This function will go through the input once to determine the input schema if `inferSchema`
+  /// is enabled. To avoid going through the entire data once, disable the `inferSchema` option or
+  /// specify the schema explicitly using `schema`.
+  /// - Parameter paths: Path strings.
+  /// - Returns: A `DataFrame`.
+  public func csv(_ paths: String...) -> DataFrame {
+    self.source = "csv"
+    return load(paths)
+  }
+
+  /// Loads a JSON file and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func json(_ path: String) -> DataFrame {
+    self.source = "json"
+    return load(path)
+  }
+
+  /// Loads JSON files and returns the result as a `DataFrame`.
+  /// - Parameter paths: Path strings.
+  /// - Returns: A `DataFrame`.
+  public func json(_ paths: String...) -> DataFrame {
+    self.source = "json"
+    return load(paths)
+  }
+
+  /// Loads an ORC file and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func orc(_ path: String) -> DataFrame {
+    self.source = "orc"
+    return load(path)
+  }
+
+  /// Loads ORC files and returns the result as a `DataFrame`.
+  /// - Parameter paths: Path strings.
+  /// - Returns: A `DataFrame`.
+  public func orc(_ paths: String...) -> DataFrame {
+    self.source = "orc"
+    return load(paths)
+  }
+
+  /// Loads a Parquet file and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func parquet(_ path: String) -> DataFrame {
+    self.source = "parquet"
+    return load(path)
+  }
+
+  /// Loads Parquet files and returns the result as a `DataFrame`.
+  /// - Parameter paths: Path strings.
+  /// - Returns: A `DataFrame`.
+  public func parquet(_ paths: String...) -> DataFrame {
+    self.source = "parquet"
+    return load(paths)
+  }
+}
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index 524f46a..0d7546b 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -116,6 +116,12 @@ public actor SparkSession {
     return try await DataFrame(spark: self, sqlText: sqlText)
   }
 
+  public var read: DataFrameReader {
+    get {
+      return DataFrameReader(sparkSession: self)
+    }
+  }
+
   /// This is defined as the return type of `SparkSession.sparkContext` method.
   /// This is an empty `Struct` type because `sparkContext` method is designed to throw
   /// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index 2823e5f..44f48f9 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -19,6 +19,7 @@
 typealias AnalyzePlanRequest = Spark_Connect_AnalyzePlanRequest
 typealias AnalyzePlanResponse = Spark_Connect_AnalyzePlanResponse
 typealias ConfigRequest = Spark_Connect_ConfigRequest
+typealias DataSource = Spark_Connect_Read.DataSource
 typealias DataType = Spark_Connect_DataType
 typealias ExecutePlanRequest = Spark_Connect_ExecutePlanRequest
 typealias ExpressionString = Spark_Connect_Expression.ExpressionString
@@ -29,6 +30,7 @@ typealias OneOf_Analyze = AnalyzePlanRequest.OneOf_Analyze
 typealias Plan = Spark_Connect_Plan
 typealias Project = Spark_Connect_Project
 typealias Range = Spark_Connect_Range
+typealias Read = Spark_Connect_Read
 typealias Relation = Spark_Connect_Relation
 typealias SparkConnectService = Spark_Connect_SparkConnectService
 typealias Sort = Spark_Connect_Sort
diff --git a/Tests/SparkConnectTests/DataFrameReaderTests.swift b/Tests/SparkConnectTests/DataFrameReaderTests.swift
new file mode 100644
index 0000000..c159b8f
--- /dev/null
+++ b/Tests/SparkConnectTests/DataFrameReaderTests.swift
@@ -0,0 +1,67 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import Testing
+
+@testable import SparkConnect
+
+/// A test suite for `DataFrameReader`
+struct DataFrameReaderTests {
+
+  @Test
+  func csv() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let path = "../examples/src/main/resources/people.csv"
+    #expect(try await spark.read.format("csv").load(path).count() == 3)
+    #expect(try await spark.read.csv(path).count() == 3)
+    #expect(try await spark.read.csv(path, path).count() == 6)
+    await spark.stop()
+  }
+
+  @Test
+  func json() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let path = "../examples/src/main/resources/people.json"
+    #expect(try await spark.read.format("json").load(path).count() == 3)
+    #expect(try await spark.read.json(path).count() == 3)
+    #expect(try await spark.read.json(path, path).count() == 6)
+    await spark.stop()
+  }
+
+  @Test
+  func orc() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let path = "../examples/src/main/resources/users.orc"
+    #expect(try await spark.read.format("orc").load(path).count() == 2)
+    #expect(try await spark.read.orc(path).count() == 2)
+    #expect(try await spark.read.orc(path, path).count() == 4)
+    await spark.stop()
+  }
+
+  @Test
+  func parquet() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let path = "../examples/src/main/resources/users.parquet"
+    #expect(try await spark.read.format("parquet").load(path).count() == 2)
+    #expect(try await spark.read.parquet(path).count() == 2)
+    #expect(try await spark.read.parquet(path, path).count() == 4)
+    await spark.stop()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
