toutane commented on code in PR #2298:
URL: https://github.com/apache/iceberg-rust/pull/2298#discussion_r3419274698
##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -143,36 +213,34 @@ impl ExecutionPlan for IcebergTableScan {
fn execute(
&self,
- _partition: usize,
+ partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
- let fut = get_batch_stream(
+ let bucket = match &self.buckets {
+ Some(buckets) =>
Some(buckets.get(partition).cloned().ok_or_else(|| {
Review Comment:
Done: `buckets` is now `Option<Arc<[Arc<[FileScanTask]>]>>.
##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -187,57 +255,107 @@ impl DisplayAs for IcebergTableScan {
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
+ let projection = self
+ .projection
+ .as_deref()
+ .map_or(String::new(), |v| v.join(","));
+ let predicate = self
+ .predicates
+ .as_ref()
+ .map_or(String::new(), |p| p.to_string());
+
write!(
f,
- "IcebergTableScan projection:[{}] predicate:[{}]",
- self.projection
- .clone()
- .map_or(String::new(), |v| v.join(",")),
- self.predicates
- .clone()
- .map_or(String::from(""), |p| format!("{p}"))
+ "{} projection:[{projection}] predicate:[{predicate}]",
+ self.name()
)?;
+ if let Some(buckets) = &self.buckets {
+ let file_count = self.total_file_count();
+ let bucket_count = buckets.len();
+ write!(f, " buckets:[{bucket_count}] file_count:[{file_count}]")?;
+ }
if let Some(limit) = self.limit {
write!(f, " limit:[{limit}]")?;
}
Ok(())
}
}
-/// Asynchronously retrieves a stream of [`RecordBatch`] instances
-/// from a given table.
-///
-/// This function initializes a [`TableScan`], builds it,
-/// and then converts it into a stream of Arrow [`RecordBatch`]es.
-async fn get_batch_stream(
+fn build_table_scan(
table: Table,
snapshot_id: Option<i64>,
column_names: Option<Vec<String>>,
predicates: Option<Predicate>,
-) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
+) -> DFResult<TableScan> {
let scan_builder = match snapshot_id {
- Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
+ Some(id) => table.scan().snapshot_id(id),
None => table.scan(),
};
-
let mut scan_builder = match column_names {
- Some(column_names) => scan_builder.select(column_names),
+ Some(names) => scan_builder.select(names),
None => scan_builder.select_all(),
};
if let Some(pred) = predicates {
scan_builder = scan_builder.with_filter(pred);
}
- let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
+ scan_builder.build().map_err(to_datafusion_error)
+}
+
+/// Builds the `RecordBatch` stream for a single partition. When `bucket` is
+/// `Some`, streams the pre-planned tasks via `to_arrow_from_tasks`; when
+/// `None`, plans and reads the full scan via `to_arrow`.
+async fn build_record_batch_stream(
Review Comment:
Done, the stream is produced by the `match` and the error mapping is applied
in only one location.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]