This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e424499917f [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished
e424499917f is described below
commit e424499917fd15ec5d05910ff12eb9c8fabb8447
Author: Kun Wan <[email protected]>
AuthorDate: Sat Jan 14 04:24:11 2023 -0600
[SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished
### What changes were proposed in this pull request?
Clean up merged shuffle data files after the query has finished.
The main changes are:
* After the push-based merge service receives the FinalizeShuffleMerge RPC from the driver, it marks the MergePartitionsInfo as finalized instead of removing it.
* Delete the shuffle merge files if partition.mapTracker is empty.
* When the Spark driver begins to clean up a shuffle and push-based shuffle is enabled, it sends a RemoveShuffleMerge RPC to all merger locations to delete all merged data files (a sketch of this fan-out follows the list).
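For illustration, a minimal, self-contained sketch (not the patch's code) of this driver-side fan-out: on shuffle cleanup, one RemoveShuffleMerge request goes to every merger location. `MergerLocation` and the `removeShuffleMerge` stub below are hypothetical stand-ins for Spark's `BlockManagerId` and `ExternalBlockStoreClient.removeShuffleMerge`; `DELETE_ALL_MERGED_SHUFFLE` mirrors the sentinel this patch adds to `RemoteBlockPushResolver` (Java 17 for records):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class RemoveShuffleMergeSketch {
  // Mirrors RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE added by this patch.
  static final int DELETE_ALL_MERGED_SHUFFLE = -1;

  // Hypothetical stand-in for Spark's BlockManagerId.
  record MergerLocation(String host, int port) {}

  // Hypothetical stand-in for ExternalBlockStoreClient.removeShuffleMerge; always "succeeds".
  static boolean removeShuffleMerge(MergerLocation loc, int shuffleId, int shuffleMergeId) {
    System.out.printf("RemoveShuffleMerge(shuffleId=%d, mergeId=%d) -> %s:%d%n",
        shuffleId, shuffleMergeId, loc.host(), loc.port());
    return true;
  }

  public static void main(String[] args) {
    List<MergerLocation> mergers =
        List.of(new MergerLocation("host-1", 7337), new MergerLocation("host-2", 7337));
    int shuffleId = 10;
    // One async request per merger location, analogous to the Futures used in
    // BlockManagerMasterEndpoint.removeShuffle in the actual change.
    List<CompletableFuture<Boolean>> futures = mergers.stream()
        .map(loc -> CompletableFuture.supplyAsync(
            () -> removeShuffleMerge(loc, shuffleId, DELETE_ALL_MERGED_SHUFFLE)))
        .toList();
    futures.forEach(CompletableFuture::join);
  }
}
```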
### Why are the changes needed?
Long-running Spark applications would otherwise accumulate an ever-growing set of merged shuffle data files.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Local cluster test.
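The new RemoveShuffleMerge protocol message can also be exercised in isolation. A minimal round-trip sketch, assuming only the `encode`/`decode` methods this patch adds and Netty's `Unpooled` buffers (the id values are arbitrary):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;

public class RemoveShuffleMergeRoundTrip {
  public static void main(String[] args) {
    // Encode the message into a Netty buffer, then decode it back.
    RemoveShuffleMerge msg = new RemoveShuffleMerge("app-20230114", 0, 5, 2);
    ByteBuf buf = Unpooled.buffer(msg.encodedLength());
    msg.encode(buf);
    RemoveShuffleMerge decoded = RemoveShuffleMerge.decode(buf);
    // equals() compares appId, appAttemptId, shuffleId and shuffleMergeId.
    System.out.println(msg.equals(decoded)); // true
    System.out.println(decoded);
  }
}
```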
Closes #37922 from wankunde/remove_shuffle_merge.
Lead-authored-by: Kun Wan <[email protected]>
Co-authored-by: wankun <[email protected]>
Signed-off-by: Mridul <mridul<at>gmail.com>
---
.../spark/network/shuffle/BlockStoreClient.java | 14 +++
.../network/shuffle/ExternalBlockHandler.java | 6 ++
.../network/shuffle/ExternalBlockStoreClient.java | 17 +++
.../network/shuffle/MergedShuffleFileManager.java | 9 ++
.../shuffle/NoOpMergedShuffleFileManager.java | 6 ++
.../network/shuffle/RemoteBlockPushResolver.java | 111 ++++++++++++++++++-
.../shuffle/protocol/BlockTransferMessage.java | 3 +-
.../shuffle/protocol/RemoveShuffleMerge.java | 102 ++++++++++++++++++
.../shuffle/RemoteBlockPushResolverSuite.java | 117 +++++++++++++++++++++
.../scala/org/apache/spark/MapOutputTracker.scala | 2 +-
.../org/apache/spark/scheduler/DAGScheduler.scala | 33 +++---
.../spark/storage/BlockManagerMasterEndpoint.scala | 36 +++++--
.../org/apache/spark/MapOutputTrackerSuite.scala | 64 ++++++++++-
13 files changed, 484 insertions(+), 36 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index 253fb7aca1d..32222e910df 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -255,4 +255,18 @@ public abstract class BlockStoreClient implements Closeable {
MergedBlocksMetaListener listener) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Remove the shuffle merge data in shuffle services
+ *
+ * @param host the host of the remote node.
+ * @param port the port of the remote node.
+ * @param shuffleId shuffle id.
+ * @param shuffleMergeId shuffle merge id.
+ *
+ * @since 3.4.0
+ */
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 4e40090b065..3d7c1b1ca0c 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -224,6 +224,12 @@ public class ExternalBlockHandler extends RpcHandler
} finally {
responseDelayContext.stop();
}
+    } else if (msgObj instanceof RemoveShuffleMerge) {
+      RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
+      checkAuth(client, msg.appId);
+      logger.info("Removing shuffle merge data for application {} shuffle {} shuffleMerge {}",
+        msg.appId, msg.shuffleId, msg.shuffleMergeId);
+      mergeManager.removeShuffleMerge(msg);
} else if (msgObj instanceof DiagnoseCorruption) {
DiagnoseCorruption msg = (DiagnoseCorruption) msgObj;
checkAuth(client, msg.appId);
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index b066d99e8ef..1451d571281 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -256,6 +256,23 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
}
}
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      client.send(
+          new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)
+              .toByteBuffer());
+      // TODO(SPARK-42025): Add some error logs for RemoveShuffleMerge RPC
+    } catch (Exception e) {
+      logger.debug("Exception while sending RemoveShuffleMerge request to {}:{}",
+        host, port, e);
+      return false;
+    }
+    return true;
+  }
+
@Override
public MetricSet shuffleMetrics() {
checkInit();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
index 7176b30ba08..cd5bb507dbe 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
@@ -29,6 +29,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
/**
 * The MergedShuffleFileManager is used to process push based shuffle when enabled. It works
@@ -124,6 +125,14 @@ public interface MergedShuffleFileManager {
*/
String[] getMergedBlockDirs(String appId);
+  /**
+   * Remove shuffle merge data files.
+   *
+   * @param removeShuffleMerge contains shuffle details (appId, shuffleId, etc) to uniquely
+   *                           identify a shuffle to be removed
+   */
+  void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge);
+
/**
 * Optionally close any resources associated the MergedShuffleFileManager, such as the
* leveldb for state persistence.
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
index 876b1009593..7d8f9e27402 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
@@ -26,6 +26,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.util.TransportConf;
/**
@@ -84,4 +85,9 @@ public class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
  public String[] getMergedBlockDirs(String appId) {
    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
  }
}
+
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) {
+    throw new UnsupportedOperationException("Cannot handle merged shuffle remove");
+  }
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 6a65e6ccfab..fb3f8109a1a 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -76,6 +76,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBBackend;
import org.apache.spark.network.shuffledb.DBIterator;
@@ -100,6 +101,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
public static final String MERGE_DIR_KEY = "mergeDir";
public static final String ATTEMPT_ID_KEY = "attemptId";
private static final int UNDEFINED_ATTEMPT_ID = -1;
+
+ /**
+ * The flag for deleting all merged shuffle data.
+ */
+ public static final int DELETE_ALL_MERGED_SHUFFLE = -1;
+
private static final String DB_KEY_DELIMITER = ";";
private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER =
createErrorHandler();
// ByteBuffer to respond to client upon a successful merge of a pushed block
@@ -404,6 +411,59 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
}
}
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+        String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+            + "with the current attempt id %s stored in shuffle service for application %s",
+          msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+            msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteCurrentMergedShuffle =
+        msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+        msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeIdToDelete = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+        msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      if (deleteCurrentMergedShuffle ||
+          shuffleMergeIdToDelete > mergePartitionsInfo.shuffleMergeId) {
+        AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+            msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+        if (!mergePartitionsInfo.isFinalized()) {
+          // Clean up shuffle data before the shuffle was finalized. Close and delete all the open
+          // files.
+          submitCleanupTask(() ->
+            closeAndDeleteOutdatedPartitions(
+              currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+        } else {
+          // Current shuffle was finalized, delete all the merged files through reduceIds set
+          // in finalizeShuffleMerge method.
+          submitCleanupTask(() ->
+            deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
+              mergePartitionsInfo.getReduceIds(), false));
+        }
+      } else {
+        throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " +
+          "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ",
+          msg.appId, msg.shuffleId, shuffleMergeIdToDelete, mergePartitionsInfo.shuffleMergeId));
+      }
+      writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+        msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeIdToDelete));
+      return new AppShuffleMergePartitionsInfo(shuffleMergeIdToDelete, true);
+    });
+  }
+
/**
* Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
 * If cleanupLocalDirs is true, the merged shuffle files will also be deleted.
@@ -478,6 +538,40 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
});
}
+  void deleteMergedFiles(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+      AppShuffleInfo appShuffleInfo,
+      int[] reduceIds,
+      boolean deleteFromDB) {
+    if (deleteFromDB) {
+      removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+    }
+    int shuffleId = appAttemptShuffleMergeId.shuffleId;
+    int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
+    int dataFilesDeleteCnt = 0;
+    int indexFilesDeleteCnt = 0;
+    int metaFilesDeleteCnt = 0;
+    for (int reduceId : reduceIds) {
+      File dataFile =
+        appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
+      if (dataFile.delete()) {
+        dataFilesDeleteCnt++;
+      }
+      File indexFile = new File(
+        appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId));
+      if (indexFile.delete()) {
+        indexFilesDeleteCnt++;
+      }
+      File metaFile =
+        appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
+      if (metaFile.delete()) {
+        metaFilesDeleteCnt++;
+      }
+    }
+    logger.info("Delete {} data files, {} index files, {} meta files for {}",
+      dataFilesDeleteCnt, indexFilesDeleteCnt, metaFilesDeleteCnt, appAttemptShuffleMergeId);
+  }
+
/**
 * Remove the finalized shuffle partition information for a specific appAttemptShuffleMergeId
* @param appAttemptShuffleMergeId
@@ -730,6 +824,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
      mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
        bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
        Longs.toArray(sizes));
+      appShuffleInfo.shuffles.get(msg.shuffleId).setReduceIds(Ints.toArray(reduceIds));
}
logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of
shuffle merge completed",
msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
@@ -1514,6 +1609,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
private final int shuffleMergeId;
private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;
+    private final AtomicReference<int[]> reduceIds = new AtomicReference<>(new int[0]);
+
    public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) {
this.shuffleMergeId = shuffleMergeId;
      this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER :
@@ -1528,6 +1625,14 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
public boolean isFinalized() {
return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER;
}
+
+ public void setReduceIds(int[] reduceIds) {
+ this.reduceIds.set(reduceIds);
+ }
+
+ public int[] getReduceIds() {
+ return this.reduceIds.get();
+ }
}
/**
@@ -1736,9 +1841,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
try {
if (dataChannel.isOpen()) {
dataChannel.close();
- if (delete) {
- dataFile.delete();
- }
+ }
+ if (delete) {
+ dataFile.delete();
}
} catch (IOException ioe) {
logger.warn("Error closing data channel for {} reduceId {}",
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index ad959c7e2e7..33411baa09f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -50,7 +50,7 @@ public abstract class BlockTransferMessage implements Encodable {
    FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11),
    PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14),
    FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17),
-    PUSH_BLOCK_RETURN_CODE(18);
+    PUSH_BLOCK_RETURN_CODE(18), REMOVE_SHUFFLE_MERGE(19);
private final byte id;
@@ -88,6 +88,7 @@ public abstract class BlockTransferMessage implements Encodable {
case 16: return DiagnoseCorruption.decode(buf);
case 17: return CorruptionCause.decode(buf);
case 18: return BlockPushReturnCode.decode(buf);
+ case 19: return RemoveShuffleMerge.decode(buf);
      default: throw new IllegalArgumentException("Unknown message type: " + type);
}
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java
new file mode 100644
index 00000000000..3bcb57a70bc
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Remove the merged data for a given shuffle.
+ * Returns {@link Boolean}
+ *
+ * @since 3.4.0
+ */
+public class RemoveShuffleMerge extends BlockTransferMessage {
+ public final String appId;
+ public final int appAttemptId;
+ public final int shuffleId;
+ public final int shuffleMergeId;
+
+ public RemoveShuffleMerge(
+ String appId,
+ int appAttemptId,
+ int shuffleId,
+ int shuffleMergeId) {
+ this.appId = appId;
+ this.appAttemptId = appAttemptId;
+ this.shuffleId = shuffleId;
+ this.shuffleMergeId = shuffleMergeId;
+ }
+
+ @Override
+ protected Type type() {
+ return Type.REMOVE_SHUFFLE_MERGE;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(appId, appAttemptId, shuffleId, shuffleMergeId);
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("appId", appId)
+ .append("attemptId", appAttemptId)
+ .append("shuffleId", shuffleId)
+ .append("shuffleMergeId", shuffleMergeId)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other != null && other instanceof RemoveShuffleMerge) {
+ RemoveShuffleMerge o = (RemoveShuffleMerge) other;
+ return Objects.equal(appId, o.appId)
+ && appAttemptId == o.appAttemptId
+ && shuffleId == o.shuffleId
+ && shuffleMergeId == o.shuffleMergeId;
+ }
+ return false;
+ }
+
+ @Override
+ public int encodedLength() {
+ return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4;
+ }
+
+ @Override
+ public void encode(ByteBuf buf) {
+ Encoders.Strings.encode(buf, appId);
+ buf.writeInt(appAttemptId);
+ buf.writeInt(shuffleId);
+ buf.writeInt(shuffleMergeId);
+ }
+
+ public static RemoveShuffleMerge decode(ByteBuf buf) {
+ String appId = Encoders.Strings.decode(buf);
+ int attemptId = buf.readInt();
+ int shuffleId = buf.readInt();
+ int shuffleMergeId = buf.readInt();
+ return new RemoveShuffleMerge(appId, attemptId, shuffleId, shuffleMergeId);
+ }
+}
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 4bb2220d463..630a651d243 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
@@ -62,6 +63,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
@@ -1364,6 +1366,121 @@ public class RemoteBlockPushResolverSuite {
RemoteBlockPushResolver.AppAttemptShuffleMergeId.class));
}
+  @Test
+  public void testRemoveShuffleMerge() throws IOException, InterruptedException {
+    Semaphore closed = new Semaphore(0);
+    String testApp = "testRemoveShuffleMerge";
+    RemoteBlockPushResolver pushResolver = new RemoteBlockPushResolver(conf, null) {
+      @Override
+      void closeAndDeleteOutdatedPartitions(
+          AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+          Map<Integer, AppShufflePartitionInfo> partitions) {
+        super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, partitions);
+        closed.release();
+      }
+
+      @Override
+      void deleteMergedFiles(
+          AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+          AppShuffleInfo appShuffleInfo,
+          int[] reduceIds,
+          boolean deleteFromDB) {
+        super.deleteMergedFiles(appAttemptShuffleMergeId, appShuffleInfo, reduceIds, deleteFromDB);
+        closed.release();
+      }
+    };
+    pushResolver.registerExecutor(testApp, new ExecutorShuffleInfo(
+      prepareLocalDirs(localDirs, MERGE_DIRECTORY), 1, MERGE_DIRECTORY_META));
+    RemoteBlockPushResolver.AppShuffleInfo shuffleInfo =
+      pushResolver.validateAndGetAppShuffleInfo(testApp);
+
+    // 1. Check whether the data is cleaned up when merged shuffle is finalized
+    // 1.1 Cleaned up the merged files when msg.shuffleMergeId is current shuffleMergeId
+    StreamCallbackWithID streamCallback0 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+    streamCallback0.onData(streamCallback0.getID(), ByteBuffer.wrap(new byte[2]));
+    streamCallback0.onComplete(streamCallback0.getID());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 1));
+    assertTrue(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists());
+    assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists());
+    assertTrue(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
+    pushResolver.removeShuffleMerge(
+      new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 1));
+    closed.tryAcquire(10, TimeUnit.SECONDS);
+    assertFalse(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists());
+    assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists());
+    assertFalse(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
+
+    // 1.2 Cleaned up the merged files when msg.shuffleMergeId is DELETE_ALL_MERGED_SHUFFLE
+    StreamCallbackWithID streamCallback1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, NO_ATTEMPT_ID, 1, 1, 0, 0, 0));
+    streamCallback1.onData(streamCallback1.getID(), ByteBuffer.wrap(new byte[2]));
+    streamCallback1.onComplete(streamCallback1.getID());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 1, 1));
+    assertTrue(shuffleInfo.getMergedShuffleMetaFile(1, 1, 0).exists());
+    assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(1, 1, 0)).exists());
+    assertTrue(shuffleInfo.getMergedShuffleDataFile(1, 1, 0).exists());
+    pushResolver.removeShuffleMerge(new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 1,
+      RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE));
+    closed.tryAcquire(10, TimeUnit.SECONDS);
+    assertFalse(shuffleInfo.getMergedShuffleMetaFile(1, 1, 0).exists());
+    assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists());
+    assertFalse(shuffleInfo.getMergedShuffleDataFile(1, 1, 0).exists());
+
+    // 1.3 Cleaned up the merged files when msg.shuffleMergeId < current shuffleMergeId
+    StreamCallbackWithID streamCallback2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, NO_ATTEMPT_ID, 2, 1, 0, 0, 0));
+    streamCallback2.onData(streamCallback2.getID(), ByteBuffer.wrap(new byte[2]));
+    streamCallback2.onComplete(streamCallback2.getID());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 2, 1));
+    assertTrue(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists());
+    assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(2, 1, 0)).exists());
+    assertTrue(shuffleInfo.getMergedShuffleDataFile(2, 1, 0).exists());
+
+    RuntimeException e = assertThrows(RuntimeException.class,
+      () -> pushResolver.removeShuffleMerge(
+        new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 2, 0)));
+    assertEquals("Asked to remove old shuffle merged data for application " + testApp +
+      " shuffleId 2 shuffleMergeId 0, but current shuffleMergeId 1 ", e.getMessage());
+
+    // 1.4 Cleaned up the merged files when msg.shuffleMergeId > current shuffleMergeId
+    StreamCallbackWithID streamCallback3 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, NO_ATTEMPT_ID, 3, 1, 0, 0, 0));
+    streamCallback3.onData(streamCallback3.getID(), ByteBuffer.wrap(new byte[2]));
+    streamCallback3.onComplete(streamCallback3.getID());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 3, 1));
+    assertTrue(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists());
+    assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(3, 1, 0)).exists());
+    assertTrue(shuffleInfo.getMergedShuffleDataFile(3, 1, 0).exists());
+    pushResolver.removeShuffleMerge(
+      new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 3, 2));
+    closed.tryAcquire(10, TimeUnit.SECONDS);
+    assertFalse(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists());
+    assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(3, 1, 0)).exists());
+    assertFalse(shuffleInfo.getMergedShuffleDataFile(3, 1, 0).exists());
+
+    // 2. Check whether the data is cleaned up when merged shuffle is not finalized.
+    StreamCallbackWithID streamCallback4 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, NO_ATTEMPT_ID, 4, 1, 0, 0, 0));
+    streamCallback4.onData(streamCallback4.getID(), ByteBuffer.wrap(new byte[2]));
+    streamCallback4.onComplete(streamCallback4.getID());
+    assertTrue(shuffleInfo.getMergedShuffleMetaFile(4, 1, 0).exists());
+    pushResolver.removeShuffleMerge(
+      new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 4, 1));
+    closed.tryAcquire(10, TimeUnit.SECONDS);
+    assertFalse(shuffleInfo.getMergedShuffleMetaFile(4, 1, 0).exists());
+
+    // 3. Check whether the data is cleaned up when higher shuffleMergeId finalize request comes
+    StreamCallbackWithID streamCallback5 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, NO_ATTEMPT_ID, 5, 1, 0, 0, 0));
+    streamCallback5.onData(streamCallback5.getID(), ByteBuffer.wrap(new byte[2]));
+    streamCallback5.onComplete(streamCallback5.getID());
+    assertTrue(shuffleInfo.getMergedShuffleMetaFile(5, 1, 0).exists());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 5, 2));
+    closed.tryAcquire(10, TimeUnit.SECONDS);
+    assertFalse(shuffleInfo.getMergedShuffleMetaFile(5, 1, 0).exists());
+  }
+
  private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
pushResolver = new RemoteBlockPushResolver(conf, null) {
@Override
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index a163fef693e..fade0b86dd8 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1207,7 +1207,7 @@ private[spark] class MapOutputTrackerMaster(
// This method is only called in local-mode.
  override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
-    shuffleStatuses(shuffleId).getShufflePushMergerLocations
+    shuffleStatuses.get(shuffleId).map(_.getShufflePushMergerLocations).getOrElse(Seq.empty)
}
override def stop(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff9f37dc488..f1ccaf05509 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1398,9 +1398,7 @@ private[spark] class DAGScheduler(
*/
  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
    assert(stage.shuffleDep.shuffleMergeAllowed && !stage.shuffleDep.isShuffleMergeFinalizedMarked)
- if (stage.shuffleDep.getMergerLocs.isEmpty) {
- getAndSetShufflePushMergerLocations(stage)
- }
+ configureShufflePushMergerLocations(stage)
val shuffleId = stage.shuffleDep.shuffleId
val shuffleMergeId = stage.shuffleDep.shuffleMergeId
@@ -1415,17 +1413,17 @@ private[spark] class DAGScheduler(
}
}
-  private def getAndSetShufflePushMergerLocations(stage: ShuffleMapStage): Seq[BlockManagerId] = {
+  private def configureShufflePushMergerLocations(stage: ShuffleMapStage): Unit = {
+ if (stage.shuffleDep.getMergerLocs.nonEmpty) return
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
if (mergerLocs.nonEmpty) {
stage.shuffleDep.setMergerLocs(mergerLocs)
+      mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId, mergerLocs)
+      logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" +
+        s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" +
+        s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
    }
-
-    logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" +
-      s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" +
-      s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
-    mergerLocs
}
/** Called when stage's parents are available and we can now do its task. */
@@ -2629,16 +2627,15 @@ private[spark] class DAGScheduler(
shuffleIdToMapStage.filter { case (_, stage) =>
stage.shuffleDep.shuffleMergeAllowed &&
stage.shuffleDep.getMergerLocs.isEmpty &&
runningStages.contains(stage)
-      }.foreach { case(_, stage: ShuffleMapStage) =>
-        if (getAndSetShufflePushMergerLocations(stage).nonEmpty) {
-          logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" +
-            s" ${stage.shuffleDep.shuffleId} and shuffle merge" +
-            s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" +
-            s" merger locations")
-          mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId,
-            stage.shuffleDep.getMergerLocs)
-        }
+      }.foreach { case (_, stage: ShuffleMapStage) =>
+        configureShufflePushMergerLocations(stage)
+        if (stage.shuffleDep.getMergerLocs.nonEmpty) {
+          logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" +
+            s" ${stage.shuffleDep.shuffleId} and shuffle merge" +
+            s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" +
+            s" merger locations")
        }
+      }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d30272c51be..681a812e880 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -32,7 +32,7 @@ import com.google.common.cache.CacheBuilder
import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.network.shuffle.ExternalBlockStoreClient
+import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, RemoteBlockPushResolver}
import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
@@ -321,14 +321,6 @@ class BlockManagerMasterEndpoint(
}
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
- val removeMsg = RemoveShuffle(shuffleId)
- val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
- bm.storageEndpoint.ask[Boolean](removeMsg).recover {
- // use false as default value means no shuffle data were removed
-        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
- }
- }.toSeq
-
// Find all shuffle blocks on executors that are no longer running
val blocksToDeleteByShuffleService =
new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
@@ -366,8 +358,32 @@ class BlockManagerMasterEndpoint(
}
}.getOrElse(Seq.empty)
+ val removeShuffleMergeFromShuffleServicesFutures =
+ externalBlockStoreClient.map { shuffleClient =>
+ val mergerLocations =
+ if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
+ mapOutputTracker.getShufflePushMergerLocations(shuffleId)
+ } else {
+ Seq.empty[BlockManagerId]
+ }
+ mergerLocations.map { bmId =>
+ Future[Boolean] {
+ shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId,
+ RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE)
+ }
+ }
+ }.getOrElse(Seq.empty)
+
+ val removeMsg = RemoveShuffle(shuffleId)
+ val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+ bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+ // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+ }
+ }.toSeq
Future.sequence(removeShuffleFromExecutorsFutures ++
- removeShuffleFromShuffleServicesFutures)
+ removeShuffleFromShuffleServicesFutures ++
+ removeShuffleMergeFromShuffleServicesFutures)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index a13527f4b74..dfad4a924d7 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -17,12 +17,15 @@
package org.apache.spark
+import java.util.{Collections => JCollections, HashSet => JHashSet}
import java.util.concurrent.atomic.LongAdder
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
import org.roaringbitmap.RoaringBitmap
import org.apache.spark.LocalSparkContext._
@@ -30,10 +33,11 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MAX_SIZE}
import org.apache.spark.internal.config.Tests.IS_TESTING
-import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatus, MergeStatus}
import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId}
+import org.apache.spark.storage.{BlockManagerId, BlockManagerMasterEndpoint, ShuffleBlockId, ShuffleMergedBlockId}
class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
private val conf = new SparkConf
@@ -913,9 +917,63 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
slaveRpcEnv.shutdown()
}
+  private def fetchDeclaredField(value: AnyRef, fieldName: String): AnyRef = {
+    val field = value.getClass.getDeclaredField(fieldName)
+    field.setAccessible(true)
+    field.get(value)
+  }
+
+  private def lookupBlockManagerMasterEndpoint(sc: SparkContext): BlockManagerMasterEndpoint = {
+    val rpcEnv = sc.env.rpcEnv
+    val dispatcher = fetchDeclaredField(rpcEnv, "dispatcher")
+    fetchDeclaredField(dispatcher, "endpointRefs").
+      asInstanceOf[java.util.Map[RpcEndpoint, RpcEndpointRef]].asScala.
+      filter(_._1.isInstanceOf[BlockManagerMasterEndpoint]).
+      head._1.asInstanceOf[BlockManagerMasterEndpoint]
+  }
+
+  test("SPARK-40480: shuffle remove should cleanup merged files as well") {
+    val newConf = new SparkConf
+    newConf.set("spark.shuffle.push.enabled", "true")
+    newConf.set("spark.shuffle.service.enabled", "true")
+    newConf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    newConf.set(IS_TESTING, true)
+
+    val SHUFFLE_ID = 10
+    withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
+      val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+
+      val blockStoreClient = mock(classOf[ExternalBlockStoreClient])
+      val bmMaster = lookupBlockManagerMasterEndpoint(sc)
+      val field = bmMaster.getClass.getDeclaredField("externalBlockStoreClient")
+      field.setAccessible(true)
+      field.set(bmMaster, Some(blockStoreClient))
+
+      masterTracker.registerShuffle(SHUFFLE_ID, 10, 10)
+      val mergerLocs = (1 to 10).map(x => BlockManagerId(s"exec-$x", s"host-$x", x))
+      masterTracker.registerShufflePushMergerLocations(SHUFFLE_ID, mergerLocs)
+
+      assert(masterTracker.getShufflePushMergerLocations(SHUFFLE_ID).map(_.host).toSet ==
+        mergerLocs.map(_.host).toSet)
+
+      val foundHosts = JCollections.synchronizedSet(new JHashSet[String]())
+      when(blockStoreClient.removeShuffleMerge(any(), any(), any(), any())).thenAnswer(
+        (m: InvocationOnMock) => {
+          val host = m.getArgument(0).asInstanceOf[String]
+          val shuffleId = m.getArgument(2).asInstanceOf[Int]
+          assert(shuffleId == SHUFFLE_ID)
+          foundHosts.add(host)
+          true
+        })
+
+      sc.cleaner.get.doCleanupShuffle(SHUFFLE_ID, blocking = true)
+      assert(foundHosts.asScala == mergerLocs.map(_.host).toSet)
+    }
+  }
+
test("SPARK-34826: Adaptive shuffle mergers") {
val newConf = new SparkConf
- newConf.set("spark.shuffle.push.based.enabled", "true")
+ newConf.set("spark.shuffle.push.enabled", "true")
newConf.set("spark.shuffle.service.enabled", "true")
// needs TorrentBroadcast so need a SparkContext
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]