qzyu999 commented on code in PR #438:
URL: https://github.com/apache/fluss-rust/pull/438#discussion_r2944095501
##########
bindings/python/src/table.rs:
##########
@@ -2199,6 +2200,171 @@ impl LogScanner {
Ok(df)
}
+ fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
+ let py = slf.py();
+
+ match slf.kind.as_ref() {
+ ScannerKind::Record(_) => {
+ static RECORD_ASYNC_GEN_FN: PyOnceLock<Py<PyAny>> =
PyOnceLock::new();
+ let gen_fn = RECORD_ASYNC_GEN_FN.get_or_init(py, || {
+ let code = pyo3::ffi::c_str!(
+ r#"
+async def _async_scan(scanner, timeout_ms=1000):
+ while True:
+ batch = await scanner._async_poll(timeout_ms)
+ if batch:
+ for record in batch:
+ yield record
+"#
+ );
+ let globals = pyo3::types::PyDict::new(py);
+ py.run(code, Some(&globals), None).unwrap();
+ globals.get_item("_async_scan").unwrap().unwrap().unbind()
+ });
+ gen_fn.bind(py).call1((slf.into_bound_py_any(py)?,))
+ }
+ ScannerKind::Batch(_) => {
+ static BATCH_ASYNC_GEN_FN: PyOnceLock<Py<PyAny>> =
PyOnceLock::new();
+ let gen_fn = BATCH_ASYNC_GEN_FN.get_or_init(py, || {
+ let code = pyo3::ffi::c_str!(
+ r#"
+async def _async_batch_scan(scanner, timeout_ms=1000):
Review Comment:
Hi @fresh-borzoni, I made the changes in commit
3981fff33714680629fed5503a165d9589c007fe; the tests pass locally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]