liurenjie1024 commented on code in PR #373: URL: https://github.com/apache/iceberg-rust/pull/373#discussion_r1689787900
########## crates/iceberg/src/scan.rs: ########## @@ -197,134 +193,103 @@ impl<'a> TableScanBuilder<'a> { field_ids.push(field_id); } - Ok(TableScan { + let snapshot_bound_predicate = if let Some(ref predicates) = self.filter { + Some(predicates.bind(schema.clone(), true)?) + } else { + None + }; + + let plan_context = PlanContext { snapshot, - file_io: self.table.file_io().clone(), table_metadata: self.table.metadata_ref(), - column_names: self.column_names, - field_ids, - bound_predicates, - schema, - batch_size: self.batch_size, + snapshot_schema: schema, case_sensitive: self.case_sensitive, - filter: self.filter.map(Arc::new), + predicate: self.filter.map(Arc::new), + snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new), + file_io: self.table.file_io().clone(), + field_ids: Arc::new(field_ids), + partition_filter_cache: Arc::new(PartitionFilterCache::new()), + manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), + expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), + }; + + Ok(TableScan { + batch_size: self.batch_size, + column_names: self.column_names, + file_io: self.table.file_io().clone(), + plan_context, }) } } /// Table scan. #[derive(Debug)] pub struct TableScan { - snapshot: SnapshotRef, - table_metadata: TableMetadataRef, + plan_context: PlanContext, + batch_size: Option<usize>, file_io: FileIO, column_names: Vec<String>, - field_ids: Vec<i32>, - bound_predicates: Option<BoundPredicate>, - schema: SchemaRef, - batch_size: Option<usize>, +} + +/// PlanContext wraps a [`SnapshotRef`] alongside all the other +/// objects that are required to perform a scan file plan. +#[derive(Debug)] +struct PlanContext { + snapshot: SnapshotRef, + + table_metadata: TableMetadataRef, + snapshot_schema: SchemaRef, case_sensitive: bool, - filter: Option<Arc<Predicate>>, + predicate: Option<Arc<Predicate>>, + snapshot_bound_predicate: Option<Arc<BoundPredicate>>, + file_io: FileIO, + field_ids: Arc<Vec<i32>>, + + partition_filter_cache: Arc<PartitionFilterCache>, + manifest_evaluator_cache: Arc<ManifestEvaluatorCache>, + expression_evaluator_cache: Arc<ExpressionEvaluatorCache>, } impl TableScan { /// Returns a stream of [`FileScanTask`]s. pub async fn plan_files(&self) -> Result<FileScanTaskStream> { - let context = FileScanStreamContext::new( - self.schema.clone(), - self.snapshot.clone(), - self.table_metadata.clone(), - self.file_io.clone(), - self.filter.clone(), - self.case_sensitive, - )?; - - let mut partition_filter_cache = PartitionFilterCache::new(); - let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); - let mut expression_evaluator_cache = ExpressionEvaluatorCache::new(); - - let field_ids = self.field_ids.clone(); - let bound_predicates = self.bound_predicates.clone(); - - Ok(try_stream! { - let manifest_list = context - .snapshot - .load_manifest_list(&context.file_io, &context.table_metadata) - .await?; - - for entry in manifest_list.entries() { - if !Self::content_type_is_data(entry) { - continue; - } - - let partition_spec_id = entry.partition_spec_id; - - let partition_filter = partition_filter_cache.get( - partition_spec_id, - &context, - )?; - - if let Some(partition_filter) = partition_filter { - let manifest_evaluator = manifest_evaluator_cache.get( - partition_spec_id, - partition_filter, - ); - - if !manifest_evaluator.eval(entry)? { - continue; - } - } + // used to stream ManifestEntryContexts between stages of the file plan operation + let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) = + channel(CONCURRENCY_LIMIT_MANIFEST_FILES); + // used to stream the results back to the caller + let (file_scan_task_tx, file_scan_task_rx) = channel(CONCURRENCY_LIMIT_MANIFEST_ENTRIES); Review Comment: Dittol ########## crates/iceberg/src/scan.rs: ########## @@ -197,134 +193,103 @@ impl<'a> TableScanBuilder<'a> { field_ids.push(field_id); } - Ok(TableScan { + let snapshot_bound_predicate = if let Some(ref predicates) = self.filter { + Some(predicates.bind(schema.clone(), true)?) + } else { + None + }; + + let plan_context = PlanContext { snapshot, - file_io: self.table.file_io().clone(), table_metadata: self.table.metadata_ref(), - column_names: self.column_names, - field_ids, - bound_predicates, - schema, - batch_size: self.batch_size, + snapshot_schema: schema, case_sensitive: self.case_sensitive, - filter: self.filter.map(Arc::new), + predicate: self.filter.map(Arc::new), + snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new), + file_io: self.table.file_io().clone(), + field_ids: Arc::new(field_ids), + partition_filter_cache: Arc::new(PartitionFilterCache::new()), + manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), + expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), + }; + + Ok(TableScan { + batch_size: self.batch_size, + column_names: self.column_names, + file_io: self.table.file_io().clone(), + plan_context, }) } } /// Table scan. #[derive(Debug)] pub struct TableScan { - snapshot: SnapshotRef, - table_metadata: TableMetadataRef, + plan_context: PlanContext, + batch_size: Option<usize>, file_io: FileIO, column_names: Vec<String>, - field_ids: Vec<i32>, - bound_predicates: Option<BoundPredicate>, - schema: SchemaRef, - batch_size: Option<usize>, +} + +/// PlanContext wraps a [`SnapshotRef`] alongside all the other +/// objects that are required to perform a scan file plan. +#[derive(Debug)] +struct PlanContext { + snapshot: SnapshotRef, + + table_metadata: TableMetadataRef, + snapshot_schema: SchemaRef, case_sensitive: bool, - filter: Option<Arc<Predicate>>, + predicate: Option<Arc<Predicate>>, + snapshot_bound_predicate: Option<Arc<BoundPredicate>>, + file_io: FileIO, + field_ids: Arc<Vec<i32>>, + + partition_filter_cache: Arc<PartitionFilterCache>, + manifest_evaluator_cache: Arc<ManifestEvaluatorCache>, + expression_evaluator_cache: Arc<ExpressionEvaluatorCache>, } impl TableScan { /// Returns a stream of [`FileScanTask`]s. pub async fn plan_files(&self) -> Result<FileScanTaskStream> { - let context = FileScanStreamContext::new( - self.schema.clone(), - self.snapshot.clone(), - self.table_metadata.clone(), - self.file_io.clone(), - self.filter.clone(), - self.case_sensitive, - )?; - - let mut partition_filter_cache = PartitionFilterCache::new(); - let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); - let mut expression_evaluator_cache = ExpressionEvaluatorCache::new(); - - let field_ids = self.field_ids.clone(); - let bound_predicates = self.bound_predicates.clone(); - - Ok(try_stream! { - let manifest_list = context - .snapshot - .load_manifest_list(&context.file_io, &context.table_metadata) - .await?; - - for entry in manifest_list.entries() { - if !Self::content_type_is_data(entry) { - continue; - } - - let partition_spec_id = entry.partition_spec_id; - - let partition_filter = partition_filter_cache.get( - partition_spec_id, - &context, - )?; - - if let Some(partition_filter) = partition_filter { - let manifest_evaluator = manifest_evaluator_cache.get( - partition_spec_id, - partition_filter, - ); - - if !manifest_evaluator.eval(entry)? { - continue; - } - } + // used to stream ManifestEntryContexts between stages of the file plan operation + let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) = + channel(CONCURRENCY_LIMIT_MANIFEST_FILES); Review Comment: How about making this a config of table scan with default value? ########## crates/iceberg/src/spec/manifest_list.rs: ########## @@ -79,6 +79,11 @@ impl ManifestList { pub fn entries(&self) -> &[ManifestFile] { &self.entries } + + /// Take ownership of the entries in the manifest list, consuming it + pub fn consume_entries(self) -> Box<dyn Iterator<Item = ManifestFile>> { Review Comment: Why not just `impl IntoIterator`? ########## crates/iceberg/src/expr/predicate.rs: ########## @@ -668,6 +668,10 @@ pub enum BoundPredicate { Set(SetExpression<BoundReference>), } +/// Newtype to prevent accidentally confusing predicates that are bound to a partition with ones that are bound to a schema. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct PartitionBoundPredicate(pub(crate) BoundPredicate); Review Comment: Hmm, I have concerns adding this struct, since it's essentially the same as `BoundPredicate`, and I guess using variable name would be enough? cc @Xuanwo @Fokko WDYT? ########## crates/iceberg/src/scan.rs: ########## @@ -197,134 +193,103 @@ impl<'a> TableScanBuilder<'a> { field_ids.push(field_id); } - Ok(TableScan { + let snapshot_bound_predicate = if let Some(ref predicates) = self.filter { + Some(predicates.bind(schema.clone(), true)?) + } else { + None + }; + + let plan_context = PlanContext { snapshot, - file_io: self.table.file_io().clone(), table_metadata: self.table.metadata_ref(), - column_names: self.column_names, - field_ids, - bound_predicates, - schema, - batch_size: self.batch_size, + snapshot_schema: schema, case_sensitive: self.case_sensitive, - filter: self.filter.map(Arc::new), + predicate: self.filter.map(Arc::new), + snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new), + file_io: self.table.file_io().clone(), + field_ids: Arc::new(field_ids), + partition_filter_cache: Arc::new(PartitionFilterCache::new()), + manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), + expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), + }; + + Ok(TableScan { + batch_size: self.batch_size, + column_names: self.column_names, + file_io: self.table.file_io().clone(), + plan_context, }) } } /// Table scan. #[derive(Debug)] pub struct TableScan { - snapshot: SnapshotRef, - table_metadata: TableMetadataRef, + plan_context: PlanContext, + batch_size: Option<usize>, file_io: FileIO, column_names: Vec<String>, - field_ids: Vec<i32>, - bound_predicates: Option<BoundPredicate>, - schema: SchemaRef, - batch_size: Option<usize>, +} + +/// PlanContext wraps a [`SnapshotRef`] alongside all the other +/// objects that are required to perform a scan file plan. +#[derive(Debug)] +struct PlanContext { + snapshot: SnapshotRef, + + table_metadata: TableMetadataRef, + snapshot_schema: SchemaRef, case_sensitive: bool, - filter: Option<Arc<Predicate>>, + predicate: Option<Arc<Predicate>>, + snapshot_bound_predicate: Option<Arc<BoundPredicate>>, + file_io: FileIO, + field_ids: Arc<Vec<i32>>, + + partition_filter_cache: Arc<PartitionFilterCache>, + manifest_evaluator_cache: Arc<ManifestEvaluatorCache>, + expression_evaluator_cache: Arc<ExpressionEvaluatorCache>, } impl TableScan { /// Returns a stream of [`FileScanTask`]s. pub async fn plan_files(&self) -> Result<FileScanTaskStream> { - let context = FileScanStreamContext::new( - self.schema.clone(), - self.snapshot.clone(), - self.table_metadata.clone(), - self.file_io.clone(), - self.filter.clone(), - self.case_sensitive, - )?; - - let mut partition_filter_cache = PartitionFilterCache::new(); - let mut manifest_evaluator_cache = ManifestEvaluatorCache::new(); - let mut expression_evaluator_cache = ExpressionEvaluatorCache::new(); - - let field_ids = self.field_ids.clone(); - let bound_predicates = self.bound_predicates.clone(); - - Ok(try_stream! { - let manifest_list = context - .snapshot - .load_manifest_list(&context.file_io, &context.table_metadata) - .await?; - - for entry in manifest_list.entries() { - if !Self::content_type_is_data(entry) { - continue; - } - - let partition_spec_id = entry.partition_spec_id; - - let partition_filter = partition_filter_cache.get( - partition_spec_id, - &context, - )?; - - if let Some(partition_filter) = partition_filter { - let manifest_evaluator = manifest_evaluator_cache.get( - partition_spec_id, - partition_filter, - ); - - if !manifest_evaluator.eval(entry)? { - continue; - } - } + // used to stream ManifestEntryContexts between stages of the file plan operation + let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) = + channel(CONCURRENCY_LIMIT_MANIFEST_FILES); + // used to stream the results back to the caller + let (file_scan_task_tx, file_scan_task_rx) = channel(CONCURRENCY_LIMIT_MANIFEST_ENTRIES); + + let manifest_list = self.plan_context.get_manifest_list().await?; + + // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any + // whose content type is not Data or whose partitions cannot match this + // scan's filter + let manifest_file_contexts = self + .plan_context + .build_manifest_file_contexts(manifest_list, manifest_entry_ctx_tx)?; + + // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s + futures::stream::iter(manifest_file_contexts) + .try_for_each_concurrent(CONCURRENCY_LIMIT_MANIFEST_FILES, |ctx| async move { + ctx.fetch_manifest_and_stream_manifest_entries().await + }) + .await?; + + // Process the [`ManifestEntry`] stream in parallel + manifest_entry_ctx_rx + .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) + .try_for_each_concurrent( + CONCURRENCY_LIMIT_MANIFEST_ENTRIES, + |(manifest_entry_context, tx)| async move { + crate::runtime::spawn(async move { + Self::process_manifest_entry(manifest_entry_context, tx).await Review Comment: To be honest, I have some concerns about generating one task for each manifest entry. Though future is cheap, this means to have hundreds of thousands of `Futures`. But this is just some concern, maybe we can refactor this later if we actually meet some problems. cc @Xuanwo WDYT? ########## crates/iceberg/src/scan.rs: ########## @@ -338,157 +303,463 @@ impl TableScan { arrow_reader_builder.build().read(self.plan_files().await?) } - /// Checks whether the [`ManifestContentType`] is `Data` or not. - fn content_type_is_data(entry: &ManifestFile) -> bool { - if let ManifestContentType::Data = entry.content { - return true; - } - false - } - /// Returns a reference to the column names of the table scan. pub fn column_names(&self) -> &[String] { &self.column_names } + /// Returns a reference to the snapshot of the table scan. + pub fn snapshot(&self) -> &SnapshotRef { + &self.plan_context.snapshot + } + + async fn process_manifest_entry( + manifest_entry_context: ManifestEntryContext, + mut file_scan_task_tx: Sender<Result<FileScanTask>>, + ) -> Result<()> { + // skip processing this manifest entry if it has been marked as deleted + if !manifest_entry_context.manifest_entry.is_alive() { + return Ok(()); + } + + // abort the plan if we encounter a manifest entry whose data file's + // content type is currently unsupported + if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Only Data files currently supported", + )); + } + + if let Some(ref snapshot_bound_predicate) = manifest_entry_context.snapshot_bound_predicate + { + let Some(ref partition_bound_predicate) = + manifest_entry_context.partition_bound_predicate + else { + panic!("partition_bound_predicate expected to always be present if snapshot_bound_predicate is present"); Review Comment: I'm not a big fan of this approach, how about make the field in `ManifestEntryContext` as `Option<(BoundPredicate, PartitionBoundPredicae)>`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org