qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r3037506500


##########
bindings/python/src/table.rs:
##########
@@ -2198,6 +2197,156 @@ impl LogScanner {
         Ok(df)
     }
 
+    fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
+        let py = slf.py();
+
+        // Single lock for the generic async generator
+        static ASYNC_GEN_FN: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
+
+        let gen_fn = ASYNC_GEN_FN.get_or_init(py, || {
+            let code = pyo3::ffi::c_str!(
+                r#"
+async def _async_scan_generic(scanner, method_name, timeout_ms=1000):
+    # Dynamically resolve the polling method (e.g., _async_poll or _async_poll_batches)
+    poll_method = getattr(scanner, method_name)
+    while True:
+        items = await poll_method(timeout_ms)
+        if items:
+            for item in items:
+                yield item
+"#
+            );
+            let globals = pyo3::types::PyDict::new(py);
+            py.run(code, Some(&globals), None).unwrap();
+            globals.get_item("_async_scan_generic").unwrap().unwrap().unbind()
+        });
+
+        // Determine which internal method to call based on the scanner kind
+        let method_name = match slf.kind.as_ref() {
+            ScannerKind::Record(_) => "_async_poll",
+            ScannerKind::Batch(_) => "_async_poll_batches",
+        };
+
+        // Instantiate the generator with the scanner instance and the target method name
+        gen_fn.bind(py).call1((slf.into_bound_py_any(py)?, method_name))
+    }
+
+    /// Perform a single bounded poll and return a list of ScanRecord objects.
+    ///
+    /// This is the async building block used by `__aiter__` to implement
+    /// `async for`. Each call does exactly one network poll (bounded by
+    /// `timeout_ms`), converts any results to Python objects, and returns
+    /// them as a list. An empty list signals a timeout (no data yet), not
+    /// end-of-stream.
+    ///
+    /// Args:
+    ///     timeout_ms: Timeout in milliseconds for the network poll (default: 1000)
+    ///
+    /// Returns:
+    ///     Awaitable that resolves to a list of ScanRecord objects
+    fn _async_poll<'py>(
+        &self,
+        py: Python<'py>,
+        timeout_ms: Option<i64>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let timeout_ms = timeout_ms.unwrap_or(1000);

Review Comment:
   Hi @leekeiabstraction, this is mentioned above.



##########
bindings/python/src/table.rs:
##########
@@ -2198,6 +2197,156 @@ impl LogScanner {
         Ok(df)
     }
 
+    fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
+        let py = slf.py();
+
+        // Single lock for the generic async generator
+        static ASYNC_GEN_FN: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
+
+        let gen_fn = ASYNC_GEN_FN.get_or_init(py, || {
+            let code = pyo3::ffi::c_str!(
+                r#"
+async def _async_scan_generic(scanner, method_name, timeout_ms=1000):
+    # Dynamically resolve the polling method (e.g., _async_poll or _async_poll_batches)
+    poll_method = getattr(scanner, method_name)
+    while True:
+        items = await poll_method(timeout_ms)
+        if items:
+            for item in items:
+                yield item
+"#
+            );
+            let globals = pyo3::types::PyDict::new(py);
+            py.run(code, Some(&globals), None).unwrap();
+            globals.get_item("_async_scan_generic").unwrap().unwrap().unbind()
+        });
+
+        // Determine which internal method to call based on the scanner kind
+        let method_name = match slf.kind.as_ref() {
+            ScannerKind::Record(_) => "_async_poll",
+            ScannerKind::Batch(_) => "_async_poll_batches",
+        };
+
+        // Instantiate the generator with the scanner instance and the target method name
+        gen_fn.bind(py).call1((slf.into_bound_py_any(py)?, method_name))
+    }
+
+    /// Perform a single bounded poll and return a list of ScanRecord objects.
+    ///
+    /// This is the async building block used by `__aiter__` to implement
+    /// `async for`. Each call does exactly one network poll (bounded by
+    /// `timeout_ms`), converts any results to Python objects, and returns
+    /// them as a list. An empty list signals a timeout (no data yet), not
+    /// end-of-stream.
+    ///
+    /// Args:
+    ///     timeout_ms: Timeout in milliseconds for the network poll (default: 1000)
+    ///
+    /// Returns:
+    ///     Awaitable that resolves to a list of ScanRecord objects
+    fn _async_poll<'py>(
+        &self,
+        py: Python<'py>,
+        timeout_ms: Option<i64>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let timeout_ms = timeout_ms.unwrap_or(1000);
+        if timeout_ms < 0 {
+            return Err(FlussError::new_err(format!(
+                "timeout_ms must be non-negative, got: {timeout_ms}"
+            )));
+        }
+
+        let scanner = Arc::clone(&self.kind);
+        let projected_row_type = self.projected_row_type.clone();
+        let timeout = Duration::from_millis(timeout_ms as u64);
+
+        future_into_py(py, async move {
+            let core_scanner = match scanner.as_ref() {
+                ScannerKind::Record(s) => s,
+                ScannerKind::Batch(_) => {
+                    return Err(PyTypeError::new_err(
+                        "This internal method only supports record-based scanners. \
+                         For batch-based scanners, use 'async for' or 'poll_record_batch' instead.",
+                    ));
+                }
+            };
+
+            let scan_records = core_scanner
+                .poll(timeout)
+                .await
+                .map_err(|e| FlussError::from_core_error(&e))?;
+
+            // Convert to Python list
+            Python::attach(|py| {
+                let mut result: Vec<Py<ScanRecord>> = Vec::new();
+                for (_, records) in scan_records.into_records_by_buckets() {
+                    for core_record in records {
+                        let scan_record =
+                            ScanRecord::from_core(py, &core_record, &projected_row_type)?;
+                        result.push(Py::new(py, scan_record)?);
+                    }
+                }
+                Ok(result)
+            })
+        })
+    }
+
+    /// Perform a single bounded poll and return a list of RecordBatch objects.
+    ///
+    /// This is the async building block used by `__aiter__` (batch mode) to
+    /// implement `async for`. Each call does exactly one network poll (bounded
+    /// by `timeout_ms`), converts any results to Python RecordBatch objects,
+    /// and returns them as a list. An empty list signals a timeout (no data
+    /// yet), not end-of-stream.
+    ///
+    /// Args:
+    ///     timeout_ms: Timeout in milliseconds for the network poll (default: 1000)
+    ///
+    /// Returns:
+    ///     Awaitable that resolves to a list of RecordBatch objects
+    fn _async_poll_batches<'py>(
+        &self,
+        py: Python<'py>,
+        timeout_ms: Option<i64>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let timeout_ms = timeout_ms.unwrap_or(1000);

Review Comment:
   Hi @leekeiabstraction, this is mentioned above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to