fresh-borzoni commented on code in PR #446:
URL: https://github.com/apache/fluss-rust/pull/446#discussion_r2963536835


##########
bindings/python/src/lib.rs:
##########
@@ -118,6 +118,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<Lookuper>()?;
     m.add_class::<Schema>()?;
     m.add_class::<LogScanner>()?;
+    m.add_class::<PyRecordBatchLogReader>()?;

Review Comment:
   isn't it internal iterator?



##########
bindings/python/src/table.rs:
##########
@@ -1908,8 +1941,6 @@ pub struct LogScanner {
     projected_schema: SchemaRef,
     /// The projected row type to use for record-based scanning
     projected_row_type: fcore::metadata::RowType,
-    /// Cache for partition_id -> partition_name mapping (avoids repeated 
list_partition_infos calls)
-    partition_name_cache: std::sync::RwLock<Option<HashMap<i64, String>>>,

Review Comment:
   Why have we removed this?



##########
bindings/python/src/table.rs:
##########
@@ -1856,6 +1855,40 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
         .unwrap_or_else(|_| "unknown".to_string())
 }
 
+/// Python iterator that lazily yields PyArrow RecordBatches from a
+/// [`fcore::client::RecordBatchLogReader`]. Used as the backing iterator
+/// for ``pa.RecordBatchReader.from_batches()``.
+///
+/// **Concurrency:** Do not call ``poll_arrow`` / ``poll_record_batch`` on the
+/// same logical scanner while this iterator is active.
+#[pyclass]
+pub struct PyRecordBatchLogReader {
+    reader: fcore::client::RecordBatchLogReader,
+}
+
+#[pymethods]
+impl PyRecordBatchLogReader {
+    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+
+    fn __next__(&mut self, py: Python) -> PyResult<Option<Py<PyAny>>> {
+        let batch = py
+            .detach(|| TOKIO_RUNTIME.block_on(self.reader.next_batch()))

Review Comment:
   PyRecordBatchLogReader holds async RecordBatchLogReader and calls 
TOKIO_RUNTIME.block_on() directly, duplicating what SyncRecordBatchLogReader 
already does. 
   Per the design spec, Python should use the shared sync adapter wrapped in 
py.detach().



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -266,6 +277,14 @@ struct LogScannerInner {
     log_scanner_status: Arc<LogScannerStatus>,
     log_fetcher: LogFetcher,
     is_partitioned_table: bool,
+    arrow_schema: SchemaRef,
+    /// Serializes overlapping `poll` / `poll_batches` across clones sharing 
this `Arc`.
+    ///
+    /// TODO: Consider an API that consumes

Review Comment:
   Let's do it properly in this PR. The reader should take ownership of the 
scanner (move, not clone). That way the compiler prevents concurrent polls - no 
mutex needed. 



##########
crates/fluss/src/client/table/reader.rs:
##########
@@ -0,0 +1,500 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Bounded log reader that polls until stopping offsets, then terminates.
+//!
+//! Unlike [`RecordBatchLogScanner`] which is unbounded (continuous streaming),
+//! [`RecordBatchLogReader`] reads log data up to a finite set of stopping
+//! offsets and then signals completion. This enables "snapshot-style" reads
+//! from a streaming log: capture the latest offsets, then consume all data
+//! up to those offsets.
+//!
+//! The reader also provides a synchronous 
[`arrow::record_batch::RecordBatchReader`]
+//! adapter via [`RecordBatchLogReader::to_record_batch_reader`] for Arrow
+//! ecosystem interop and FFI consumers (Python, C++).
+
+use crate::client::admin::FlussAdmin;
+use crate::client::table::RecordBatchLogScanner;
+use crate::error::{Error, Result};
+use crate::metadata::TableBucket;
+use crate::record::ScanBatch;
+use crate::rpc::message::OffsetSpec;
+use arrow::record_batch::RecordBatch;
+use arrow_schema::SchemaRef;
+use std::collections::{HashMap, VecDeque};
+use std::time::Duration;
+
+const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(500);
+
+/// Bounded log reader that consumes log data up to specified stopping offsets.
+///
+/// This type wraps a [`RecordBatchLogScanner`] and adds stopping semantics:
+/// it polls batches from the scanner, filters/slices them against per-bucket
+/// stopping offsets, and signals completion when all buckets are caught up.
+///
+/// # Concurrent use (important)
+///
+/// [`RecordBatchLogScanner`] is cheaply clonable and all clones share the same
+/// underlying scanner state (fetch buffer, subscription state, in-flight
+/// fetches). A `RecordBatchLogReader` typically takes one clone while the
+/// [`crate::client::FlussTable`] scan path may still hold another handle to 
the
+/// same logical scanner.
+///
+/// **Do not** interleave [`RecordBatchLogScanner::poll`] (or Python
+/// `poll_arrow` / `poll_record_batch`) with 
[`next_batch`](RecordBatchLogReader::next_batch)
+/// on scanners that share this state. Use either the bounded reader **or** the
+/// low-level poll loop for a given subscription session, not both at once.
+/// Overlapping calls fail fast with 
[`crate::error::Error::UnsupportedOperation`]
+/// (serialized in the client via `LogScannerInner::poll_session`).
+///
+/// # Construction
+///
+/// Use [`RecordBatchLogReader::new_until_latest`] for the common case of
+/// reading all currently-available data, or 
[`RecordBatchLogReader::new_until_offsets`]
+/// for custom stopping offsets.
+///
+/// # Async iteration
+///
+/// Call [`next_batch`](RecordBatchLogReader::next_batch) repeatedly to get
+/// `RecordBatch`es lazily, one at a time. Returns `None` when all buckets
+/// have reached their stopping offsets.
+///
+/// # Sync adapter
+///
+/// Call 
[`to_record_batch_reader`](RecordBatchLogReader::to_record_batch_reader)
+/// to get a synchronous [`arrow::record_batch::RecordBatchReader`] suitable
+/// for Arrow FFI consumers.
+pub struct RecordBatchLogReader {
+    scanner: RecordBatchLogScanner,
+    stopping_offsets: HashMap<TableBucket, i64>,
+    buffer: VecDeque<RecordBatch>,
+    schema: SchemaRef,
+}
+
+impl RecordBatchLogReader {
+    /// Create a reader that reads until the latest offsets at the time of 
creation.
+    ///
+    /// Queries the server for the current latest offset of each subscribed
+    /// bucket, then reads until those offsets are reached.
+    pub async fn new_until_latest(
+        scanner: RecordBatchLogScanner,
+        admin: &FlussAdmin,
+    ) -> Result<Self> {
+        let subscribed = scanner.get_subscribed_buckets();
+        if subscribed.is_empty() {
+            return Err(Error::IllegalArgument {
+                message: "No buckets subscribed. Call subscribe() before 
creating a reader."
+                    .to_string(),
+            });
+        }
+
+        let stopping_offsets = query_latest_offsets(admin, &scanner, 
&subscribed).await?;
+        let schema = scanner.schema();
+
+        Ok(Self {
+            scanner,
+            stopping_offsets,
+            buffer: VecDeque::new(),
+            schema,
+        })
+    }
+
+    /// Create a reader with explicit stopping offsets per bucket.
+    pub fn new_until_offsets(
+        scanner: RecordBatchLogScanner,
+        stopping_offsets: HashMap<TableBucket, i64>,
+    ) -> Self {
+        let schema = scanner.schema();
+        Self {
+            scanner,
+            stopping_offsets,
+            buffer: VecDeque::new(),
+            schema,
+        }
+    }
+
+    /// Returns the Arrow schema for batches produced by this reader.
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Drain all remaining batches until stopping offsets are satisfied.
+    ///
+    /// This is a convenience for callers (e.g. bindings building a single 
Arrow
+    /// table) that want to materialize the full result in Rust without 
per-batch
+    /// Python iteration.
+    pub async fn collect_all_batches(&mut self) -> Result<Vec<RecordBatch>> {
+        let mut out = Vec::new();
+        while let Some(b) = self.next_batch().await? {
+            out.push(b);
+        }
+        Ok(out)
+    }
+
+    /// Fetch the next `RecordBatch`, or `None` if all buckets are caught up.
+    ///
+    /// Each call may internally poll multiple batches from the scanner,
+    /// buffer them, and return one at a time. Batches that cross a stopping
+    /// offset boundary are sliced to exclude records at or beyond the stop 
point.
+    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
+        loop {
+            if let Some(batch) = self.buffer.pop_front() {
+                return Ok(Some(batch));
+            }
+
+            if self.stopping_offsets.is_empty() {
+                return Ok(None);
+            }
+
+            let scan_batches = self.scanner.poll(DEFAULT_POLL_TIMEOUT).await?;
+
+            if scan_batches.is_empty() {
+                continue;
+            }
+
+            filter_batches(scan_batches, &mut self.stopping_offsets, &mut 
self.buffer);
+        }
+    }
+
+    /// Convert this async reader into a synchronous 
[`arrow::record_batch::RecordBatchReader`].
+    ///
+    /// The returned adapter calls [`tokio::runtime::Handle::block_on`] on each
+    /// iterator step. **Do not** call this from inside a Tokio worker thread
+    /// while the same runtime is driving async work (nested `block_on` can
+    /// panic or deadlock). Prefer 
[`next_batch`](RecordBatchLogReader::next_batch)
+    /// in async Rust code. This is intended for sync/FFI boundaries (C++, some
+    /// Python call paths).
+    pub fn to_record_batch_reader(
+        self,
+        handle: tokio::runtime::Handle,
+    ) -> SyncRecordBatchLogReader {
+        SyncRecordBatchLogReader {
+            reader: self,
+            handle,
+        }
+    }
+}
+
+/// Synchronous adapter that implements 
[`arrow::record_batch::RecordBatchReader`].
+///
+/// Created via [`RecordBatchLogReader::to_record_batch_reader`].
+/// Blocks the current thread on each `next()` call using the provided
+/// Tokio runtime handle.
+pub struct SyncRecordBatchLogReader {
+    reader: RecordBatchLogReader,
+    handle: tokio::runtime::Handle,
+}
+
+impl Iterator for SyncRecordBatchLogReader {
+    type Item = std::result::Result<RecordBatch, arrow::error::ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.handle.block_on(self.reader.next_batch()) {
+            Ok(Some(batch)) => Some(Ok(batch)),
+            Ok(None) => None,
+            Err(e) => 
Some(Err(arrow::error::ArrowError::ExternalError(Box::new(e)))),
+        }
+    }
+}
+
+impl arrow::record_batch::RecordBatchReader for SyncRecordBatchLogReader {
+    fn schema(&self) -> SchemaRef {
+        self.reader.schema()
+    }
+}
+
+/// Query latest offsets for all subscribed buckets, handling both partitioned
+/// and non-partitioned tables.
+async fn query_latest_offsets(
+    admin: &FlussAdmin,
+    scanner: &RecordBatchLogScanner,
+    subscribed: &[(TableBucket, i64)],
+) -> Result<HashMap<TableBucket, i64>> {
+    let table_path = scanner.table_path();
+
+    if !scanner.is_partitioned() {
+        let bucket_ids: Vec<i32> = subscribed.iter().map(|(tb, _)| 
tb.bucket_id()).collect();
+
+        let offsets = admin
+            .list_offsets(table_path, &bucket_ids, OffsetSpec::Latest)
+            .await?;
+
+        let table_id = scanner.table_id();
+        Ok(offsets
+            .into_iter()
+            .filter(|(_, offset)| *offset > 0)
+            .map(|(bucket_id, offset)| (TableBucket::new(table_id, bucket_id), 
offset))
+            .collect())
+    } else {
+        query_partitioned_offsets(admin, scanner, subscribed).await
+    }
+}
+
+/// Query offsets for partitioned table subscriptions.
+async fn query_partitioned_offsets(
+    admin: &FlussAdmin,
+    scanner: &RecordBatchLogScanner,
+    subscribed: &[(TableBucket, i64)],
+) -> Result<HashMap<TableBucket, i64>> {
+    let table_path = scanner.table_path();
+    let table_id = scanner.table_id();
+
+    let partition_infos = admin.list_partition_infos(table_path).await?;
+    let partition_id_to_name: HashMap<i64, String> = partition_infos
+        .into_iter()
+        .map(|info| (info.get_partition_id(), info.get_partition_name()))
+        .collect();
+
+    let mut by_partition: HashMap<i64, Vec<i32>> = HashMap::new();
+    for (tb, _) in subscribed {
+        if let Some(partition_id) = tb.partition_id() {
+            by_partition
+                .entry(partition_id)
+                .or_default()
+                .push(tb.bucket_id());
+        }
+    }
+
+    let mut result: HashMap<TableBucket, i64> = HashMap::new();
+
+    for (partition_id, bucket_ids) in by_partition {
+        let partition_name =
+            partition_id_to_name
+                .get(&partition_id)
+                .ok_or_else(|| Error::UnexpectedError {
+                    message: format!("Unknown partition_id: {partition_id}"),
+                    source: None,
+                })?;
+
+        let offsets = admin
+            .list_partition_offsets(table_path, partition_name, &bucket_ids, 
OffsetSpec::Latest)
+            .await?;
+
+        for (bucket_id, offset) in offsets {
+            if offset > 0 {
+                let tb = TableBucket::new_with_partition(table_id, 
Some(partition_id), bucket_id);
+                result.insert(tb, offset);
+            }
+        }
+    }
+
+    Ok(result)
+}
+
+/// Filter and slice scan batches against per-bucket stopping offsets.
+///
+/// For each batch:
+/// - If the batch's bucket is not in `stopping_offsets`, skip it.
+/// - If `base_offset >= stop_at`, the bucket is exhausted; remove from map.
+/// - If `last_offset >= stop_at`, slice to keep only records before stop_at.
+/// - Otherwise, keep the full batch.
+///
+/// Accepted batches are pushed to `buffer`. Exhausted buckets are removed
+/// from `stopping_offsets`.
+fn filter_batches(
+    scan_batches: Vec<ScanBatch>,
+    stopping_offsets: &mut HashMap<TableBucket, i64>,
+    buffer: &mut VecDeque<RecordBatch>,
+) {
+    for scan_batch in scan_batches {
+        let bucket = scan_batch.bucket().clone();
+        let Some(&stop_at) = stopping_offsets.get(&bucket) else {
+            continue;
+        };
+
+        let base_offset = scan_batch.base_offset();
+        let last_offset = scan_batch.last_offset();
+
+        if base_offset >= stop_at {
+            stopping_offsets.remove(&bucket);
+            continue;
+        }
+
+        let batch = if last_offset >= stop_at {
+            let num_to_keep = (stop_at - base_offset) as usize;
+            let b = scan_batch.into_batch();
+            let limit = num_to_keep.min(b.num_rows());
+            b.slice(0, limit)
+        } else {
+            scan_batch.into_batch()
+        };
+
+        buffer.push_back(batch);
+
+        if last_offset >= stop_at - 1 {
+            stopping_offsets.remove(&bucket);

Review Comment:
   Shall we unsibscribe as well?



##########
crates/fluss/src/client/table/reader.rs:
##########
@@ -0,0 +1,500 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Bounded log reader that polls until stopping offsets, then terminates.
+//!
+//! Unlike [`RecordBatchLogScanner`] which is unbounded (continuous streaming),
+//! [`RecordBatchLogReader`] reads log data up to a finite set of stopping
+//! offsets and then signals completion. This enables "snapshot-style" reads
+//! from a streaming log: capture the latest offsets, then consume all data
+//! up to those offsets.
+//!
+//! The reader also provides a synchronous 
[`arrow::record_batch::RecordBatchReader`]
+//! adapter via [`RecordBatchLogReader::to_record_batch_reader`] for Arrow
+//! ecosystem interop and FFI consumers (Python, C++).
+
+use crate::client::admin::FlussAdmin;
+use crate::client::table::RecordBatchLogScanner;
+use crate::error::{Error, Result};
+use crate::metadata::TableBucket;
+use crate::record::ScanBatch;
+use crate::rpc::message::OffsetSpec;
+use arrow::record_batch::RecordBatch;
+use arrow_schema::SchemaRef;
+use std::collections::{HashMap, VecDeque};
+use std::time::Duration;
+
+const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(500);
+
+/// Bounded log reader that consumes log data up to specified stopping offsets.
+///
+/// This type wraps a [`RecordBatchLogScanner`] and adds stopping semantics:
+/// it polls batches from the scanner, filters/slices them against per-bucket
+/// stopping offsets, and signals completion when all buckets are caught up.
+///
+/// # Concurrent use (important)
+///
+/// [`RecordBatchLogScanner`] is cheaply clonable and all clones share the same
+/// underlying scanner state (fetch buffer, subscription state, in-flight
+/// fetches). A `RecordBatchLogReader` typically takes one clone while the
+/// [`crate::client::FlussTable`] scan path may still hold another handle to 
the
+/// same logical scanner.
+///
+/// **Do not** interleave [`RecordBatchLogScanner::poll`] (or Python
+/// `poll_arrow` / `poll_record_batch`) with 
[`next_batch`](RecordBatchLogReader::next_batch)
+/// on scanners that share this state. Use either the bounded reader **or** the
+/// low-level poll loop for a given subscription session, not both at once.
+/// Overlapping calls fail fast with 
[`crate::error::Error::UnsupportedOperation`]
+/// (serialized in the client via `LogScannerInner::poll_session`).
+///
+/// # Construction
+///
+/// Use [`RecordBatchLogReader::new_until_latest`] for the common case of
+/// reading all currently-available data, or 
[`RecordBatchLogReader::new_until_offsets`]
+/// for custom stopping offsets.
+///
+/// # Async iteration
+///
+/// Call [`next_batch`](RecordBatchLogReader::next_batch) repeatedly to get
+/// `RecordBatch`es lazily, one at a time. Returns `None` when all buckets
+/// have reached their stopping offsets.
+///
+/// # Sync adapter
+///
+/// Call 
[`to_record_batch_reader`](RecordBatchLogReader::to_record_batch_reader)
+/// to get a synchronous [`arrow::record_batch::RecordBatchReader`] suitable
+/// for Arrow FFI consumers.
+pub struct RecordBatchLogReader {
+    scanner: RecordBatchLogScanner,
+    stopping_offsets: HashMap<TableBucket, i64>,
+    buffer: VecDeque<RecordBatch>,
+    schema: SchemaRef,
+}
+
+impl RecordBatchLogReader {
+    /// Create a reader that reads until the latest offsets at the time of 
creation.
+    ///
+    /// Queries the server for the current latest offset of each subscribed
+    /// bucket, then reads until those offsets are reached.
+    pub async fn new_until_latest(
+        scanner: RecordBatchLogScanner,
+        admin: &FlussAdmin,
+    ) -> Result<Self> {
+        let subscribed = scanner.get_subscribed_buckets();
+        if subscribed.is_empty() {
+            return Err(Error::IllegalArgument {
+                message: "No buckets subscribed. Call subscribe() before 
creating a reader."
+                    .to_string(),
+            });
+        }
+
+        let stopping_offsets = query_latest_offsets(admin, &scanner, 
&subscribed).await?;
+        let schema = scanner.schema();
+
+        Ok(Self {
+            scanner,
+            stopping_offsets,
+            buffer: VecDeque::new(),
+            schema,
+        })
+    }
+
+    /// Create a reader with explicit stopping offsets per bucket.
+    pub fn new_until_offsets(
+        scanner: RecordBatchLogScanner,
+        stopping_offsets: HashMap<TableBucket, i64>,
+    ) -> Self {
+        let schema = scanner.schema();
+        Self {
+            scanner,
+            stopping_offsets,
+            buffer: VecDeque::new(),
+            schema,
+        }
+    }
+
+    /// Returns the Arrow schema for batches produced by this reader.
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Drain all remaining batches until stopping offsets are satisfied.
+    ///
+    /// This is a convenience for callers (e.g. bindings building a single 
Arrow
+    /// table) that want to materialize the full result in Rust without 
per-batch
+    /// Python iteration.
+    pub async fn collect_all_batches(&mut self) -> Result<Vec<RecordBatch>> {
+        let mut out = Vec::new();
+        while let Some(b) = self.next_batch().await? {
+            out.push(b);
+        }
+        Ok(out)
+    }
+
+    /// Fetch the next `RecordBatch`, or `None` if all buckets are caught up.
+    ///
+    /// Each call may internally poll multiple batches from the scanner,
+    /// buffer them, and return one at a time. Batches that cross a stopping
+    /// offset boundary are sliced to exclude records at or beyond the stop 
point.
+    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {

Review Comment:
   next_batch() returns RecordBatch discarding bucket/offset metadata that was 
in use before with ScanRecord



##########
crates/fluss/src/client/table/reader.rs:
##########
@@ -0,0 +1,500 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Bounded log reader that polls until stopping offsets, then terminates.
+//!
+//! Unlike [`RecordBatchLogScanner`] which is unbounded (continuous streaming),
+//! [`RecordBatchLogReader`] reads log data up to a finite set of stopping
+//! offsets and then signals completion. This enables "snapshot-style" reads
+//! from a streaming log: capture the latest offsets, then consume all data
+//! up to those offsets.
+//!
+//! The reader also provides a synchronous 
[`arrow::record_batch::RecordBatchReader`]
+//! adapter via [`RecordBatchLogReader::to_record_batch_reader`] for Arrow
+//! ecosystem interop and FFI consumers (Python, C++).
+
+use crate::client::admin::FlussAdmin;
+use crate::client::table::RecordBatchLogScanner;
+use crate::error::{Error, Result};
+use crate::metadata::TableBucket;
+use crate::record::ScanBatch;
+use crate::rpc::message::OffsetSpec;
+use arrow::record_batch::RecordBatch;
+use arrow_schema::SchemaRef;
+use std::collections::{HashMap, VecDeque};
+use std::time::Duration;
+
+const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(500);
+
+/// Bounded log reader that consumes log data up to specified stopping offsets.
+///
+/// This type wraps a [`RecordBatchLogScanner`] and adds stopping semantics:
+/// it polls batches from the scanner, filters/slices them against per-bucket
+/// stopping offsets, and signals completion when all buckets are caught up.
+///
+/// # Concurrent use (important)
+///
+/// [`RecordBatchLogScanner`] is cheaply clonable and all clones share the same
+/// underlying scanner state (fetch buffer, subscription state, in-flight
+/// fetches). A `RecordBatchLogReader` typically takes one clone while the
+/// [`crate::client::FlussTable`] scan path may still hold another handle to 
the
+/// same logical scanner.
+///
+/// **Do not** interleave [`RecordBatchLogScanner::poll`] (or Python
+/// `poll_arrow` / `poll_record_batch`) with 
[`next_batch`](RecordBatchLogReader::next_batch)
+/// on scanners that share this state. Use either the bounded reader **or** the
+/// low-level poll loop for a given subscription session, not both at once.
+/// Overlapping calls fail fast with 
[`crate::error::Error::UnsupportedOperation`]
+/// (serialized in the client via `LogScannerInner::poll_session`).
+///
+/// # Construction
+///
+/// Use [`RecordBatchLogReader::new_until_latest`] for the common case of
+/// reading all currently-available data, or 
[`RecordBatchLogReader::new_until_offsets`]
+/// for custom stopping offsets.
+///
+/// # Async iteration
+///
+/// Call [`next_batch`](RecordBatchLogReader::next_batch) repeatedly to get
+/// `RecordBatch`es lazily, one at a time. Returns `None` when all buckets
+/// have reached their stopping offsets.
+///
+/// # Sync adapter
+///
+/// Call 
[`to_record_batch_reader`](RecordBatchLogReader::to_record_batch_reader)
+/// to get a synchronous [`arrow::record_batch::RecordBatchReader`] suitable
+/// for Arrow FFI consumers.
+pub struct RecordBatchLogReader {
+    scanner: RecordBatchLogScanner,
+    stopping_offsets: HashMap<TableBucket, i64>,
+    buffer: VecDeque<RecordBatch>,
+    schema: SchemaRef,
+}
+
+impl RecordBatchLogReader {
+    /// Create a reader that reads until the latest offsets at the time of 
creation.
+    ///
+    /// Queries the server for the current latest offset of each subscribed
+    /// bucket, then reads until those offsets are reached.
+    pub async fn new_until_latest(
+        scanner: RecordBatchLogScanner,
+        admin: &FlussAdmin,
+    ) -> Result<Self> {
+        let subscribed = scanner.get_subscribed_buckets();
+        if subscribed.is_empty() {
+            return Err(Error::IllegalArgument {
+                message: "No buckets subscribed. Call subscribe() before 
creating a reader."
+                    .to_string(),
+            });
+        }
+
+        let stopping_offsets = query_latest_offsets(admin, &scanner, 
&subscribed).await?;

Review Comment:
   Buckets where subscribed offset >= latest offset stay in stopping_offsets 
forever, next_batch() loops indefinitely on empty polls



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to