Skip to content
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b91a344
[SPARK-40480][SHUFFLE]Remove push-based shuffle data after query fini…
wankunde Sep 18, 2022
6fb184d
[SPARK-40480][SHUFFLE]Remove push-based shuffle data after query fini…
wankunde Sep 19, 2022
75e2013
[SPARK-40480][SHUFFLE]Remove push-based shuffle data after query fini…
wankunde Oct 9, 2022
5b39ba3
[SPARK-40480][SHUFFLE]Remove push-based shuffle data after query fini…
wankunde Oct 9, 2022
7cff74d
Fix UT
wankunde Oct 9, 2022
d4132b3
Update code
wankunde Oct 9, 2022
3a0b6a4
Fix UT
wankunde Oct 11, 2022
0a9730d
fix UT
wankunde Oct 12, 2022
0f24bb9
Update comment
wankunde Nov 23, 2022
c848da6
update RemoteBlockPushResolver.removeShuffleMerge() code
wankunde Dec 18, 2022
e54bb75
Save the latest shuffle merge id in appShuffleInfo.shuffles
wankunde Dec 20, 2022
4d2b940
Update code
wankunde Dec 22, 2022
5d3aa2d
Fix UT
wankunde Dec 23, 2022
d034985
Update code
wankunde Dec 30, 2022
11e7d9a
Fix UT
wankunde Dec 31, 2022
29f4918
Format codestyle
wankunde Jan 1, 2023
3a7d99c
move writeAppAttemptShuffleMergeInfoToDB to main thread
wankunde Jan 2, 2023
613a99a
remove try catch for file.delete()
wankunde Jan 7, 2023
96ba7e1
Simplify the code and update some comments
wankunde Jan 11, 2023
e06ef76
Add UT
wankunde Jan 12, 2023
43085d1
Update common/network-shuffle/src/main/java/org/apache/spark/network/…
wankunde Jan 13, 2023
371feae
Format code
wankunde Jan 13, 2023
6975bcc
Add a TODO for RemoveShuffleMerge RPC
wankunde Jan 13, 2023
475f8bd
Fix bug and backport UT
wankunde Jan 13, 2023
0f1f5eb
Change log level to debug
wankunde Jan 13, 2023
7284b8f
Change log level to debug
wankunde Jan 14, 2023
3181e64
Bug fix
wankunde Jan 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,18 @@ public void getMergedBlockMeta(
MergedBlocksMetaListener listener) {
throw new UnsupportedOperationException();
}

/**
* Remove the shuffle merge data in shuffle services
*
* @param host the host of the remote node.
* @param port the port of the remote node.
* @param shuffleId shuffle id.
* @param shuffleMergeId shuffle merge id.
*
* @since 3.4.0
*/
public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ protected void handleMessage(
} finally {
responseDelayContext.stop();
}
} else if (msgObj instanceof RemoveShuffleMerge) {
RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
checkAuth(client, msg.appId);
logger.info("Removing shuffle merge data for application {} shuffle {} shuffleMerge {}",
msg.appId, msg.shuffleId, msg.shuffleMergeId);
mergeManager.removeShuffleMerge(msg);
} else if (msgObj instanceof DiagnoseCorruption) {
DiagnoseCorruption msg = (DiagnoseCorruption) msgObj;
checkAuth(client, msg.appId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,22 @@ public void onFailure(Throwable e) {
}
}

@Override
public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
checkInit();
try {
TransportClient client = clientFactory.createClient(host, port);
client.send(
new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)
.toByteBuffer());
} catch (Exception e) {
logger.error("Exception while sending RemoveShuffleMerge request to {}:{}",
host, port, e);
return false;
}
return true;
}

@Override
public MetricSet shuffleMetrics() {
checkInit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;

/**
* The MergedShuffleFileManager is used to process push based shuffle when enabled. It works
Expand Down Expand Up @@ -121,6 +122,14 @@ MergedBlockMeta getMergedBlockMeta(
*/
String[] getMergedBlockDirs(String appId);

/**
* Remove shuffle merge data files.
*
* @param removeShuffleMerge contains shuffle details (appId, shuffleId, etc) to uniquely
* identify a shuffle to be removed
*/
void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge);

/**
* Optionally close any resources associated the MergedShuffleFileManager, such as the
* leveldb for state persistence.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.util.TransportConf;

/**
Expand Down Expand Up @@ -84,4 +85,9 @@ public MergedBlockMeta getMergedBlockMeta(
public String[] getMergedBlockDirs(String appId) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) {
throw new UnsupportedOperationException("Cannot handle merged shuffle remove");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBBackend;
import org.apache.spark.network.shuffledb.DBIterator;
Expand All @@ -95,6 +96,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
public static final String MERGE_DIR_KEY = "mergeDir";
public static final String ATTEMPT_ID_KEY = "attemptId";
private static final int UNDEFINED_ATTEMPT_ID = -1;

/**
* The flag for deleting the current merged shuffle data.
*/
public static final int DELETE_ALL_MERGED_SHUFFLE = -1;

private static final String DB_KEY_DELIMITER = ";";
private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler();
// ByteBuffer to respond to client upon a successful merge of a pushed block
Expand Down Expand Up @@ -396,6 +403,61 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
}
}

@Override
public void removeShuffleMerge(RemoveShuffleMerge msg) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
if (appShuffleInfo.attemptId != msg.appAttemptId) {
throw new IllegalArgumentException(
String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+ "with the current attempt id %s stored in shuffle service for application %s",
msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
}
appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
if (mergePartitionsInfo == null) {
if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
return null;
} else {
writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
}
}
boolean deleteCurrentMergedShuffle =
msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
new AppAttemptShuffleMergeId(
msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note on the DB updates: we dont need to update the DB if shuffle was finalized and merge id is not changing - mergePartitionsInfo.isFinalized() && shuffleMergeId == mergePartitionsInfo.shuffleMergeId

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if(deleteCurrentMergedShuffle) {
// request to clean up shuffle we are currently hosting
if (!mergePartitionsInfo.isFinalized()) {
submitCleanupTask(() ->
closeAndDeleteOutdatedPartitions(
currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
} else {
submitCleanupTask(() ->
deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
mergePartitionsInfo.getReduceIds(), false));
}
} else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space missing after if. Also the condition here is not necessary. It will always be true (IDE shows that as well).

throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " +
"application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ",
msg.appId, msg.shuffleId, shuffleMergeId, mergePartitionsInfo.shuffleMergeId));
} else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
// cleanup request for newer shuffle - remove the outdated data we have.
submitCleanupTask(() ->
closeAndDeleteOutdatedPartitions(
currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
}
writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
return new AppShuffleMergePartitionsInfo(shuffleMergeId, true);
});
}

/**
* Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
* If cleanupLocalDirs is true, the merged shuffle files will also be deleted.
Expand Down Expand Up @@ -470,6 +532,61 @@ void closeAndDeleteOutdatedPartitions(
});
}

void deleteMergedFiles(
AppAttemptShuffleMergeId appAttemptShuffleMergeId,
AppShuffleInfo appShuffleInfo,
int[] reduceIds,
boolean deleteFromDB) {
if(deleteFromDB) {
removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
}
int shuffleId = appAttemptShuffleMergeId.shuffleId;
int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
int dataFilesDeleteCnt = 0;
int indexFilesDeleteCnt = 0;
int metaFilesDeleteCnt = 0;
for (int reduceId : reduceIds) {
try {
File dataFile =
appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
if (dataFile.delete()) {
dataFilesDeleteCnt++;
} else {
logger.warn("Fail to delete merged file {} for {}", dataFile, appAttemptShuffleMergeId);
}
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no Exception thrown during deletion (no security manager).
It returns a boolean indicating successful or failed delete.

We can keep track of that, and emit a consolidated log message at the end if dataFileDeleteCount > 0 || indexFileDeleteCount > 0 || metaDeleteCount > 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we log warn messages if File.delete() return false?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a best effort deferred task, which can conflict with concurrent cleanup (like app termination for example) - so a number of failures is expected.
For busy clusters, a warn message will quickly overwhelm the logs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a consolidated log message at the end.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we fix this @wankunde ?
Essentially, the changes are:

a) There is no exception thrown in this block - we dont need the try/catch.
b) When the delete fails, we dont need to do the warn (here and below) : this can happen if application exit is racing against remove shuffle - the info at the end of the method will suffice.

logger.error(String.format("Fail to delete merged data file for {} reduceId {}",
appAttemptShuffleMergeId, reduceId), e);
}
try {
File indexFile = new File(
appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId));
if (indexFile.delete()) {
indexFilesDeleteCnt++;
} else {
logger.warn("Fail to delete merged file {} for {}", indexFile, appAttemptShuffleMergeId);
}
} catch (Exception e) {
logger.error(String.format("Fail to delete merged index file for {} reduceId {}",
appAttemptShuffleMergeId, reduceId), e);
}
try {
File metaFile =
appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
if (metaFile.delete()) {
metaFilesDeleteCnt++;
} else {
logger.warn("Fail to delete merged file {} for {}", metaFile, appAttemptShuffleMergeId);
}
} catch (Exception e) {
logger.error(String.format("Fail to delete merged meta file for {} reduceId {}",
appAttemptShuffleMergeId, reduceId), e);
}
}
logger.info("Delete {} data files, {} index files, {} meta files for {}",
dataFilesDeleteCnt, indexFilesDeleteCnt, metaFilesDeleteCnt, appAttemptShuffleMergeId);
}

/**
* Remove the finalized shuffle partition information for a specific appAttemptShuffleMergeId
* @param appAttemptShuffleMergeId
Expand Down Expand Up @@ -712,6 +829,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
Longs.toArray(sizes));
appShuffleInfo.shuffles.get(msg.shuffleId).setReduceIds(Ints.toArray(reduceIds));
}
logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of shuffle merge completed",
msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
Expand Down Expand Up @@ -1465,6 +1583,8 @@ public static class AppShuffleMergePartitionsInfo {
private final int shuffleMergeId;
private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;

private final AtomicReference<int[]> reduceIds = new AtomicReference<>(new int[0]);

public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) {
this.shuffleMergeId = shuffleMergeId;
this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER :
Expand All @@ -1479,6 +1599,14 @@ public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() {
public boolean isFinalized() {
return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER;
}

public void setReduceIds(int[] reduceIds) {
this.reduceIds.set(reduceIds);
}

public int[] getReduceIds() {
return this.reduceIds.get();
}
}

/**
Expand Down Expand Up @@ -1687,9 +1815,9 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) {
try {
if (dataChannel.isOpen()) {
dataChannel.close();
if (delete) {
dataFile.delete();
}
}
if (delete) {
dataFile.delete();
}
} catch (IOException ioe) {
logger.warn("Error closing data channel for {} reduceId {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public enum Type {
FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11),
PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14),
FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17),
PUSH_BLOCK_RETURN_CODE(18);
PUSH_BLOCK_RETURN_CODE(18), REMOVE_SHUFFLE_MERGE(19);

private final byte id;

Expand Down Expand Up @@ -88,6 +88,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
case 16: return DiagnoseCorruption.decode(buf);
case 17: return CorruptionCause.decode(buf);
case 18: return BlockPushReturnCode.decode(buf);
case 19: return RemoveShuffleMerge.decode(buf);
default: throw new IllegalArgumentException("Unknown message type: " + type);
}
}
Expand Down
Loading