ariel-miculas commented on code in PR #21478:
URL: https://github.com/apache/datafusion/pull/21478#discussion_r3078305447


##########
datafusion/datasource-json/src/boundary_stream.rs:
##########
@@ -90,10 +90,52 @@ async fn get_stream(
     range: std::ops::Range<u64>,
 ) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
     let opts = GetOptions {
-        range: Some(GetRange::Bounded(range)),
+        range: Some(GetRange::Bounded(range.clone())),
         ..Default::default()
     };
     let result = store.get_opts(&location, opts).await?;
+
+    #[cfg(not(target_arch = "wasm32"))]
+    if let GetResultPayload::File(mut file, _path) = result.payload {
+        use std::io::{Read, Seek, SeekFrom};
+        const CHUNK_SIZE: u64 = 8 * 1024;
+
+        file.seek(SeekFrom::Start(range.start)).map_err(|e| {
+            object_store::Error::Generic {
+                store: "local",
+                source: Box::new(e),
+            }
+        })?;
+
+        return Ok(futures::stream::try_unfold(

Review Comment:
   Indeed, I implemented blocking IO on the `GetResult::File` path because that is how it worked previously. Other approaches I've considered:
   * [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) - it panics on single-threaded tokio runtimes
   * a single `spawn_blocking` thread with an [mpsc channel](https://docs.rs/tokio/latest/tokio/sync/mpsc/fn.channel.html) - this could work, but it is less straightforward and requires more careful design; see https://github.com/apache/datafusion/pull/15654
   * using `tokio_util::io::ReaderStream` - this would be similar to the existing `into_stream` approach, but the default buffer size is 2 MiB (`DEFAULT_MAX_BUF_SIZE: usize = 2 * 1024 * 1024` in tokio-1.50.0/src/io/blocking.rs); it still incurs context-switching overhead, and it requires additional dependencies: `tokio-util` and tokio's `fs` feature
   ```
       #[cfg(not(target_arch = "wasm32"))]
       if let GetResultPayload::File(file, _path) = result.payload {
           use std::io::SeekFrom;
           use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
           let mut tokio_file: tokio::fs::File = tokio::fs::File::from_std(file);
           tokio_file
               .seek(SeekFrom::Start(range.start))
               .await
               .map_err(|e| object_store::Error::Generic {
                   store: "local",
                   source: Box::new(e),
               })?;
           const BUF_SIZE: usize = 4 * 1024 * 1024; // 4 MiB
           let limited = tokio_file.take(range.end - range.start);
           return Ok(tokio_util::io::ReaderStream::with_capacity(
               BufReader::with_capacity(BUF_SIZE, limited),
               BUF_SIZE,
           )
           .map_err(|e| object_store::Error::Generic {
               store: "local",
               source: Box::new(e),
           })
           .boxed());
       }
   ```
   * keep using `into_stream` but increase the buffer size (from the original 
8KiB) - this would significantly reduce the number of context switches
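   For reference, the single-blocking-thread-plus-channel idea from the second bullet could be sketched roughly as below. This is a hypothetical illustration, not code from the PR: it uses `std::thread` and `std::sync::mpsc` so the snippet stands alone, whereas in DataFusion it would use tokio's `spawn_blocking` and `tokio::sync::mpsc` so the async side can `.recv().await`; the function name `spawn_range_reader` is made up for this sketch.
   ```rust
   use std::fs::File;
   use std::io::{Read, Seek, SeekFrom, Write};
   use std::sync::mpsc;
   use std::thread;

   /// Hypothetical sketch: a dedicated blocking thread reads the requested
   /// byte range in fixed-size chunks and sends them over a bounded channel.
   fn spawn_range_reader(
       mut file: File,
       range: std::ops::Range<u64>,
   ) -> mpsc::Receiver<std::io::Result<Vec<u8>>> {
       const CHUNK_SIZE: u64 = 8 * 1024;
       // A bounded channel provides backpressure: the reader thread blocks
       // once two chunks are in flight instead of buffering the whole range.
       let (tx, rx) = mpsc::sync_channel(2);
       thread::spawn(move || {
           if let Err(e) = file.seek(SeekFrom::Start(range.start)) {
               let _ = tx.send(Err(e));
               return;
           }
           let mut remaining = range.end - range.start;
           while remaining > 0 {
               let len = remaining.min(CHUNK_SIZE) as usize;
               let mut buf = vec![0u8; len];
               match file.read_exact(&mut buf) {
                   Ok(()) => {
                       remaining -= len as u64;
                       if tx.send(Ok(buf)).is_err() {
                           return; // receiver dropped; stop reading
                       }
                   }
                   Err(e) => {
                       let _ = tx.send(Err(e));
                       return;
                   }
               }
           }
       });
       rx
   }

   fn main() -> std::io::Result<()> {
       // Demo on a temp file: read bytes 3..10 of "0123456789abcdef".
       let path = std::env::temp_dir().join("range_reader_demo.txt");
       File::create(&path)?.write_all(b"0123456789abcdef")?;
       let rx = spawn_range_reader(File::open(&path)?, 3..10);
       let mut out = Vec::new();
       for chunk in rx {
           out.extend(chunk?);
       }
       assert_eq!(out, b"3456789");
       Ok(())
   }
   ```
   The design trade-off is the same one the linked PR wrestles with: the bounded channel decouples blocking reads from async consumers, at the cost of an extra copy per chunk and careful shutdown handling when the receiver is dropped.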
   
   There may be cleaner approaches, but since the original code was already doing blocking IO in an async context (which it [probably shouldn't do](https://ryhl.io/blog/async-what-is-blocking/)), it would be difficult to match its performance with any of the alternatives above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

