gene-bordegaray commented on code in PR #21931:
URL: https://github.com/apache/datafusion/pull/21931#discussion_r3188067905


##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -202,6 +195,64 @@ fn combine_membership_and_bounds(
     }
 }
 
+/// Compute the global (envelope) min/max bounds across a set of partition 
bounds.
+///
+/// For each column index, returns the smallest min seen and the largest max 
seen.
+/// Columns where any partition is missing bounds, or where bounds are not 
totally
+/// ordered (e.g. mixed-type comparisons), are dropped from the global 
envelope.
+fn compute_global_bounds(per_partition: &[&PartitionBounds]) -> 
Option<PartitionBounds> {
+    let mut iter = per_partition.iter();
+    let first = iter.next()?;
+    let mut acc: Vec<Option<ColumnBounds>> = first
+        .column_bounds
+        .iter()
+        .map(|cb| Some(cb.clone()))
+        .collect();
+
+    for partition in iter {
+        if partition.column_bounds.len() != acc.len() {
+            return None;
+        }
+        for (slot, cb) in acc.iter_mut().zip(partition.column_bounds.iter()) {
+            let Some(existing) = slot.as_mut() else {
+                continue;
+            };
+            match cb.min.partial_cmp(&existing.min) {
+                Some(std::cmp::Ordering::Less) => existing.min = 
cb.min.clone(),
+                Some(_) => {}
+                None => {
+                    *slot = None;
+                    continue;
+                }
+            }
+            match cb.max.partial_cmp(&existing.max) {
+                Some(std::cmp::Ordering::Greater) => existing.max = 
cb.max.clone(),
+                Some(_) => {}
+                None => *slot = None,
+            }
+        }
+    }
+
+    let merged: Vec<ColumnBounds> = acc.into_iter().flatten().collect();

Review Comment:
   I believe this can cause an issue here. If we insert `None` when bounds 
can't be computed for a column, then call `flatten()`, this will remove that 
`None` slot in the Vec.
   
   Thus the slot of a column whose bounds could not be computed would cause the 
other slots to shift and get incorrect bounds when we call `create_bounds_predicate()`.



##########
datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs:
##########
@@ -343,6 +343,197 @@ impl PhysicalExpr for HashTableLookupExpr {
     }
 }
 
+/// Physical expression that probes the same join keys against multiple 
[`Map`]s
+/// and returns `true` for any row whose join keys match at least one map.
+///
+/// Equivalent to `OR`-ing several [`HashTableLookupExpr`]s, but
+/// `create_hashes` runs exactly once for the whole batch and every
+/// [`Map::HashMap`] probe shares the same hash buffer. All `HashMap`
+/// entries must therefore have been built with the same `RandomState`;
+/// [`Map::ArrayMap`] entries are queried via `contain_keys` and do not
+/// consume hashes.
+pub struct MultiMapLookupExpr {
+    /// Join-key expressions evaluated against each input batch.
+    on_columns: Vec<PhysicalExprRef>,
+    /// Hashing seed shared by every entry in `maps`.
+    random_state: SeededRandomState,
+    /// Build-side maps to OR over, one per partition.
+    maps: Vec<Arc<Map>>,

Review Comment:
   I noticed we aren't handling this in the proto like we do with 
`HashTableLookupExpr`. For that one, we handle it in 
`serialize_physical_expr_with_converter()` in `to_proto.rs` by passing 
`lit(true)` for now.



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -649,47 +700,157 @@ impl SharedBuildAccumulator {
                     }
                 }
 
-                let filter_expr = if has_canceled_unknown {
-                    let mut when_then_branches = empty_partition_ids
-                        .into_iter()
-                        .map(|partition_id| {
-                            (
-                                lit(ScalarValue::UInt64(Some(partition_id as 
u64))),
-                                lit(false),
-                            )
-                        })
-                        .collect::<Vec<_>>();
-                    when_then_branches.extend(real_branches);
-
-                    if when_then_branches.is_empty() {
-                        lit(true)
-                    } else {
-                        Arc::new(CaseExpr::try_new(
-                            Some(modulo_expr),
-                            when_then_branches,
-                            Some(lit(true)),
-                        )?) as Arc<dyn PhysicalExpr>
-                    }
-                } else if real_branches.is_empty() {
-                    lit(false)
-                } else if real_branches.len() == 1
-                    && empty_partition_ids.len() + 1 == num_partitions
-                {
-                    Arc::clone(&real_branches[0].1)
-                } else {
-                    Arc::new(CaseExpr::try_new(
-                        Some(modulo_expr),
-                        real_branches,
-                        Some(lit(false)),
-                    )?) as Arc<dyn PhysicalExpr>
-                };
-
+                let filter_expr = self
+                    .build_partitioned_filter(&real_partitions, 
has_canceled_unknown)?;
                 self.dynamic_filter.update(filter_expr)?;
             }
         }
 
         Ok(())
     }
+
+    /// CollectLeft has a single shared build side, so we always have one
+    /// `Map`. We prefer the InList expression when it's available (the build
+    /// side fit under the InList caps) because it's directly representable in
+    /// parquet stats / bloom-filter pruning at the scan; otherwise fall back
+    /// to a single `hash_lookup` against the map.
+    fn collect_left_membership(
+        &self,
+        pushdown: &PushdownStrategy,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        if let Some(arr) = &pushdown.inlist {
+            return Ok(Some(create_inlist_predicate(
+                &self.on_right,
+                Arc::clone(arr),
+                self.probe_schema.as_ref(),
+            )?));
+        }
+        Ok(pushdown.map.as_ref().map(|map| {
+            create_hash_lookup_predicate(&self.on_right, Arc::clone(map), 
&HASH_JOIN_SEED)
+        }))
+    }
+
+    /// Build the dynamic filter for `PartitionMode::Partitioned`. Emits
+    /// `global_minmax AND ([merged_in_list AND] multi_hash_lookup)` —
+    /// independent of how the build side was repartitioned.
+    ///
+    /// * `global_minmax` — envelope of every partition's per-column min/max.
+    ///   Cheap short-circuit and the only piece visible to scan-level
+    ///   `pruning_predicate` extraction.
+    /// * `merged_in_list` — concatenated, deduplicated build keys when every
+    ///   reported partition contributed an `InList` array and the
+    ///   cross-partition union fits under
+    ///   `optimizer.hash_join_inlist_pushdown_max_distinct_values`. A small
+    ///   `IN (SET)` participates in parquet stats / bloom-filter pruning,
+    ///   which `multi_hash_lookup` does not. When present it fully replaces
+    ///   the lookup.
+    /// * `multi_hash_lookup` — hashes the join keys once and ORs
+    ///   `contain_hashes()` across every partition's hash table.
+    ///
+    /// `has_canceled_unknown` partitions short-circuit to `lit(true)`: we
+    /// don't have their maps, so we cannot include them in the lookup, and
+    /// the query is being torn down anyway.
+    fn build_partitioned_filter(
+        &self,
+        real_partitions: &[&PartitionData],
+        has_canceled_unknown: bool,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        if has_canceled_unknown {
+            return Ok(lit(true));
+        }
+        if real_partitions.is_empty() {
+            return Ok(lit(false));
+        }
+
+        let bounds_refs: Vec<&PartitionBounds> =
+            real_partitions.iter().map(|p| &p.bounds).collect();
+        let global_bounds_expr = compute_global_bounds(&bounds_refs)
+            .as_ref()
+            .and_then(|b| create_bounds_predicate(&self.on_right, b));
+
+        // The merged InList covers the union of every partition's
+        // build-side keys, so when it fires it stands alone — there is no
+        // need to also AND a `multi_hash_lookup` (which would just probe
+        // the same data via a different structure).
+        let membership_expr =
+            if let Some(merged) = 
self.try_build_merged_inlist(real_partitions)? {
+                Some(merged)
+            } else {
+                let maps: Vec<Arc<Map>> = real_partitions
+                    .iter()
+                    .filter_map(|p| p.pushdown.map.clone())
+                    .collect();
+                if maps.is_empty() {
+                    // Defensive: every reported (non-empty) partition is
+                    // supposed to carry a Map. Falling through to None means
+                    // we degrade to bounds-only filtering.
+                    None
+                } else {
+                    Some(Arc::new(MultiMapLookupExpr::new(
+                        self.on_right.clone(),
+                        HASH_JOIN_SEED.clone(),
+                        maps,
+                        "multi_hash_lookup".to_string(),
+                    )) as Arc<dyn PhysicalExpr>)
+                }
+            };
+
+        Ok(
+            combine_membership_and_bounds(membership_expr, global_bounds_expr)
+                .unwrap_or_else(|| lit(true)),
+        )
+    }
+
+    /// If every reported partition contributed an InList array, concatenate
+    /// them, deduplicate by scalar value, and gate on the
+    /// `inlist_max_distinct_values` cap. Returns the merged
+    /// `(struct(...))? IN (SET) ([…])` predicate built over the
+    /// deduplicated keys when the cap is satisfied; `None` otherwise.
+    ///
+    /// Per-partition arrays carry duplicates — each partition ships its raw
+    /// build-side join keys, dedup happens here. The dedup walk early-aborts
+    /// the moment we cross the cap, so the cost stays bounded by
+    /// `O(rows-until-cap+1-distinct-found)` rather than total input size.
+    fn try_build_merged_inlist(
+        &self,
+        real_partitions: &[&PartitionData],
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        let cap = self.inlist_max_distinct_values;
+        let mut arrays: Vec<ArrayRef> = 
Vec::with_capacity(real_partitions.len());
+        for p in real_partitions {
+            let Some(arr) = &p.pushdown.inlist else {
+                return Ok(None);
+            };
+            arrays.push(Arc::clone(arr));
+        }
+        let Some(merged) = merge_inlist_arrays(&arrays) else {

Review Comment:
   `merge_inlist_arrays()` concatenates, which allocates one large array 
containing every partition's `InList` values.
   
   Like:
   ```text
   p0: 20 vals
   p1: 20 vals
   ...
   pN: 20 vals
   ```
   
   So I don't think we are necessarily aborting at the cap as intended.
   
   I think if we traverse each partition array while tracking distinct values, 
abort as soon as we hit the cap, and only then build the deduped array, we would 
allocate less.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to