pantShrey commented on PR #21882:
URL: https://github.com/apache/datafusion/pull/21882#issuecomment-4787103055

   @alamb, extremely sorry for the delay in pushing the latest changes. While 
waiting on your guidance regarding the RecordBatch abstraction, I started 
working through the other review comments, but I ended up getting fairly stuck 
on a design decision around the writer adapter and spent more time on it than I 
expected.
   
   I did make the writer-side changes you suggested: `SpillWriter` now extends 
`std::io::Write`, which allowed me to remove the original `SpillWriteAdapter` 
entirely.
   
   However, removing the adapter exposed two issues.
   
   1) The first is metrics tracking. The adapter was previously the place where 
I could observe the exact number of compressed IPC bytes written to the backend 
and update both the global disk usage accounting and the `spilled_bytes` 
metrics. Having that single interception point also meant `InProgressSpillFile` 
did not need to repeatedly call `current_disk_usage()` / `update_disk_usage()`.
   
      Without that interception point, I'm struggling to see how to accurately 
track spill metrics. Today the only estimate available at the RecordBatch level 
is derived from `get_array_memory_size()`, which reflects the in-memory Arrow 
representation rather than the final serialized IPC payload. Once IPC 
compression (LZ4/ZSTD) is enabled, that value can differ substantially from the 
actual bytes written to the backend. Additionally, Arrow's `StreamWriter` 
doesn't expose the number of bytes written per batch, so there doesn't seem to 
be a way to observe the final serialized size other than wrapping the 
`std::io::Write` boundary itself.
   
      My current compromise is a very small tracking wrapper around 
`SpillWriter` that simply forwards `std::io::Write` calls while counting the 
exact serialized bytes written by Arrow. This keeps the metrics accurate for 
compressed spills and also allows backend-local quota tracking to remain at the 
write boundary rather than requiring every backend to implement 
`current_disk_usage()`-style APIs.
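   
      As a rough sketch of the kind of wrapper described above, using only 
`std`: the name `CountingWriter` and its methods are hypothetical, not the PR's 
actual types. It forwards every call to the inner writer and counts only the 
bytes the inner writer reports as accepted.

```rust
use std::io::{self, Write};

/// Hypothetical tracking wrapper (not the PR's actual type): forwards
/// `std::io::Write` calls and counts the bytes the inner writer accepted.
pub struct CountingWriter<W: Write> {
    inner: W,
    bytes_written: u64,
}

impl<W: Write> CountingWriter<W> {
    pub fn new(inner: W) -> Self {
        Self { inner, bytes_written: 0 }
    }

    /// Total serialized bytes accepted by the inner writer so far.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }

    /// Unwraps the inner writer, discarding the counter.
    pub fn into_inner(self) -> W {
        self.inner
    }
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Count what was actually written, which may be less than buf.len().
        let n = self.inner.write(buf)?;
        self.bytes_written += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
```

      Since the count is taken at the `Write` boundary, it reflects whatever 
the serializer emitted (including any compression) rather than an in-memory 
size estimate.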
   
      Do you think this approach is acceptable, or would you prefer that I 
revert to the explicit disk-usage tracking approach instead? My concern with 
the latter is that it becomes difficult for non-filesystem backends to 
implement efficiently and increases the backend API surface, while still only 
providing an approximation of the actual spill size.
   
   2) The second issue is error propagation. Before this change, quota 
enforcement could return `DataFusionError::ResourcesExhausted` directly through 
the `datafusion_common::Result` path (`drain()` allowed skipping the I/O 
boundary). After moving the interface to `std::io::Write`, quota failures have 
to cross the `std::io::Error` boundary. As far as I can tell, that makes it 
difficult to preserve `ResourcesExhausted` semantics all the way up to callers 
such as the spill operators, which currently check for that specific error 
type. However, I audited the codebase (e.g., `row_hash.rs`, 
`nestedloopjoin.rs`) to see if losing this specific enum variant actually 
breaks any control flow. From what I can see, `ResourcesExhausted` is 
exclusively caught by operators to handle memory limits (which triggers the 
fallback to start spilling). Conversely, hitting a disk limit during an active 
spill has no fallback -- it is a fatal error that simply aborts the query. 
Because of this, I removed the adapter's error-stashing workaround and allowed 
the disk quota failures to just 
bubble up as standard `std::io::Error`s with the descriptive text, since the 
end-user UX and control flow remain identical.
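   
      To illustrate the boundary in question, here is a small `std`-only 
sketch. `QuotaExceeded`, `QuotaWriter`, and `as_quota_error` are hypothetical 
stand-ins, not DataFusion types. It shows a quota failure surfacing as a plain 
`std::io::Error` with descriptive text, and also that a typed payload could 
still be recovered by downcasting if a caller ever needed it.

```rust
use std::error::Error;
use std::fmt;
use std::io::{self, Write};

/// Hypothetical stand-in for a disk-quota failure (not a DataFusion type).
#[derive(Debug)]
pub struct QuotaExceeded {
    pub limit: u64,
    pub attempted: u64,
}

impl fmt::Display for QuotaExceeded {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "spill disk quota exceeded: attempted {} bytes, limit {} bytes",
            self.attempted, self.limit
        )
    }
}

impl Error for QuotaExceeded {}

/// Hypothetical writer that rejects writes which would exceed `limit`.
pub struct QuotaWriter<W: Write> {
    inner: W,
    limit: u64,
    written: u64,
}

impl<W: Write> QuotaWriter<W> {
    pub fn new(inner: W, limit: u64) -> Self {
        Self { inner, limit, written: 0 }
    }
}

impl<W: Write> Write for QuotaWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let attempted = self.written + buf.len() as u64;
        if attempted > self.limit {
            // The typed error becomes the payload of a generic io::Error.
            let payload = QuotaExceeded { limit: self.limit, attempted };
            return Err(io::Error::new(io::ErrorKind::Other, payload));
        }
        let n = self.inner.write(buf)?;
        self.written += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

/// Recovers the typed payload from an io::Error, if one was attached.
pub fn as_quota_error(err: &io::Error) -> Option<&QuotaExceeded> {
    err.get_ref().and_then(|e| e.downcast_ref::<QuotaExceeded>())
}
```

      The downcast only works if every layer between the writer and the 
caller passes the `io::Error` through without re-wrapping it as a string, which 
is an assumption about the surrounding code rather than something this sketch 
can guarantee.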
   
   
   On the read path, I'd also like to gently push back on returning `Box<dyn 
std::io::Read>`, if that's okay. The original implementation used 
`StreamReader<BufReader<File>>`, which required `spawn_blocking` to avoid 
blocking the async executor. Avoiding that thread-pool dependency was the 
original motivation for the state machine. I replaced that path with Arrow's 
`StreamDecoder` fed by `tokio::fs::File` + `ReaderStream`, which keeps the read 
side fully async without requiring `spawn_blocking`.
   
   You're absolutely right that this introduces an extra buffering step 
compared to `StreamReader`, so there is a real tradeoff there. My thinking was 
that the async behaviour was worth that cost, but I'm happy to discuss further 
if you feel differently.
   
   I've also started experimenting locally with the RecordBatch-level 
abstraction you suggested, but I didn't want to go too far without first 
getting your direction. Given the above, would you prefer that I continue down 
the RecordBatch route and revisit these APIs as part of that refactor, or 
should I finish the current approach first and keep the RecordBatch abstraction 
as a follow-up?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

