This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 47ffe4094d9d [SPARK-48646][PYTHON] Refine Python data source API docstring and type hints
47ffe4094d9d is described below

commit 47ffe4094d9d1e7aa3b35afc14c406447d4b2552
Author: allisonwang-db <[email protected]>
AuthorDate: Wed Jun 19 09:07:46 2024 +0900

    [SPARK-48646][PYTHON] Refine Python data source API docstring and type hints
    
    ### What changes were proposed in this pull request?
    
    This PR improves the docstring and type hints for classes and methods in `datasource.py`.
    
    ### Why are the changes needed?
    
    To improve type hints and docstrings.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing doc tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47003 from allisonwang-db/spark-48646-ds-typing.
    
    Authored-by: allisonwang-db <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/datasource.py                   | 28 ++++++++++++----------
 .../pyspark/sql/worker/commit_data_source_write.py |  4 ++--
 .../sql/worker/python_streaming_sink_runner.py     |  4 ++--
 3 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py
index 945a9e519182..8ea36bb04fb6 100644
--- a/python/pyspark/sql/datasource.py
+++ b/python/pyspark/sql/datasource.py
@@ -16,7 +16,7 @@
 #
 from abc import ABC, abstractmethod
 from collections import UserDict
-from typing import Any, Dict, Iterator, List, Sequence, Tuple, Type, Union, TYPE_CHECKING
+from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Type, Union, TYPE_CHECKING
 
 from pyspark.sql import Row
 from pyspark.sql.types import StructType
@@ -30,7 +30,9 @@ __all__ = [
     "DataSource",
     "DataSourceReader",
     "DataSourceStreamReader",
+    "SimpleDataSourceStreamReader",
     "DataSourceWriter",
+    "DataSourceStreamWriter",
     "DataSourceRegistration",
     "InputPartition",
     "SimpleDataSourceStreamReader",
@@ -331,7 +333,7 @@ class DataSourceReader(ABC):
         )
 
     @abstractmethod
-    def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]:
+    def read(self, partition: InputPartition) -> Iterator[Tuple]:
         """
         Generates data for a given partition and returns an iterator of tuples or rows.
 
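
For context, a minimal sketch of a reader matching the narrowed `read` hint (the `FixedRowsReader` name and its two-partition layout are illustrative, not part of this patch):

    from typing import Iterator, List, Tuple

    from pyspark.sql.datasource import DataSourceReader, InputPartition

    class FixedRowsReader(DataSourceReader):
        def partitions(self) -> List[InputPartition]:
            # Two hypothetical partitions, each identified by an integer value.
            return [InputPartition(0), InputPartition(1)]

        def read(self, partition: InputPartition) -> Iterator[Tuple]:
            # Yield plain tuples; Row objects are still accepted at runtime,
            # but the refined hint advertises the common tuple case.
            yield (partition.value, "example")
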
@@ -446,7 +448,7 @@ class DataSourceStreamReader(ABC):
         )
 
     @abstractmethod
-    def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator[Row]]:
+    def read(self, partition: InputPartition) -> Iterator[Tuple]:
         """
         Generates data for a given partition and returns an iterator of tuples or rows.
 
@@ -627,7 +629,7 @@ class DataSourceWriter(ABC):
         """
         ...
 
-    def commit(self, messages: List["WriterCommitMessage"]) -> None:
+    def commit(self, messages: List[Optional["WriterCommitMessage"]]) -> None:
         """
         Commits this writing job with a list of commit messages.
 
@@ -639,11 +641,11 @@ class DataSourceWriter(ABC):
         Parameters
         ----------
         messages : list of :class:`WriterCommitMessage`\\s
-            A list of commit messages.
+            A list of commit messages. If a write task fails, the commit message will be `None`.
         """
         ...
 
-    def abort(self, messages: List["WriterCommitMessage"]) -> None:
+    def abort(self, messages: List[Optional["WriterCommitMessage"]]) -> None:
         """
         Aborts this writing job due to task failures.
 
@@ -655,7 +657,7 @@ class DataSourceWriter(ABC):
         Parameters
         ----------
         messages : list of :class:`WriterCommitMessage`\\s
-            A list of commit messages.
+            A list of commit messages. If a write task fails, the commit message will be `None`.
         """
         ...
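
A minimal sketch of a batch writer handling the `Optional` messages above (the `CountingWriter` name and its print-based bookkeeping are illustrative, not part of this patch):

    from typing import Iterator, List, Optional

    from pyspark.sql import Row
    from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage

    class CountingWriter(DataSourceWriter):
        def write(self, iterator: Iterator[Row]) -> WriterCommitMessage:
            # A real writer would persist the rows; this sketch only drains them.
            for _ in iterator:
                pass
            return WriterCommitMessage()

        def commit(self, messages: List[Optional[WriterCommitMessage]]) -> None:
            # Entries are None for tasks that failed before producing a message.
            succeeded = [m for m in messages if m is not None]
            print(f"committed {len(succeeded)} of {len(messages)} tasks")

        def abort(self, messages: List[Optional[WriterCommitMessage]]) -> None:
            print("write aborted; a real writer would clean up partial output")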
 
@@ -692,7 +694,7 @@ class DataSourceStreamWriter(ABC):
         """
         ...
 
-    def commit(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
+    def commit(self, messages: List[Optional["WriterCommitMessage"]], batchId: int) -> None:
         """
         Commits this microbatch with a list of commit messages.
 
@@ -703,15 +705,15 @@ class DataSourceStreamWriter(ABC):
 
         Parameters
         ----------
-        messages : List[WriterCommitMessage]
-            A list of commit messages.
+        messages : list of :class:`WriterCommitMessage`\\s
+            A list of commit messages. If a write task fails, the commit message will be `None`.
         batchId: int
             An integer that uniquely identifies a batch of data being written.
            The integer increases by 1 with each microbatch processed.
         """
         ...
 
-    def abort(self, messages: List["WriterCommitMessage"], batchId: int) -> None:
+    def abort(self, messages: List[Optional["WriterCommitMessage"]], batchId: int) -> None:
         """
         Aborts this microbatch due to task failures.
 
@@ -722,8 +724,8 @@ class DataSourceStreamWriter(ABC):
 
         Parameters
         ----------
-        messages : List[WriterCommitMessage]
-            A list of commit messages.
+        messages : list of :class:`WriterCommitMessage`\\s
+            A list of commit messages. If a write task fails, the commit message will be `None`.
         batchId: int
             An integer that uniquely identifies a batch of data being written.
            The integer increases by 1 with each microbatch processed.
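
The streaming counterpart receives the batch id alongside the per-task messages; a minimal sketch (the `PrintStreamWriter` name is illustrative, not part of this patch):

    from typing import Iterator, List, Optional

    from pyspark.sql import Row
    from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

    class PrintStreamWriter(DataSourceStreamWriter):
        def write(self, iterator: Iterator[Row]) -> WriterCommitMessage:
            for row in iterator:
                print(row)
            return WriterCommitMessage()

        def commit(self, messages: List[Optional[WriterCommitMessage]], batchId: int) -> None:
            # Count only the tasks that produced a message; failed tasks are None.
            ok = sum(m is not None for m in messages)
            print(f"batch {batchId}: {ok}/{len(messages)} tasks committed")

        def abort(self, messages: List[Optional[WriterCommitMessage]], batchId: int) -> None:
            print(f"batch {batchId} aborted")
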
diff --git a/python/pyspark/sql/worker/commit_data_source_write.py b/python/pyspark/sql/worker/commit_data_source_write.py
index cf22c19ab3eb..c7783df449d8 100644
--- a/python/pyspark/sql/worker/commit_data_source_write.py
+++ b/python/pyspark/sql/worker/commit_data_source_write.py
@@ -90,9 +90,9 @@ def main(infile: IO, outfile: IO) -> None:
         # Commit or abort the Python data source write.
         # Note the commit messages can be None if there are failed tasks.
         if abort:
-            writer.abort(commit_messages)  # type: ignore[arg-type]
+            writer.abort(commit_messages)
         else:
-            writer.commit(commit_messages)  # type: ignore[arg-type]
+            writer.commit(commit_messages)
 
         # Send a status code back to JVM.
         write_int(0, outfile)
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index 98a7a22d0a6f..b84234b309f9 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -115,9 +115,9 @@ def main(infile: IO, outfile: IO) -> None:
             # Commit or abort the Python data source write.
             # Note the commit messages can be None if there are failed tasks.
             if abort:
-                writer.abort(commit_messages, batch_id)  # type: ignore[arg-type]
+                writer.abort(commit_messages, batch_id)
             else:
-                writer.commit(commit_messages, batch_id)  # type: ignore[arg-type]
+                writer.commit(commit_messages, batch_id)
             # Send a status code back to JVM.
             write_int(0, outfile)
             outfile.flush()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
