qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r3037506184


##########
bindings/python/src/table.rs:
##########
@@ -2198,6 +2197,156 @@ impl LogScanner {
         Ok(df)
     }
 
+    fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
+        let py = slf.py();
+
+        // Single lock for the generic async generator
+        static ASYNC_GEN_FN: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
+
+        let gen_fn = ASYNC_GEN_FN.get_or_init(py, || {
+            let code = pyo3::ffi::c_str!(
+                r#"
+async def _async_scan_generic(scanner, method_name, timeout_ms=1000):
+    # Dynamically resolve the polling method (e.g., _async_poll or _async_poll_batches)
+    poll_method = getattr(scanner, method_name)
+    while True:
+        items = await poll_method(timeout_ms)
+        if items:
+            for item in items:
+                yield item
+"#
+            );
+            let globals = pyo3::types::PyDict::new(py);
+            py.run(code, Some(&globals), None).unwrap();
+            globals.get_item("_async_scan_generic").unwrap().unwrap().unbind()
+        });
+
+        // Determine which internal method to call based on the scanner kind
+        let method_name = match slf.kind.as_ref() {
+            ScannerKind::Record(_) => "_async_poll",
+            ScannerKind::Batch(_) => "_async_poll_batches",
+        };
+
+        // Instantiate the generator with the scanner instance and the target method name
+        gen_fn.bind(py).call1((slf.into_bound_py_any(py)?, method_name))
+    }
+
+    /// Perform a single bounded poll and return a list of ScanRecord objects.
+    ///
+    /// This is the async building block used by `__aiter__` to implement
+    /// `async for`. Each call does exactly one network poll (bounded by
+    /// `timeout_ms`), converts any results to Python objects, and returns
+    /// them as a list. An empty list signals a timeout (no data yet), not
+    /// end-of-stream.
+    ///
+    /// Args:
+    ///     timeout_ms: Timeout in milliseconds for the network poll (default: 1000)
+    ///
+    /// Returns:
+    ///     Awaitable that resolves to a list of ScanRecord objects
+    fn _async_poll<'py>(
+        &self,
+        py: Python<'py>,
+        timeout_ms: Option<i64>,

Review Comment:
   Hi @leekeiabstraction, in commit d52113357d272192d8262f1de2b25006d50fb1ac I
   made this into a global variable, `DEFAULT_POLL_INTERVAL_MS`, which is
   initialized (to 1000) at the top of the script. It is then referenced within
   the `_async_poll` and `_async_poll_batches` functions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to