This is an automated email from the ASF dual-hosted git repository.
cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9b875ce [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to
toPandas
9b875ce is described below
commit 9b875ceada60732899053fbd90728b4944d1c03d
Author: David Li <[email protected]>
AuthorDate: Wed Feb 10 09:58:46 2021 -0800
[SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas
### What changes were proposed in this pull request?
Creating a Pandas dataframe via Apache Arrow currently can use twice as
much memory as the final result, because during the conversion, both Pandas and
Arrow retain a copy of the data. Arrow has a "self-destruct" mode now (Arrow >=
0.16) to avoid this, by freeing each column after conversion. This PR
integrates support for this in toPandas, handling a couple of edge cases:
self_destruct has no effect unless the memory is allocated appropriately,
which is handled in the Arrow serializer here. Essentially, the issue is that
self_destruct frees memory column-wise, but Arrow record batches are oriented
row-wise:
```
Record batch 0: allocation 0: column 0 chunk 0, column 1 chunk 0, ...
Record batch 1: allocation 1: column 0 chunk 1, column 1 chunk 1, ...
```
In this scenario, Arrow will drop references to all of column 0's chunks,
but no memory will actually be freed, as the chunks were just slices of an
underlying allocation. The PR copies each column into its own allocation so
that memory is instead arranged like so:
```
Record batch 0: allocation 0 column 0 chunk 0, allocation 1 column 1 chunk
0, ...
Record batch 1: allocation 2 column 0 chunk 1, allocation 3 column 1 chunk
1, ...
```
The optimization is disabled by default, and can be enabled with the Spark
SQL conf "spark.sql.execution.arrow.pyspark.selfDestruct.enabled" set to
"true". We can't always apply this optimization because it's more likely to
generate a dataframe with immutable buffers, which Pandas doesn't always handle
well, and because it is slower overall (since it only converts one column at a
time instead of in parallel).
### Why are the changes needed?
This lets us load larger datasets - in particular, with N bytes of memory,
before we could never load a dataset bigger than N/2 bytes; now the limit is
more like N/1.25 bytes or so.
### Does this PR introduce _any_ user-facing change?
Yes - it adds a new SQL conf
"spark.sql.execution.arrow.pyspark.selfDestruct.enabled"
### How was this patch tested?
See the [mailing
list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Reducing-memory-usage-of-toPandas-with-Arrow-quot-self-destruct-quot-option-td30149.html)
- it was tested with Python memory_profiler. Unit tests were added to check
that memory usage stays within certain bounds and that results are correct
with the option enabled.
Closes #29818 from lidavidm/spark-32953.
Authored-by: David Li <[email protected]>
Signed-off-by: Bryan Cutler <[email protected]>
---
python/pyspark/sql/pandas/conversion.py | 48 ++++++++++++++++++++--
python/pyspark/sql/tests/test_arrow.py | 33 ++++++++++++++-
.../org/apache/spark/sql/internal/SQLConf.scala | 13 ++++++
3 files changed, 89 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/sql/pandas/conversion.py
b/python/pyspark/sql/pandas/conversion.py
index d8a2414..92ef7ce 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -105,13 +105,29 @@ class PandasConversionMixin(object):
import pyarrow
# Rename columns to avoid duplicated column names.
tmp_column_names = ['col_{}'.format(i) for i in
range(len(self.columns))]
- batches = self.toDF(*tmp_column_names)._collect_as_arrow()
+ self_destruct =
self.sql_ctx._conf.arrowPySparkSelfDestructEnabled()
+ batches = self.toDF(*tmp_column_names)._collect_as_arrow(
+ split_batches=self_destruct)
if len(batches) > 0:
table = pyarrow.Table.from_batches(batches)
+ # Ensure only the table has a reference to the
batches, so that
+ # self_destruct (if enabled) is effective
+ del batches
# Pandas DataFrame created from PyArrow uses
datetime64[ns] for date type
# values, but we should use datetime.date to match the
behavior with when
# Arrow optimization is disabled.
- pdf = table.to_pandas(date_as_object=True)
+ pandas_options = {'date_as_object': True}
+ if self_destruct:
+ # Configure PyArrow to use as little memory as
possible:
+ # self_destruct - free columns as they are
converted
+ # split_blocks - create a separate Pandas block
for each column
+ # use_threads - convert one column at a time
+ pandas_options.update({
+ 'self_destruct': True,
+ 'split_blocks': True,
+ 'use_threads': False,
+ })
+ pdf = table.to_pandas(**pandas_options)
# Rename back to the original column names.
pdf.columns = self.columns
for field in self.schema:
@@ -225,11 +241,16 @@ class PandasConversionMixin(object):
else:
return None
- def _collect_as_arrow(self):
+ def _collect_as_arrow(self, split_batches=False):
"""
Returns all records as a list of ArrowRecordBatches, pyarrow must be
installed
and available on driver and worker Python environments.
This is an experimental feature.
+
+ :param split_batches: split batches such that each column is in its
own allocation, so
+ that the selfDestruct optimization is effective; default False.
+
+ .. note:: Experimental.
"""
from pyspark.sql.dataframe import DataFrame
@@ -240,7 +261,26 @@ class PandasConversionMixin(object):
# Collect list of un-ordered batches where last element is a list of
correct order indices
try:
- results = list(_load_from_socket((port, auth_secret),
ArrowCollectSerializer()))
+ batch_stream = _load_from_socket((port, auth_secret),
ArrowCollectSerializer())
+ if split_batches:
+ # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled,
ensure
+ # each column in each record batch is contained in its own
allocation.
+ # Otherwise, selfDestruct does nothing; it frees each column
as it is
+ # converted, but each column will actually be a list of slices
of record
+ # batches, and so no memory is actually freed until all
columns are
+ # converted.
+ import pyarrow as pa
+ results = []
+ for batch_or_indices in batch_stream:
+ if isinstance(batch_or_indices, pa.RecordBatch):
+ batch_or_indices = pa.RecordBatch.from_arrays([
+ # This call actually reallocates the array
+ pa.concat_arrays([array])
+ for array in batch_or_indices
+ ], schema=batch_or_indices.schema)
+ results.append(batch_or_indices)
+ else:
+ results = list(batch_stream)
finally:
# Join serving thread and raise any exceptions from
collectAsArrowToPython
jsocket_auth_server.getResult()
diff --git a/python/pyspark/sql/tests/test_arrow.py
b/python/pyspark/sql/tests/test_arrow.py
index 938e67f..1843607 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -25,7 +25,7 @@ from distutils.version import LooseVersion
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import rand, udf
from pyspark.sql.types import StructType, StringType, IntegerType, LongType, \
FloatType, DoubleType, DecimalType, DateType, TimestampType, BinaryType,
StructField, \
ArrayType, NullType
@@ -196,6 +196,37 @@ class ArrowTests(ReusedSQLTestCase):
pdf_arrow = df.toPandas()
assert_frame_equal(pdf_arrow, pdf)
+ def test_pandas_self_destruct(self):
+ import pyarrow as pa
+ rows = 2 ** 10
+ cols = 4
+ expected_bytes = rows * cols * 8
+ df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+ # Test the self_destruct behavior by testing _collect_as_arrow directly
+ allocation_before = pa.total_allocated_bytes()
+ batches = df._collect_as_arrow(split_batches=True)
+ table = pa.Table.from_batches(batches)
+ del batches
+ pdf_split = table.to_pandas(self_destruct=True, split_blocks=True,
use_threads=False)
+ allocation_after = pa.total_allocated_bytes()
+ difference = allocation_after - allocation_before
+ # Should be around 1x the data size (table should not hold on to any
memory)
+ self.assertGreaterEqual(difference, 0.9 * expected_bytes)
+ self.assertLessEqual(difference, 1.1 * expected_bytes)
+
+ with
self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled":
False}):
+ no_self_destruct_pdf = df.toPandas()
+ # Note that while memory usage is 2x data size here (both table and pdf
hold on to
+ # memory), in this case Arrow still only tracks 1x worth of memory
(since the
+ # batches are not allocated by Arrow in this case), so we can't
make any
+ # assertions here
+
+ with
self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+ self_destruct_pdf = df.toPandas()
+
+ assert_frame_equal(pdf_split, no_self_destruct_pdf)
+ assert_frame_equal(pdf_split, self_destruct_pdf)
+
def test_filtered_frame(self):
df = self.spark.range(3).toDF("i")
pdf = df.filter("i < 0").toPandas()
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 544a062..2865257 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2033,6 +2033,17 @@ object SQLConf {
.version("3.0.0")
.fallbackConf(ARROW_EXECUTION_ENABLED)
+ val ARROW_PYSPARK_SELF_DESTRUCT_ENABLED =
+ buildConf("spark.sql.execution.arrow.pyspark.selfDestruct.enabled")
+ .doc("When true, make use of Apache Arrow's self-destruct and
split-blocks options " +
+ "for columnar data transfers in PySpark, when converting from Arrow to
Pandas. " +
+ "This reduces memory usage at the cost of some CPU time. " +
+ "This optimization applies to: pyspark.sql.DataFrame.toPandas " +
+ "when 'spark.sql.execution.arrow.pyspark.enabled' is set.")
+ .version("3.2.0")
+ .booleanConf
+ .createWithDefault(false)
+
val PYSPARK_JVM_STACKTRACE_ENABLED =
buildConf("spark.sql.pyspark.jvmStacktrace.enabled")
.doc("When true, it shows the JVM stacktrace in the user-facing PySpark
exception " +
@@ -3609,6 +3620,8 @@ class SQLConf extends Serializable with Logging {
def arrowPySparkEnabled: Boolean = getConf(ARROW_PYSPARK_EXECUTION_ENABLED)
+ def arrowPySparkSelfDestructEnabled: Boolean =
getConf(ARROW_PYSPARK_SELF_DESTRUCT_ENABLED)
+
def pysparkJVMStacktraceEnabled: Boolean =
getConf(PYSPARK_JVM_STACKTRACE_ENABLED)
def arrowSparkREnabled: Boolean = getConf(ARROW_SPARKR_EXECUTION_ENABLED)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]