This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 47ffe4094d9d [SPARK-48646][PYTHON] Refine Python data source API
docstring and type hints
47ffe4094d9d is described below
commit 47ffe4094d9d1e7aa3b35afc14c406447d4b2552
Author: allisonwang-db <[email protected]>
AuthorDate: Wed Jun 19 09:07:46 2024 +0900
[SPARK-48646][PYTHON] Refine Python data source API docstring and type hints
### What changes were proposed in this pull request?
This PR improves the docstring and type hints for classes and methods in
`datasource.py`.
### Why are the changes needed?
To improve type hints and docstrings.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing docs test.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47003 from allisonwang-db/spark-48646-ds-typing.
Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/datasource.py | 28 ++++++++++++----------
.../pyspark/sql/worker/commit_data_source_write.py | 4 ++--
.../sql/worker/python_streaming_sink_runner.py | 4 ++--
3 files changed, 19 insertions(+), 17 deletions(-)
diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py
index 945a9e519182..8ea36bb04fb6 100644
--- a/python/pyspark/sql/datasource.py
+++ b/python/pyspark/sql/datasource.py
@@ -16,7 +16,7 @@
#
from abc import ABC, abstractmethod
from collections import UserDict
-from typing import Any, Dict, Iterator, List, Sequence, Tuple, Type, Union,
TYPE_CHECKING
+from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Type,
Union, TYPE_CHECKING
from pyspark.sql import Row
from pyspark.sql.types import StructType
@@ -30,7 +30,9 @@ __all__ = [
"DataSource",
"DataSourceReader",
"DataSourceStreamReader",
+ "SimpleDataSourceStreamReader",
"DataSourceWriter",
+ "DataSourceStreamWriter",
"DataSourceRegistration",
"InputPartition",
"SimpleDataSourceStreamReader",
@@ -331,7 +333,7 @@ class DataSourceReader(ABC):
)
@abstractmethod
- def read(self, partition: InputPartition) -> Union[Iterator[Tuple],
Iterator[Row]]:
+ def read(self, partition: InputPartition) -> Iterator[Tuple]:
"""
Generates data for a given partition and returns an iterator of tuples
or rows.
@@ -446,7 +448,7 @@ class DataSourceStreamReader(ABC):
)
@abstractmethod
- def read(self, partition: InputPartition) -> Union[Iterator[Tuple],
Iterator[Row]]:
+ def read(self, partition: InputPartition) -> Iterator[Tuple]:
"""
Generates data for a given partition and returns an iterator of tuples
or rows.
@@ -627,7 +629,7 @@ class DataSourceWriter(ABC):
"""
...
- def commit(self, messages: List["WriterCommitMessage"]) -> None:
+ def commit(self, messages: List[Optional["WriterCommitMessage"]]) -> None:
"""
Commits this writing job with a list of commit messages.
@@ -639,11 +641,11 @@ class DataSourceWriter(ABC):
Parameters
----------
messages : list of :class:`WriterCommitMessage`\\s
- A list of commit messages.
+ A list of commit messages. If a write task fails, the commit
message will be `None`.
"""
...
- def abort(self, messages: List["WriterCommitMessage"]) -> None:
+ def abort(self, messages: List[Optional["WriterCommitMessage"]]) -> None:
"""
Aborts this writing job due to task failures.
@@ -655,7 +657,7 @@ class DataSourceWriter(ABC):
Parameters
----------
messages : list of :class:`WriterCommitMessage`\\s
- A list of commit messages.
+ A list of commit messages. If a write task fails, the commit
message will be `None`.
"""
...
@@ -692,7 +694,7 @@ class DataSourceStreamWriter(ABC):
"""
...
- def commit(self, messages: List["WriterCommitMessage"], batchId: int) ->
None:
+ def commit(self, messages: List[Optional["WriterCommitMessage"]], batchId:
int) -> None:
"""
Commits this microbatch with a list of commit messages.
@@ -703,15 +705,15 @@ class DataSourceStreamWriter(ABC):
Parameters
----------
- messages : List[WriterCommitMessage]
- A list of commit messages.
+ messages : list of :class:`WriterCommitMessage`\\s
+ A list of commit messages. If a write task fails, the commit
message will be `None`.
batchId: int
An integer that uniquely identifies a batch of data being written.
The integer increase by 1 with each microbatch processed.
"""
...
- def abort(self, messages: List["WriterCommitMessage"], batchId: int) ->
None:
+ def abort(self, messages: List[Optional["WriterCommitMessage"]], batchId:
int) -> None:
"""
Aborts this microbatch due to task failures.
@@ -722,8 +724,8 @@ class DataSourceStreamWriter(ABC):
Parameters
----------
- messages : List[WriterCommitMessage]
- A list of commit messages.
+ messages : list of :class:`WriterCommitMessage`\\s
+ A list of commit messages. If a write task fails, the commit
message will be `None`.
batchId: int
An integer that uniquely identifies a batch of data being written.
The integer increase by 1 with each microbatch processed.
diff --git a/python/pyspark/sql/worker/commit_data_source_write.py
b/python/pyspark/sql/worker/commit_data_source_write.py
index cf22c19ab3eb..c7783df449d8 100644
--- a/python/pyspark/sql/worker/commit_data_source_write.py
+++ b/python/pyspark/sql/worker/commit_data_source_write.py
@@ -90,9 +90,9 @@ def main(infile: IO, outfile: IO) -> None:
# Commit or abort the Python data source write.
# Note the commit messages can be None if there are failed tasks.
if abort:
- writer.abort(commit_messages) # type: ignore[arg-type]
+ writer.abort(commit_messages)
else:
- writer.commit(commit_messages) # type: ignore[arg-type]
+ writer.commit(commit_messages)
# Send a status code back to JVM.
write_int(0, outfile)
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py
b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index 98a7a22d0a6f..b84234b309f9 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -115,9 +115,9 @@ def main(infile: IO, outfile: IO) -> None:
# Commit or abort the Python data source write.
# Note the commit messages can be None if there are failed tasks.
if abort:
- writer.abort(commit_messages, batch_id) # type:
ignore[arg-type]
+ writer.abort(commit_messages, batch_id)
else:
- writer.commit(commit_messages, batch_id) # type:
ignore[arg-type]
+ writer.commit(commit_messages, batch_id)
# Send a status code back to JVM.
write_int(0, outfile)
outfile.flush()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]