This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b1e57a2b359 [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of
finalize for RemoteBlockPushResolver
b1e57a2b359 is described below
commit b1e57a2b359d7d9fbf07adfba10db97f38b99bde
Author: zhaomin <[email protected]>
AuthorDate: Wed Oct 18 01:20:08 2023 -0500
[SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for
RemoteBlockPushResolver
### What changes were proposed in this pull request?
Use java.lang.ref.Cleaner instead of finalize() in RemoteBlockPushResolver to close the files held by each merged shuffle partition.
### Why are the changes needed?
The Object.finalize() method has been deprecated since Java 9 and is expected to be removed in a future release; java.lang.ref.Cleaner is the recommended replacement.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passed GitHub Actions.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43371 from zhaomin1423/45315.
Authored-by: zhaomin <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../network/shuffle/RemoteBlockPushResolver.java | 101 +++++++++++++--------
.../network/shuffle/ShuffleTestAccessor.scala | 2 +-
2 files changed, 64 insertions(+), 39 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index a915d0eccb0..14fefebe089 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
@@ -94,6 +95,7 @@ import org.apache.spark.network.util.TransportConf;
*/
public class RemoteBlockPushResolver implements MergedShuffleFileManager {
+ private static final Cleaner CLEANER = Cleaner.create();
private static final Logger logger =
LoggerFactory.getLogger(RemoteBlockPushResolver.class);
public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";
@@ -481,7 +483,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
appShuffleInfo.shuffles.forEach((shuffleId, shuffleInfo) ->
shuffleInfo.shuffleMergePartitions
.forEach((shuffleMergeId, partitionInfo) -> {
synchronized (partitionInfo) {
- partitionInfo.closeAllFilesAndDeleteIfNeeded(false);
+ partitionInfo.cleanable.clean();
}
}));
if (cleanupLocalDirs) {
@@ -537,7 +539,8 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
partitions
.forEach((partitionId, partitionInfo) -> {
synchronized (partitionInfo) {
- partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+ partitionInfo.cleanable.clean();
+ partitionInfo.deleteAllFiles();
}
});
}
@@ -822,7 +825,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId,
partition.reduceId,
ioe.getMessage());
} finally {
- partition.closeAllFilesAndDeleteIfNeeded(false);
+ partition.cleanable.clean();
}
}
}
@@ -1720,6 +1723,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
// The meta file for a particular merged shuffle contains all the map
indices that belong to
// every chunk. The entry per chunk is a serialized bitmap.
private final MergeShuffleFile metaFile;
+ private final Cleaner.Cleanable cleanable;
// Location offset of the last successfully merged block for this shuffle
partition
private long dataFilePos;
// Track the map index whose block is being merged for this shuffle
partition
@@ -1756,6 +1760,8 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
this.dataFilePos = 0;
this.mapTracker = new RoaringBitmap();
this.chunkTracker = new RoaringBitmap();
+ this.cleanable = CLEANER.register(this, new ResourceCleaner(dataChannel,
indexFile,
+ metaFile, appAttemptShuffleMergeId, reduceId));
}
public long getDataFilePos() {
@@ -1864,36 +1870,13 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
metaFile.getChannel().truncate(metaFile.getPos());
}
- void closeAllFilesAndDeleteIfNeeded(boolean delete) {
- try {
- if (dataChannel.isOpen()) {
- dataChannel.close();
- }
- if (delete) {
- dataFile.delete();
- }
- } catch (IOException ioe) {
- logger.warn("Error closing data channel for {} reduceId {}",
- appAttemptShuffleMergeId, reduceId);
- }
- try {
- metaFile.close();
- if (delete) {
- metaFile.delete();
- }
- } catch (IOException ioe) {
- logger.warn("Error closing meta file for {} reduceId {}",
- appAttemptShuffleMergeId, reduceId);
- }
- try {
- indexFile.close();
- if (delete) {
- indexFile.delete();
- }
- } catch (IOException ioe) {
- logger.warn("Error closing index file for {} reduceId {}",
- appAttemptShuffleMergeId, reduceId);
+ /**
+ * Deletes this partition's merged data, meta and index files.
+ * Only a failed deletion of the data file is reported (at info level); the results of the
+ * meta and index file deletions are not checked here.
+ * This method does not close any file: callers are expected to run cleanable.clean() first,
+ * as the partition-cleanup path does. NOTE(review): confirm this ordering for any new caller.
+ */
+ private void deleteAllFiles() {
+ if (!dataFile.delete()) {
+ logger.info("Error deleting data file for {} reduceId {}",
+ appAttemptShuffleMergeId, reduceId);
}
+ metaFile.delete();
+ indexFile.delete();
}
@Override
@@ -1904,11 +1887,6 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
reduceId);
}
- @Override
- protected void finalize() throws Throwable {
- closeAllFilesAndDeleteIfNeeded(false);
- }
-
@VisibleForTesting
MergeShuffleFile getIndexFile() {
return indexFile;
@@ -1933,6 +1911,53 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
int getNumIOExceptions() {
return numIOExceptions;
}
+
+ /**
+ * Returns the Cleanable registered with CLEANER for this partition. Its action closes the
+ * data channel, meta file and index file. It is exposed so tests can close a partition's
+ * files explicitly by calling clean() on it.
+ */
+ @VisibleForTesting
+ Cleaner.Cleanable getCleanable() {
+ return cleanable;
+ }
+
+ /**
+ * Cleaning action registered with CLEANER for an AppShufflePartitionInfo. It closes the
+ * partition's data channel, meta file and index file, and logs (rather than rethrows) any
+ * IOException. The action runs either explicitly through cleanable.clean(), or from the
+ * Cleaner's thread once the owning partition info is no longer reachable.
+ *
+ * The record deliberately captures only the resources it must close, never the owning
+ * AppShufflePartitionInfo. Holding a reference to the registered object would keep it
+ * reachable, so the action would never run on GC. Nested records are implicitly static,
+ * so there is no hidden enclosing-instance reference either.
+ */
+ private record ResourceCleaner(
+ FileChannel dataChannel,
+ MergeShuffleFile indexFile,
+ MergeShuffleFile metaFile,
+ AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+ int reduceId) implements Runnable {
+
+ @Override
+ public void run() {
+ closeAllFiles(dataChannel, indexFile, metaFile,
appAttemptShuffleMergeId,
+ reduceId);
+ }
+
+ // NOTE(review): these parameters shadow the record components of the same names, and
+ // run() always passes the components themselves, so the parameters are redundant.
+ private void closeAllFiles(
+ FileChannel dataChannel,
+ MergeShuffleFile indexFile,
+ MergeShuffleFile metaFile,
+ AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+ int reduceId) {
+ // Each resource is closed in its own try block, so a failure to close one of them does
+ // not prevent the others from being closed.
+ // NOTE(review): the caught IOException is not passed to logger.warn, so the cause and
+ // stack trace are lost; consider adding ioe as the last logger argument.
+ try {
+ if (dataChannel.isOpen()) {
+ dataChannel.close();
+ }
+ } catch (IOException ioe) {
+ logger.warn("Error closing data channel for {} reduceId {}",
+ appAttemptShuffleMergeId, reduceId);
+ }
+ try {
+ metaFile.close();
+ } catch (IOException ioe) {
+ logger.warn("Error closing meta file for {} reduceId {}",
+ appAttemptShuffleMergeId, reduceId);
+ }
+ try {
+ indexFile.close();
+ } catch (IOException ioe) {
+ logger.warn("Error closing index file for {} reduceId {}",
+ appAttemptShuffleMergeId, reduceId);
+ }
+ }
+ }
}
/**
@@ -2108,7 +2133,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
}
}
- void delete() throws IOException {
+ void delete() {
try {
if (null != file) {
file.delete();
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
index 092dee1a1e8..6e8e581c699 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
@@ -187,7 +187,7 @@ object ShuffleTestAccessor {
}
+ // Closes, without deleting, the partition's data channel, meta file and index file by
+ // running the partition's registered Cleanable action.
def closePartitionFiles(partitionInfo: AppShufflePartitionInfo): Unit = {
- partitionInfo.closeAllFilesAndDeleteIfNeeded(false)
+ partitionInfo.getCleanable.clean()
}
def clearAppShuffleInfo(mergeMgr: RemoteBlockPushResolver): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]