This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new fc58395f08b Revert "[SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers"
fc58395f08b is described below
commit fc58395f08ba81e84e60b9f0260f257a6f8b4fc1
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Aug 10 15:17:53 2023 -0700
Revert "[SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark
connect streaming workers"
This reverts commit 6b55d618d36bdd296b3883916328d26863e94b8a.
---
.../main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala | 2 +-
python/pyspark/sql/connect/streaming/query.py | 3 +--
python/pyspark/sql/connect/streaming/readwriter.py | 3 +--
python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py | 3 ---
python/pyspark/sql/connect/streaming/worker/listener_worker.py | 3 ---
5 files changed, 3 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index cddda6fb7a7..1a75965eb92 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -83,7 +83,7 @@ private[spark] class StreamingPythonRunner(
val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
- PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+ // TODO(SPARK-44461): verify python version
// Send sessionId
PythonRDD.writeUTF(sessionId, dataOut)
diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py
index 021d27e939d..59e98e7bc30 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -23,7 +23,6 @@ from pyspark.errors import StreamingQueryException, PySparkValueError
import pyspark.sql.connect.proto as pb2
from pyspark.serializers import CloudPickleSerializer
from pyspark.sql.connect import proto
-from pyspark.sql.connect.utils import get_python_ver
from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.streaming.query import (
StreamingQuery as PySparkStreamingQuery,
@@ -238,7 +237,7 @@ class StreamingQueryManager:
cmd = pb2.StreamingQueryManagerCommand()
expr = proto.PythonUDF()
expr.command = CloudPickleSerializer().dumps(listener)
- expr.python_ver = get_python_ver()
+ expr.python_ver = "%d.%d" % sys.version_info[:2]
cmd.add_listener.python_listener_payload.CopyFrom(expr)
cmd.add_listener.id = listener._id
self._execute_streaming_query_manager_cmd(cmd)
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py
index 89097fcf43a..c8cd408404f 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -31,7 +31,6 @@ from pyspark.sql.streaming.readwriter import (
DataStreamReader as PySparkDataStreamReader,
DataStreamWriter as PySparkDataStreamWriter,
)
-from pyspark.sql.connect.utils import get_python_ver
from pyspark.sql.types import Row, StructType
from pyspark.errors import PySparkTypeError, PySparkValueError
@@ -500,7 +499,7 @@ class DataStreamWriter:
self._write_proto.foreach_batch.python_function.command = CloudPickleSerializer().dumps(
func
)
- self._write_proto.foreach_batch.python_function.python_ver = get_python_ver()
+ self._write_proto.foreach_batch.python_function.python_ver = "%d.%d" % sys.version_info[:2]
return self
foreachBatch.__doc__ = PySparkDataStreamWriter.foreachBatch.__doc__
diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
index cf61463cd68..48a9848de40 100644
--- a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
@@ -31,15 +31,12 @@ from pyspark.serializers import (
from pyspark import worker
from pyspark.sql import SparkSession
from typing import IO
-from pyspark.worker_util import check_python_version
pickle_ser = CPickleSerializer()
utf8_deserializer = UTF8Deserializer()
def main(infile: IO, outfile: IO) -> None:
- check_python_version(infile)
-
connect_url = os.environ["SPARK_CONNECT_LOCAL_URL"]
session_id = utf8_deserializer.loads(infile)
diff --git a/python/pyspark/sql/connect/streaming/worker/listener_worker.py b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
index e1f4678e42f..7aef911426d 100644
--- a/python/pyspark/sql/connect/streaming/worker/listener_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
@@ -39,15 +39,12 @@ from pyspark.sql.streaming.listener import (
QueryTerminatedEvent,
QueryIdleEvent,
)
-from pyspark.worker_util import check_python_version
pickle_ser = CPickleSerializer()
utf8_deserializer = UTF8Deserializer()
def main(infile: IO, outfile: IO) -> None:
- check_python_version(infile)
-
connect_url = os.environ["SPARK_CONNECT_LOCAL_URL"]
session_id = utf8_deserializer.loads(infile)
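
For context: the reverted change had the driver send its Python version to each Spark Connect streaming worker, and the worker failed fast on a mismatch before deserializing any command. The sketch below only illustrates that kind of handshake under assumed names and an assumed length-prefixed wire format; the actual helpers (PythonWorkerUtils.writePythonVersion on the JVM side and pyspark.worker_util.check_python_version on the worker side) may encode the data differently.

    import sys

    def write_python_version(outfile):
        # Illustrative driver side: send the local major.minor version,
        # length-prefixed, as UTF-8 (assumed framing, not PySpark's exact format).
        data = ("%d.%d" % sys.version_info[:2]).encode("utf-8")
        outfile.write(len(data).to_bytes(4, "big"))
        outfile.write(data)
        outfile.flush()

    def check_python_version(infile):
        # Illustrative worker side: read the driver's version and refuse to run
        # if the local interpreter's major.minor version differs.
        length = int.from_bytes(infile.read(4), "big")
        driver_version = infile.read(length).decode("utf-8")
        local_version = "%d.%d" % sys.version_info[:2]
        if driver_version != local_version:
            raise RuntimeError(
                "Worker Python %s does not match driver Python %s"
                % (local_version, driver_version)
            )

With the revert, the workers above skip this check and read the session id directly, while the client still embeds "%d.%d" % sys.version_info[:2] in the protobuf payloads as shown in the diff.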
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]