kosiew opened a new issue, #23073: URL: https://github.com/apache/datafusion/issues/23073
## Summary

`TopK` [currently](https://github.com/apache/datafusion/tree/ba67bb467a30c3dbe7f78089451ea4c4d1a191d5) derives heap-boundary information in two places:

1. dynamic filter pushdown (`TopK::update_filter`)
2. prefix early completion (`TopK::attempt_early_completion`)

Both paths depend on the same local heap boundary: `self.heap.max()`, the current worst row still kept by the TopK heap.

The implementation is correct, but the boundary-related work is spread across multiple methods and has duplicated comparison/control-flow. Refactor the local heap-boundary handling into a small private helper so full sort-key threshold comparison, scalar threshold extraction, and common-prefix comparison are easier to reason about and harder to accidentally diverge.

## Current state

Relevant file:

- `datafusion/physical-plan/src/topk/mod.rs`

Relevant code:

- `TopK::update_filter`
  - reads `self.heap.max()`
  - compares the max row bytes against `TopKDynamicFilters::threshold_row`
  - extracts scalar threshold values with `heap.get_threshold_values(&self.expr)`
  - builds and publishes the dynamic filter expression
- `TopK::attempt_early_completion`
  - reads `self.heap.max()` independently
  - computes the common-prefix row for the current batch's last row
  - computes the common-prefix row for the local heap max row
  - finishes when the batch prefix is strictly greater than the heap boundary prefix
- `TopKDynamicFilters`
  - stores only the shared full sort-key threshold row and dynamic filter expression:

    ```rust
    pub struct TopKDynamicFilters {
        threshold_row: Option<Vec<u8>>,
        expr: Arc<DynamicFilterPhysicalExpr>,
    }
    ```

There is no `TopKThreshold` type and no shared common-prefix threshold in the current codebase.

## Problem

The local heap-boundary concept is implicit. `update_filter` and `attempt_early_completion` both reason about the worst kept heap row, but each method performs its own extraction and comparison logic.
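As a toy illustration of the shared boundary (not DataFusion code), the sketch below models encoded rows as plain byte vectors compared lexicographically. `ToyTopK`, `ToyBoundary`, and every method on them are hypothetical names for this example only. The sketch assumes that the boundary exists only once the heap holds `k` rows, that "more selective" means byte-wise smaller than the published threshold, and that the common prefix is a fixed number of leading bytes.

```rust
use std::collections::BinaryHeap;

/// Toy stand-in for a TopK heap that keeps the `k` smallest encoded rows.
/// Rows are byte vectors compared lexicographically (assumption of this sketch).
struct ToyTopK {
    k: usize,
    heap: BinaryHeap<Vec<u8>>, // max-heap: peek() is the worst kept row
    prefix_len: usize,         // leading bytes treated as the common prefix
}

/// Hypothetical named view of the worst row still kept by the heap.
struct ToyBoundary<'a> {
    row: &'a [u8],
}

impl<'a> ToyBoundary<'a> {
    /// Full sort-key bytes of the boundary row.
    fn full_sort_key(&self) -> &'a [u8] {
        self.row
    }

    /// True when no threshold is published yet, or when the boundary is
    /// strictly smaller than the published threshold.
    fn is_more_selective_than(&self, current: Option<&[u8]>) -> bool {
        match current {
            None => true,
            Some(threshold) => self.row < threshold,
        }
    }

    /// Leading `len` bytes of the boundary row.
    fn prefix(&self, len: usize) -> &'a [u8] {
        &self.row[..len.min(self.row.len())]
    }
}

impl ToyTopK {
    fn new(k: usize, prefix_len: usize) -> Self {
        Self { k, heap: BinaryHeap::new(), prefix_len }
    }

    /// Keep the row only if the heap has room or the row beats the worst kept row.
    fn insert(&mut self, row: Vec<u8>) {
        if self.heap.len() < self.k {
            self.heap.push(row);
        } else if self.heap.peek().map_or(false, |worst| row < *worst) {
            self.heap.pop();
            self.heap.push(row);
        }
    }

    /// Single access point for the boundary; None until the heap is full.
    fn boundary(&self) -> Option<ToyBoundary<'_>> {
        if self.heap.len() < self.k {
            return None;
        }
        self.heap.peek().map(|row| ToyBoundary { row: row.as_slice() })
    }

    /// Early-completion check: is the batch's last-row prefix strictly
    /// greater than the boundary prefix?
    fn batch_prefix_exceeds_boundary(&self, last_row: &[u8]) -> bool {
        match self.boundary() {
            Some(boundary) => {
                let batch_prefix = &last_row[..self.prefix_len.min(last_row.len())];
                batch_prefix > boundary.prefix(self.prefix_len)
            }
            None => false,
        }
    }
}
```

In the sketch, both the selectivity check and the prefix check go through `boundary()`. In the current code, the two derivations live in separate methods rather than behind one named boundary.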
This makes future changes fragile because boundary-related behavior can drift, for example:

- full sort-key threshold comparison changes in `update_filter` but prefix-boundary comparison is not reviewed alongside it
- scalar threshold extraction and row-byte threshold comparison are no longer clearly tied to the same heap max row
- `attempt_early_completion` grows more special cases around prefix encoding without a named boundary helper
- lock-gap recheck logic in `update_filter` remains harder to scan because threshold construction and publication are mixed together

The goal is not to change semantics. The goal is to name and isolate the local heap boundary so reviewers can see that all derived threshold data comes from the same heap max row.

## Proposed refactor

Introduce a small private helper in `topk/mod.rs` for local heap-boundary handling.

Possible shape:

```rust
struct TopKHeapBoundary<'a> {
    row: &'a TopKRow,
}

impl<'a> TopKHeapBoundary<'a> {
    fn full_sort_key(&self) -> &[u8];
    fn is_more_selective_than(&self, current_threshold: Option<&[u8]>) -> bool;
    fn threshold_values(
        &self,
        heap: &TopKHeap,
        expr: &[PhysicalSortExpr],
    ) -> Result<Option<Vec<ScalarValue>>>;
    fn prefix_row(&self, topk: &TopK, scratch: &mut Rows) -> Result<()>;
}
```

Exact naming and ownership can differ. Keep it private to `topk/mod.rs`.

The refactor should make call sites read more like:

```rust
let Some(boundary) = self.current_heap_boundary() else {
    return Ok(());
};

if !boundary.is_more_selective_than(self.filter.read().threshold_row.as_deref()) {
    return Ok(());
}

let Some(thresholds) = boundary.threshold_values(&self.heap, &self.expr)? else {
    return Ok(());
};
```

For early completion, use a helper that makes the comparison intent explicit:

```rust
if self.batch_prefix_exceeds_heap_boundary(batch, boundary)? {
    self.finished = true;
}
```

## Goals

- Make the local heap-boundary concept explicit.
- Keep full sort-key threshold bytes and scalar predicate values tied to the same heap max row.
- Make prefix early-completion comparison easier to scan.
- Reduce duplicated boundary extraction and comparison code.
- Preserve behavior exactly.
- Keep changes private to `topk/mod.rs`; no public API change.

## Non-goals

- Do not change TopK semantics.
- Do not change dynamic filter pushdown behavior.
- Do not add a shared common-prefix threshold.
- Do not change partitioning or `SortExec` planning behavior.
- Do not introduce public types.

## Suggested implementation steps

1. Add a private helper for accessing the current heap boundary (`self.heap.max()`).
2. Move full sort-key selectivity comparison into that helper or a small named function.
3. Move heap-max prefix encoding/comparison behind a named helper used by `attempt_early_completion`.
4. Update `TopK::update_filter` to construct/read the boundary once and use named helpers for the read-lock fast path and write-lock recheck.
5. Update `TopK::attempt_early_completion` so the code says directly that the batch's last prefix is compared with the local heap boundary prefix.
6. Keep existing tests passing; add tests only if the refactor exposes an uncovered edge.

## Tests

At minimum run:

```bash
cargo test -p datafusion-physical-plan topk --lib
```

Relevant existing coverage in current code includes:

- `topk::tests::test_try_finish_marks_finished_with_prefix`
- `topk::tests::test_try_finish_fires_when_filter_rejects_entire_batch`
- `topk::tests::test_topk_marks_filter_complete`

If behavior is intentionally unchanged, no SQLLogicTest should be needed.

## Expected benefit

This reduces future regression risk around TopK dynamic filtering and prefix early completion by giving the local heap boundary a single, named implementation point. It should make the code easier to review without changing runtime behavior.

## Related

PR #22991

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
