ariel-miculas commented on code in PR #21478:
URL: https://github.com/apache/datafusion/pull/21478#discussion_r3078305447
##########
datafusion/datasource-json/src/boundary_stream.rs:
##########
@@ -90,10 +90,52 @@ async fn get_stream(
range: std::ops::Range<u64>,
) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
let opts = GetOptions {
- range: Some(GetRange::Bounded(range)),
+ range: Some(GetRange::Bounded(range.clone())),
..Default::default()
};
let result = store.get_opts(&location, opts).await?;
+
+ #[cfg(not(target_arch = "wasm32"))]
+ if let GetResultPayload::File(mut file, _path) = result.payload {
+ use std::io::{Read, Seek, SeekFrom};
+ const CHUNK_SIZE: u64 = 8 * 1024;
+
+ file.seek(SeekFrom::Start(range.start)).map_err(|e| {
+ object_store::Error::Generic {
+ store: "local",
+ source: Box::new(e),
+ }
+ })?;
+
+ return Ok(futures::stream::try_unfold(
Review Comment:
Indeed, I implemented blocking IO on the GetResult::File path because that's
how it worked previously. Other approaches I've considered:
*
[block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html)
- it panics with single-threaded tokio runtimes
* a single spawn_blocking thread with an [mpsc
channel](https://docs.rs/tokio/latest/tokio/sync/mpsc/fn.channel.html) - this
could work, but it is less straightforward and requires more careful design;
see https://github.com/apache/datafusion/pull/15654
* using tokio's ReaderStream - this would be similar to the existing
`into_stream` approach, but the default buffer size (in
tokio-1.50.0/src/io/blocking.rs) is 2 MiB (`DEFAULT_MAX_BUF_SIZE: usize = 2 *
1024 * 1024`); it still incurs context-switching overhead, and it requires
additional dependencies: `tokio-util` and tokio's `fs` feature
```rust
#[cfg(not(target_arch = "wasm32"))]
if let GetResultPayload::File(file, _path) = result.payload {
    use std::io::SeekFrom;
    use futures::{StreamExt, TryStreamExt}; // for boxed() and map_err()
    use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};

    let mut tokio_file: tokio::fs::File = tokio::fs::File::from_std(file);
    tokio_file
        .seek(SeekFrom::Start(range.start))
        .await
        .map_err(|e| object_store::Error::Generic {
            store: "local",
            source: Box::new(e),
        })?;

    const BUF_SIZE: usize = 4 * 1024 * 1024; // 4 MiB
    let limited = tokio_file.take(range.end - range.start);
    return Ok(tokio_util::io::ReaderStream::with_capacity(
        BufReader::with_capacity(BUF_SIZE, limited),
        BUF_SIZE,
    )
    .map_err(|e| object_store::Error::Generic {
        store: "local",
        source: Box::new(e),
    })
    .boxed());
}
```
* keep using `into_stream` but increase the buffer size (from the original
8KiB) - this would significantly reduce the number of context switches
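For the spawn_blocking/channel option above, here is a std-only sketch of the
shape I had in mind: one dedicated thread performs the blocking reads and sends
owned chunks over a bounded channel, which applies backpressure to the reader.
With tokio this would be `spawn_blocking` plus `tokio::sync::mpsc` instead of
std primitives; the `stream_range` name and chunking details are illustrative,
not from the PR:

```rust
use std::io::{Read, Seek, SeekFrom};
use std::sync::mpsc;
use std::thread;

// Illustrative sketch: a dedicated thread does blocking file IO for a byte
// range and sends owned chunks to the consumer over a bounded channel.
fn stream_range(
    mut file: std::fs::File,
    range: std::ops::Range<u64>,
    chunk_size: usize,
) -> std::io::Result<mpsc::Receiver<std::io::Result<Vec<u8>>>> {
    file.seek(SeekFrom::Start(range.start))?;
    // Bounded channel: the reader thread blocks when the consumer lags.
    let (tx, rx) = mpsc::sync_channel(4);
    thread::spawn(move || {
        let mut remaining = range.end - range.start;
        while remaining > 0 {
            let len = chunk_size.min(remaining as usize);
            let mut buf = vec![0u8; len];
            match file.read(&mut buf) {
                Ok(0) => break, // EOF before the range was exhausted
                Ok(n) => {
                    buf.truncate(n);
                    remaining -= n as u64;
                    if tx.send(Ok(buf)).is_err() {
                        break; // consumer dropped the receiver
                    }
                }
                Err(e) => {
                    let _ = tx.send(Err(e));
                    break;
                }
            }
        }
    });
    Ok(rx)
}
```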
I think there are cleaner approaches, but since the original code was doing
blocking IO in an async context (which it [probably shouldn't
do](https://ryhl.io/blog/async-what-is-blocking/)), it would be difficult for
any of these alternatives to match its performance.
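For reference, the seek-then-chunked-read loop that the patch wraps in
`futures::stream::try_unfold` can be sketched with std alone; the `range_chunks`
name is illustrative, and an `Iterator` of `Vec<u8>` stands in for the async
stream of `Bytes`:

```rust
use std::io::{Read, Seek, SeekFrom};

const CHUNK_SIZE: u64 = 8 * 1024;

// Illustrative sketch of the diff's pattern: seek to the start of the byte
// range, then lazily yield fixed-size chunks until the range is exhausted.
fn range_chunks(
    mut file: std::fs::File,
    range: std::ops::Range<u64>,
) -> std::io::Result<impl Iterator<Item = std::io::Result<Vec<u8>>>> {
    file.seek(SeekFrom::Start(range.start))?;
    let mut remaining = range.end - range.start;
    Ok(std::iter::from_fn(move || {
        if remaining == 0 {
            return None;
        }
        let len = CHUNK_SIZE.min(remaining) as usize;
        let mut buf = vec![0u8; len];
        match file.read(&mut buf) {
            Ok(0) => None, // EOF before the range ended
            Ok(n) => {
                buf.truncate(n);
                remaining -= n as u64;
                Some(Ok(buf))
            }
            Err(e) => {
                remaining = 0; // stop after reporting the error
                Some(Err(e))
            }
        }
    }))
}
```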
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]