comphead commented on PR #3845: URL: https://github.com/apache/datafusion-comet/pull/3845#issuecomment-4224808372
Another thing to think about: I was investigating the Spark shuffle mechanism,
specifically the atomic commit of the index and shuffle data files.
```
- Atomic commit — both files are written to temp locations first, then
atomically renamed (IndexShuffleBlockResolver.writeMetadataFileAndCommit)
The commit happens inside writeMetadataFileAndCommit (line 358) and has
three layers of protection:
1. Write-to-temp, then rename
writeMetadataFile() at line 466:
1. Write offsets to tmpFile ← if crash here, final file is untouched
2. Delete old targetFile ← if it exists
3. Rename tmpFile → targetFile ← atomic on most filesystems (POSIX
guarantee)
The rename is the single point of commitment. Before the rename, the old
file (or no file) is visible. After the rename, the new complete file is
visible. There is no window where a reader sees a
half-written file.
Same pattern for the data file (line 413):
dataTmp.renameTo(dataFile) // atomic swap
2. Synchronized block for multi-attempt dedup
The entire commit is wrapped in this.synchronized (line 382). This matters
because Spark may speculatively re-launch a slow map task. Two attempts of the
same task can race to write the same shuffle
output. The synchronized block makes the check-then-write atomic:
this.synchronized {
val existingLengths = checkIndexAndDataFile(indexFile, dataFile,
lengths.length)
if (existingLengths != null) {
// Another attempt already succeeded — discard our temp files,
// adopt the existing partition lengths
dataTmp.delete()
} else {
// We're first — rename our temp files into place
indexTmp.renameTo(indexFile)
dataTmp.renameTo(dataFile)
}
}
3. Validation via checkIndexAndDataFile (line 181)
Before accepting existing files, Spark validates them:
1. Index file size must be exactly (numPartitions + 1) * 8 bytes
2. First offset must be 0
3. Sum of all partition lengths must equal the data file size
If any check fails, it returns null — meaning "no valid output exists, go
ahead and write yours."
---
Why This Matters — The Failure Scenarios
Without atomic commit, these scenarios cause data corruption:
Scenario: Crash mid-write
  What goes wrong:         Reducer reads a half-written data file and gets
                           corrupted/truncated records.
  How atomic commit helps: Reducer only ever sees the old file or the fully
                           written new file (rename is atomic).

Scenario: Speculative execution
  What goes wrong:         Two map attempts write the same output concurrently;
                           a partial overwrite leaves index and data out of sync.
  How atomic commit helps: The synchronized block ensures only one attempt wins;
                           the loser discards its temp files.

Scenario: Index/data mismatch
  What goes wrong:         Index says partition 2 starts at byte 10, but the data
                           file was only partially written (8 bytes); the reducer
                           reads past EOF or into the wrong partition.
  How atomic commit helps: checkIndexAndDataFile validates that
                           sum(lengths) == dataFile.length() before accepting.

Scenario: Reader during write
  What goes wrong:         Reducer fetches data while the mapper is still writing
                           and sees an incomplete partition.
  How atomic commit helps: The reader opens the final-named file, which only
                           appears after the atomic rename of a complete file.
The key invariant: at any point in time, either no valid shuffle output
exists for this map task, or a fully consistent (index + data) pair exists.
There is never a moment where the index points to
data that doesn't exist or data exists without a matching index.
---
Ordering detail
Note the commit order matters:
1. Write index temp → rename to final index (line 408)
2. Write data temp → rename to final data (line 413)
Index is committed first. If a crash happens between steps 1 and 2,
checkIndexAndDataFile will detect the mismatch (lengths.sum != data.length())
on the next attempt and overwrite both files. This is
safe because a reducer won't be fetching until the task reports success to
the driver, which only happens after both renames complete.
```
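The write-to-temp-then-rename pattern described above can be sketched in plain Java with `java.nio` (a minimal sketch, not the actual Spark code; the file names are made up for illustration):

```java
import java.io.IOException;
import java.nio.file.*;

public class AtomicCommitSketch {
    // Write content to a temp file in the same directory, then atomically
    // rename it over the target. A reader sees either the old file or the
    // complete new file, never a partial write.
    static void atomicWrite(Path target, byte[] content) throws IOException {
        Path tmp = Files.createTempFile(
                target.getParent(), target.getFileName().toString(), ".tmp");
        try {
            Files.write(tmp, content);      // 1. write everything to the temp file
            Files.deleteIfExists(target);   // 2. delete the old target, if any
            Files.move(tmp, target,         // 3. atomic rename: the commit point
                       StandardCopyOption.ATOMIC_MOVE);
        } finally {
            Files.deleteIfExists(tmp);      // no-op once the move succeeded
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("shuffle-demo");
        Path index = dir.resolve("shuffle_0_0_0.index");
        atomicWrite(index, new byte[8]);    // first commit: 8 bytes
        atomicWrite(index, new byte[16]);   // overwrite, still atomic
        System.out.println(Files.size(index)); // prints 16
    }
}
```

If the process dies before step 3, the target file is untouched; the only leftover is an orphaned temp file.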
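The three validation checks can be sketched the same way (a hypothetical `checkIndexAndDataFile` that mirrors the checks listed above; the layout of one 8-byte offset per partition boundary is what the `(numPartitions + 1) * 8` size check implies):

```java
import java.io.*;
import java.nio.file.*;

public class IndexCheckSketch {
    // Returns the partition lengths if (indexFile, dataFile) form a consistent
    // pair, or null if they do not.
    static long[] checkIndexAndDataFile(Path indexFile, Path dataFile,
                                        int numPartitions) throws IOException {
        // Check 1: index must hold exactly numPartitions + 1 longs.
        if (!Files.exists(indexFile)
                || Files.size(indexFile) != (numPartitions + 1) * 8L) {
            return null;
        }
        long[] offsets = new long[numPartitions + 1];
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(Files.newInputStream(indexFile)))) {
            for (int i = 0; i <= numPartitions; i++) offsets[i] = in.readLong();
        }
        if (offsets[0] != 0) return null;   // check 2: first offset must be 0
        long dataSize = Files.exists(dataFile) ? Files.size(dataFile) : -1;
        // Check 3: the offsets must cover the data file exactly.
        if (offsets[numPartitions] != dataSize) return null;
        long[] lengths = new long[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            lengths[i] = offsets[i + 1] - offsets[i];
        }
        return lengths;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("check-demo");
        Path index = dir.resolve("x.index"), data = dir.resolve("x.data");
        // Index for 2 partitions: offsets 0, 3, 5 -> lengths [3, 2].
        try (DataOutputStream out =
                 new DataOutputStream(Files.newOutputStream(index))) {
            out.writeLong(0); out.writeLong(3); out.writeLong(5);
        }
        Files.write(data, new byte[5]);     // matching 5-byte data file
        long[] lengths = checkIndexAndDataFile(index, data, 2);
        System.out.println(lengths[0] + "," + lengths[1]);          // prints 3,2
        Files.write(data, new byte[4]);     // truncate -> mismatch
        System.out.println(checkIndexAndDataFile(index, data, 2));  // prints null
    }
}
```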
I'm wondering whether those crash-in-the-middle cases can be tested 🤔
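One way to exercise the crash-in-the-middle case without killing a JVM is to perform the temp write and then simply stop before the rename, asserting that the committed file is untouched, then run the recovery path. A sketch under that assumption (not an actual Spark test; names are made up):

```java
import java.io.IOException;
import java.nio.file.*;

public class CrashSimulationSketch {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("crash-demo");
        Path target = dir.resolve("shuffle.index");
        Files.write(target, "old-contents".getBytes());     // previously committed file

        // Simulated crash: the temp file is fully written, but we "crash"
        // before the rename by simply never calling Files.move here.
        Path tmp = dir.resolve("shuffle.index.tmp");
        Files.write(tmp, "new-but-uncommitted".getBytes());

        // A reader at this point still sees the old, complete file.
        System.out.println(new String(Files.readAllBytes(target)));

        // Recovery path (a retry): delete old target, atomically rename temp.
        Files.deleteIfExists(target);
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        System.out.println(new String(Files.readAllBytes(target)));
    }
}
```

The same structure could drive a test that "crashes" at each numbered step of the commit and asserts the index/data pair is either fully old or fully new.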
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
