This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dfb35bed522c [SPARK-47146][CORE] Possible thread leak when doing sort
merge join
dfb35bed522c is described below
commit dfb35bed522ca706f8fc18e37c05c1766c8d8a18
Author: JacobZheng0927 <[email protected]>
AuthorDate: Mon Mar 4 23:17:32 2024 -0600
[SPARK-47146][CORE] Possible thread leak when doing sort merge join
### What changes were proposed in this pull request?
Add a TaskCompletionListener that closes the input stream, to avoid the thread
leak caused by an unclosed ReadAheadInputStream.
### Why are the changes needed?
SPARK-40849 modified the implementation of `newDaemonSingleThreadExecutor`
to use `newFixedThreadPool` instead of `newSingleThreadExecutor`. The
difference is that `newSingleThreadExecutor` uses the
`FinalizableDelegatedExecutorService`, which provides a `finalize` method that
automatically closes the thread pool. In some cases, sort merge join execution
uses `ReadAheadInputStream` and does not close it, so this change caused a thread
leak. Since Finalization is deprecated and subject to re [...]
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45327 from JacobZheng0927/SPARK-47146.
Authored-by: JacobZheng0927 <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../unsafe/sort/UnsafeSorterSpillReader.java | 12 ++++++++
.../scala/org/apache/spark/sql/JoinSuite.scala | 33 +++++++++++++++++++++-
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index db79efd00853..8bd44c8c52c1 100644
---
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -28,6 +28,8 @@ import org.apache.spark.io.ReadAheadInputStream;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockId;
import org.apache.spark.unsafe.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.*;
@@ -36,6 +38,7 @@ import java.io.*;
* of the file format).
*/
public final class UnsafeSorterSpillReader extends UnsafeSorterIterator
implements Closeable {
+ private static final Logger logger =
LoggerFactory.getLogger(ReadAheadInputStream.class);
public static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
private InputStream in;
@@ -82,6 +85,15 @@ public final class UnsafeSorterSpillReader extends
UnsafeSorterIterator implemen
Closeables.close(bs, /* swallowIOException = */ true);
throw e;
}
+ if (taskContext != null) {
+ taskContext.addTaskCompletionListener(context -> {
+ try {
+ close();
+ } catch (IOException e) {
+ logger.info("error while closing UnsafeSorterSpillReader", e);
+ }
+ });
+ }
}
@Override
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index f31f60e8df56..be6862f5b96b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
+import
org.apache.spark.internal.config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow,
SortOrder}
@@ -34,7 +35,7 @@ import
org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExch
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.BatchEvalPythonExec
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.SlowSQLTest
@@ -1737,3 +1738,33 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
}
}
}
+
+class ThreadLeakInSortMergeJoinSuite
+ extends QueryTest
+ with SharedSparkSession
+ with AdaptiveSparkPlanHelper {
+
+ setupTestData()
+ override protected def createSparkSession: TestSparkSession = {
+ SparkSession.cleanupAnyExistingSession()
+ new TestSparkSession(
+ sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+ }
+
+ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
+
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+
+ assertSpilled(sparkContext, "inner join") {
+ sql("SELECT * FROM testData JOIN testData2 ON key = a").collect()
+ }
+
+ val readAheadThread = Thread.getAllStackTraces.keySet().asScala
+ .find {
+ _.getName.startsWith("read-ahead")
+ }
+ assert(readAheadThread.isEmpty)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]