Copilot commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2915483929


##########
bindings/python/src/table.rs:
##########
@@ -2030,7 +2041,10 @@ impl LogScanner {
     ///     - Returns an empty ScanRecords if no records are available
     ///     - When timeout expires, returns an empty ScanRecords (NOT an error)
     fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<ScanRecords> {
-        let scanner = self.scanner.as_record()?;
+        let scanner_ref =
+            unsafe { &*(&self.state as *const 
std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
+        let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });

Review Comment:
   Avoid the `unsafe` pointer cast when accessing `self.state`. You can lock 
the mutex directly via `self.state.lock()` (or clone the `Arc` first) without 
`unsafe`; the current cast is unnecessary and introduces unsoundness risk if 
the field type ever changes.
   ```suggestion
           let lock = TOKIO_RUNTIME.block_on(async { self.state.lock().await });
   ```



##########
bindings/python/src/table.rs:
##########
@@ -2199,6 +2226,90 @@ impl LogScanner {
         Ok(df)
     }
 
+    fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
+        let py = slf.py();
+        let code = pyo3::ffi::c_str!(
+            r#"
+async def _adapter(obj):
+    while True:
+        try:
+            yield await obj.__anext__()
+        except StopAsyncIteration:
+            break
+"#
+        );
+        let globals = pyo3::types::PyDict::new(py);
+        py.run(code, Some(&globals), None)?;
+        let adapter = globals.get_item("_adapter")?.unwrap();
+        // Return adapt(self)
+        adapter.call1((slf.into_bound_py_any(py)?,))
+    }
+
+    fn __anext__<'py>(slf: PyRefMut<'py, Self>) -> PyResult<Option<Bound<'py, 
PyAny>>> {
+        let state_arc = slf.state.clone();
+        let projected_row_type = slf.projected_row_type.clone();
+        let py = slf.py();
+
+        let future = future_into_py(py, async move {
+            let mut state = state_arc.lock().await;
+
+            // 1. If we already have buffered records, pop and return 
immediately
+            if let Some(record) = state.pending_records.pop_front() {
+                return Ok(record.into_any());
+            }
+
+            // 2. Buffer is empty, we must poll the network for the next batch
+            // The underlying kind must be a Record-based scanner.
+            let scanner = match state.kind.as_record() {
+                Ok(s) => s,
+                Err(_) => {
+                    return Err(pyo3::exceptions::PyStopAsyncIteration::new_err(
+                        "Stream Ended",

Review Comment:
   `__anext__` treats the batch-based scanner variant as end-of-stream 
(`StopAsyncIteration`). That will silently terminate `async for` on scanners 
created via `create_record_batch_log_scanner()`, and it also masks the helpful 
error message from `as_record()`. Either implement async iteration for the 
batch variant (yielding `RecordBatch`/Arrow), or raise a `TypeError` explaining 
that async iteration is only supported for record scanners.
   ```suggestion
                       return Err(PyTypeError::new_err(
                           "Async iteration is only supported for record scanners; \
                            use create_log_scanner() instead.",
   ```



##########
bindings/python/test/test_log_table.py:
##########
@@ -729,6 +729,55 @@ async def 
test_scan_records_indexing_and_slicing(connection, admin):
     await admin.drop_table(table_path, ignore_if_not_exists=False)
 
 
+async def test_async_iterator(connection, admin):
+    """Test the Python asynchronous iterator loop (`async for`) on 
LogScanner."""
+    table_path = fluss.TablePath("fluss", "py_test_async_iterator")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+    )
+    await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+    table = await connection.get_table(table_path)
+    writer = table.new_append().create_writer()
+    
+    # Write 5 records
+    writer.write_arrow_batch(
+        pa.RecordBatch.from_arrays(
+            [pa.array(list(range(1, 6)), type=pa.int32()),
+             pa.array([f"async{i}" for i in range(1, 6)])],
+            schema=pa.schema([pa.field("id", pa.int32()), pa.field("val", 
pa.string())]),
+        )
+    )
+    await writer.flush()
+
+    scanner = await table.new_scan().create_log_scanner()
+    num_buckets = (await admin.get_table_info(table_path)).num_buckets
+    scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})
+
+    collected = []
+    
+    # Here is the magical Issue #424 async iterator logic at work:
+    async def consume_scanner():
+        async for record in scanner:
+            collected.append(record)
+            if len(collected) == 5:
+                break
+                
+    # We must race the consumption against a timeout so the test doesn't hang 
if the iterator is broken

Review Comment:
   The comment on this line is likely to exceed the repository’s configured 
Ruff line length (88) and may trigger E501 in CI. Please wrap/split the comment 
to keep lines within the configured limit.
   ```suggestion
       # We must race the consumption against a timeout so the test doesn't hang
       # if the iterator is broken
   ```



##########
bindings/python/src/table.rs:
##########
@@ -2114,7 +2131,10 @@ impl LogScanner {
     ///     - Returns an empty table (with correct schema) if no records are 
available
     ///     - When timeout expires, returns an empty table (NOT an error)
     fn poll_arrow(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
-        let scanner = self.scanner.as_batch()?;
+        let scanner_ref =
+            unsafe { &*(&self.state as *const 
std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
+        let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });

Review Comment:
   Remove the `unsafe` pointer cast when locking `self.state` in 
`poll_arrow()`. The lock can be taken safely with `self.state.lock().await` 
(via `TOKIO_RUNTIME.block_on`), which avoids introducing UB hazards.
   ```suggestion
           let lock = TOKIO_RUNTIME.block_on(async { self.state.lock().await });
   ```



##########
bindings/python/src/table.rs:
##########
@@ -2367,7 +2484,10 @@ impl LogScanner {
         py: Python,
         mut stopping_offsets: HashMap<fcore::metadata::TableBucket, i64>,
     ) -> PyResult<Py<PyAny>> {
-        let scanner = self.scanner.as_batch()?;
+        let scanner_ref =
+            unsafe { &*(&self.state as *const 
std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };

Review Comment:
   `poll_until_offsets()` also relies on the `unsafe` cast to access 
`self.state`. This should be refactored to lock `self.state` safely; keeping 
`unsafe` here is especially risky because this method can run for a long time 
and is on a hot path for `to_arrow()`.
   ```suggestion
           let scanner_ref = &self.state;
   ```



##########
bindings/python/src/table.rs:
##########
@@ -2079,7 +2093,10 @@ impl LogScanner {
     ///     - Returns an empty list if no batches are available
     ///     - When timeout expires, returns an empty list (NOT an error)
     fn poll_record_batch(&self, py: Python, timeout_ms: i64) -> 
PyResult<Vec<RecordBatch>> {
-        let scanner = self.scanner.as_batch()?;
+        let scanner_ref =
+            unsafe { &*(&self.state as *const 
std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
+        let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });
+        let scanner = lock.kind.as_batch()?;
 

Review Comment:
   Same as in `poll()`: please remove the `unsafe` cast used to get 
`scanner_ref`. Lock `self.state` directly; keeping this `unsafe` here makes the 
method harder to reason about and can hide real lifetime/aliasing issues.
   ```suggestion
           let lock = TOKIO_RUNTIME.block_on(async { self.state.lock().await });
           let scanner = lock.kind.as_batch()?;
   ```



##########
bindings/python/test/test_log_table.py:
##########
@@ -729,6 +729,55 @@ async def 
test_scan_records_indexing_and_slicing(connection, admin):
     await admin.drop_table(table_path, ignore_if_not_exists=False)
 
 
+async def test_async_iterator(connection, admin):
+    """Test the Python asynchronous iterator loop (`async for`) on 
LogScanner."""
+    table_path = fluss.TablePath("fluss", "py_test_async_iterator")
+    await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+    schema = fluss.Schema(
+        pa.schema([pa.field("id", pa.int32()), pa.field("val", pa.string())])
+    )
+    await admin.create_table(table_path, fluss.TableDescriptor(schema))
+
+    table = await connection.get_table(table_path)
+    writer = table.new_append().create_writer()
+    
+    # Write 5 records
+    writer.write_arrow_batch(
+        pa.RecordBatch.from_arrays(
+            [pa.array(list(range(1, 6)), type=pa.int32()),
+             pa.array([f"async{i}" for i in range(1, 6)])],
+            schema=pa.schema([pa.field("id", pa.int32()), pa.field("val", 
pa.string())]),
+        )
+    )
+    await writer.flush()
+
+    scanner = await table.new_scan().create_log_scanner()
+    num_buckets = (await admin.get_table_info(table_path)).num_buckets
+    scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in 
range(num_buckets)})

Review Comment:
   This test only covers `async for` on a record-based scanner 
(`create_log_scanner()`). Since `LogScanner` can also wrap the batch variant 
(`create_record_batch_log_scanner()`), consider adding a companion test for 
async iteration on the batch scanner (or explicitly asserting that async 
iteration is unsupported there) so the intended behavior is locked in by tests.



##########
bindings/python/src/table.rs:
##########
@@ -2199,6 +2226,90 @@ impl LogScanner {
         Ok(df)
     }
 
+    fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
+        let py = slf.py();
+        let code = pyo3::ffi::c_str!(
+            r#"
+async def _adapter(obj):
+    while True:
+        try:
+            yield await obj.__anext__()
+        except StopAsyncIteration:
+            break
+"#
+        );
+        let globals = pyo3::types::PyDict::new(py);
+        py.run(code, Some(&globals), None)?;
+        let adapter = globals.get_item("_adapter")?.unwrap();
+        // Return adapt(self)
+        adapter.call1((slf.into_bound_py_any(py)?,))
+    }

Review Comment:
   `__aiter__` recompiles and executes the Python adapter source via `py.run()` 
every time an `async for` loop starts. Consider caching the adapter function 
(e.g., in a `PyOnceLock`) or returning `self` directly as the async iterator if 
possible; this avoids repeated code compilation and reduces overhead per 
`async for` loop.



##########
bindings/python/src/table.rs:
##########
@@ -2199,6 +2226,90 @@ impl LogScanner {
         Ok(df)
     }
 
+    fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
+        let py = slf.py();
+        let code = pyo3::ffi::c_str!(
+            r#"
+async def _adapter(obj):
+    while True:
+        try:
+            yield await obj.__anext__()
+        except StopAsyncIteration:
+            break
+"#
+        );
+        let globals = pyo3::types::PyDict::new(py);
+        py.run(code, Some(&globals), None)?;
+        let adapter = globals.get_item("_adapter")?.unwrap();
+        // Return adapt(self)
+        adapter.call1((slf.into_bound_py_any(py)?,))
+    }
+
+    fn __anext__<'py>(slf: PyRefMut<'py, Self>) -> PyResult<Option<Bound<'py, 
PyAny>>> {
+        let state_arc = slf.state.clone();
+        let projected_row_type = slf.projected_row_type.clone();
+        let py = slf.py();
+
+        let future = future_into_py(py, async move {
+            let mut state = state_arc.lock().await;
+
+            // 1. If we already have buffered records, pop and return 
immediately
+            if let Some(record) = state.pending_records.pop_front() {
+                return Ok(record.into_any());
+            }
+
+            // 2. Buffer is empty, we must poll the network for the next batch
+            // The underlying kind must be a Record-based scanner.
+            let scanner = match state.kind.as_record() {
+                Ok(s) => s,
+                Err(_) => {
+                    return Err(pyo3::exceptions::PyStopAsyncIteration::new_err(
+                        "Stream Ended",
+                    ));
+                }
+            };
+
+            // Poll with a reasonable internal timeout before unblocking the 
event loop
+            let timeout = core::time::Duration::from_millis(5000);
+
+            let mut current_records = scanner
+                .poll(timeout)
+                .await
+                .map_err(|e| FlussError::from_core_error(&e))?;
+
+            // If it's a real timeout with zero records, loop or throw 
StopAsyncIteration?
+            // Since it's a streaming log, we can yield None or block. 
Blocking requires a loop in the future.
+            while current_records.is_empty() {
+                current_records = scanner
+                    .poll(timeout)
+                    .await
+                    .map_err(|e| FlussError::from_core_error(&e))?;
+            }

Review Comment:
   `__anext__` holds `state_arc.lock()` across `scanner.poll(timeout).await` 
(and the retry loop). This blocks all other methods needing the same mutex 
(e.g., subscribe/unsubscribe/poll/to_arrow) for the full network wait time and 
can lead to poor responsiveness or deadlock-like behavior under concurrent use. 
Consider narrowing the critical section (e.g., split locks for `kind` vs 
`pending_records`, or temporarily take/move the scanner out of the state while 
polling).



##########
bindings/python/src/table.rs:
##########
@@ -2264,7 +2378,10 @@ impl LogScanner {
         py: Python,
         subscribed: &[(fcore::metadata::TableBucket, i64)],
     ) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> {
-        let scanner = self.scanner.as_batch()?;
+        let scanner_ref =
+            unsafe { &*(&self.state as *const 
std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
+        let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });

Review Comment:
   `query_latest_offsets()` uses the same `unsafe` cast pattern to lock 
`self.state`. Please replace this with a safe lock on `self.state` (or a cloned 
`Arc`) to avoid unnecessary `unsafe` in the Python bindings.
   ```suggestion
           let lock = TOKIO_RUNTIME.block_on(async { self.state.lock().await });
   ```



##########
bindings/python/src/table.rs:
##########
@@ -2167,13 +2187,20 @@ impl LogScanner {
     /// Returns:
     ///     PyArrow Table containing all data from subscribed buckets
     fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
-        let scanner = self.scanner.as_batch()?;
-        let subscribed = scanner.get_subscribed_buckets();
-        if subscribed.is_empty() {
-            return Err(FlussError::new_err(
-                "No buckets subscribed. Call subscribe(), subscribe_buckets(), 
subscribe_partition(), or subscribe_partition_buckets() first.",
-            ));
-        }
+        let subscribed = {
+            let scanner_ref = unsafe {
+                &*(&self.state as *const 
std::sync::Arc<tokio::sync::Mutex<ScannerState>>)
+            };

Review Comment:
   `to_arrow()` also uses an `unsafe` cast to access `self.state`. This should 
be rewritten to safely clone/borrow `self.state` and lock it without `unsafe` 
to keep the bindings memory-safe.
   ```suggestion
               let scanner_ref = &self.state;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to