alamb commented on code in PR #21478:
URL: https://github.com/apache/datafusion/pull/21478#discussion_r3066525820


##########
datafusion/datasource-json/src/boundary_stream.rs:
##########
@@ -90,10 +90,52 @@ async fn get_stream(
     range: std::ops::Range<u64>,
 ) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
     let opts = GetOptions {
-        range: Some(GetRange::Bounded(range)),
+        range: Some(GetRange::Bounded(range.clone())),
         ..Default::default()
     };
     let result = store.get_opts(&location, opts).await?;
+
+    #[cfg(not(target_arch = "wasm32"))]
+    if let GetResultPayload::File(mut file, _path) = result.payload {
+        use std::io::{Read, Seek, SeekFrom};
+        const CHUNK_SIZE: u64 = 8 * 1024;
+
+        file.seek(SeekFrom::Start(range.start)).map_err(|e| {
+            object_store::Error::Generic {
+                store: "local",
+                source: Box::new(e),
+            }
+        })?;
+
+        return Ok(futures::stream::try_unfold(
+            (file, range.end - range.start),
+            move |(mut file, remaining)| async move {
+                if remaining == 0 {
+                    return Ok(None);
+                }
+                let to_read = remaining.min(CHUNK_SIZE);
+                let cap = usize::try_from(to_read).map_err(|e| {
+                    object_store::Error::Generic {
+                        store: "local",
+                        source: Box::new(e),
+                    }
+                })?;
+
+                let mut buf = Vec::with_capacity(cap);
+                let read =
+                    (&mut file)
+                        .take(to_read)
+                        .read_to_end(&mut buf)

Review Comment:
   This is blocking IO on a tokio task, right? 



##########
datafusion/datasource-json/src/boundary_stream.rs:
##########
@@ -90,10 +90,52 @@ async fn get_stream(
     range: std::ops::Range<u64>,
 ) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
     let opts = GetOptions {
-        range: Some(GetRange::Bounded(range)),
+        range: Some(GetRange::Bounded(range.clone())),
         ..Default::default()
     };
     let result = store.get_opts(&location, opts).await?;
+
+    #[cfg(not(target_arch = "wasm32"))]
+    if let GetResultPayload::File(mut file, _path) = result.payload {
+        use std::io::{Read, Seek, SeekFrom};
+        const CHUNK_SIZE: u64 = 8 * 1024;
+
+        file.seek(SeekFrom::Start(range.start)).map_err(|e| {
+            object_store::Error::Generic {
+                store: "local",
+                source: Box::new(e),
+            }
+        })?;
+
+        return Ok(futures::stream::try_unfold(

Review Comment:
   I feel like the upstream `object_store` crate already tries to do the same thing:
   
   
https://github.com/apache/arrow-rs-object-store/blob/v0.13.2/src/lib.rs#L1636-L1701
   
   which then calls the local-file chunked stream: 
https://github.com/apache/arrow-rs-object-store/blob/main/src/local.rs#L926
   
   
   The major difference is that in object_store the work is done on a 
`spawn_blocking` thread rather than inline. 
   
   I am a bit worried about doing blocking IO on the main thread here.
   
   I wonder if we could take the same approach as upstream and run this on 
a different thread, but, to Dandan's point:
   1. Use a larger buffer for the read (e.g. 256k, then slice it into 8k chunks 
for the output).
   2. Buffer some of the IO to minimize the overhead (e.g. use 
[StreamExt::buffered](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.buffered)).
   
   



##########
datafusion/datasource-json/src/boundary_stream.rs:
##########
@@ -90,10 +90,52 @@ async fn get_stream(
     range: std::ops::Range<u64>,
 ) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
     let opts = GetOptions {
-        range: Some(GetRange::Bounded(range)),
+        range: Some(GetRange::Bounded(range.clone())),
         ..Default::default()
     };
     let result = store.get_opts(&location, opts).await?;
+
+    #[cfg(not(target_arch = "wasm32"))]
+    if let GetResultPayload::File(mut file, _path) = result.payload {
+        use std::io::{Read, Seek, SeekFrom};
+        const CHUNK_SIZE: u64 = 8 * 1024;
+
+        file.seek(SeekFrom::Start(range.start)).map_err(|e| {
+            object_store::Error::Generic {
+                store: "local",
+                source: Box::new(e),
+            }
+        })?;
+
+        return Ok(futures::stream::try_unfold(

Review Comment:
   Update: after some more review, it seems like the current `GetResultPayload::File` 
path does blocking IO as well, so this isn't a regression.
   
   🤷 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to