andygrove opened a new pull request, #4196:
URL: https://github.com/apache/datafusion-comet/pull/4196

   ## Which issue does this PR close?
   
   Closes #4193 (sub-issue of #4098).
   
   ## Rationale for this change
   
   Spark 4.1's `BloomFilter.create` defaults to a new V2 implementation (`BloomFilterImplV2`) with both a new binary format **and** a new bit-scattering algorithm. Comet's reader hard-rejects non-V1 bytes and Comet's writer always emits V1, so on Spark 4.1 both `BloomFilterMightContain from random input` and `bloom_filter_agg` produce results that differ from Spark's.
   
   V1 vs V2:
   
   | | V1 (Spark <= 4.0, still supported in 4.1) | V2 (Spark 4.1+ default) |
   |---|---|---|
   | Binary | `[version=1][numHashFn][numWords][bits...]` | `[version=2][numHashFn][seed][numWords][bits...]` |
   | Scatter | `combinedHash = h1 + i*h2`, i in 1..=N, 32-bit wrap | `combinedHash = (long)h1 * Integer.MAX_VALUE; for (i=0; i<N; i++) combinedHash += h2;`, 64-bit wrap |
   | Bit index | `combinedHash<0 ? ~combinedHash : combinedHash`, mod 32-bit `bitSize` | same negation, mod 64-bit `bitSize` |
   | Seed | always 0 | per-filter (Spark's `BloomFilterImplV2.DEFAULT_SEED` is also 0) |
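
The Scatter and Bit index rows can be sketched in Rust as follows. This is a minimal illustration derived only from the table, not the code in this PR: the function names `v1_bit_indices` and `v2_bit_indices` are hypothetical, `h1` and `h2` are assumed to be the two 32-bit murmur3 hashes already computed for the item, and `bit_size` is assumed to be positive.

```rust
/// V1 scatter (hypothetical helper): combinedHash = h1 + i*h2 for i in 1..=N,
/// with 32-bit wrapping arithmetic, then bitwise negation if negative.
fn v1_bit_indices(h1: i32, h2: i32, num_hash_functions: i32, bit_size: i32) -> Vec<i32> {
    (1..=num_hash_functions)
        .map(|i| {
            let combined = h1.wrapping_add(i.wrapping_mul(h2));
            // `!x` is Rust's bitwise NOT, matching Java's `~x`.
            let non_negative = if combined < 0 { !combined } else { combined };
            non_negative % bit_size
        })
        .collect()
}

/// V2 scatter (hypothetical helper): start from (long)h1 * Integer.MAX_VALUE,
/// add h2 once per hash function with 64-bit wrapping arithmetic.
fn v2_bit_indices(h1: i32, h2: i32, num_hash_functions: i32, bit_size: i64) -> Vec<i64> {
    let mut combined = (h1 as i64).wrapping_mul(i32::MAX as i64);
    let mut indices = Vec::new();
    for _ in 0..num_hash_functions {
        combined = combined.wrapping_add(h2 as i64);
        let non_negative = if combined < 0 { !combined } else { combined };
        indices.push(non_negative % bit_size);
    }
    indices
}
```

For the same `(h1, h2)` pair the two functions generally select different bits, which is why a filter written with one scheme cannot be probed with the other.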
   
   ## What changes are included in this PR?
   
   - `SparkBloomFilter` (`native/spark-expr/src/bloom_filter/spark_bloom_filter.rs`): added `SparkBloomFilterVersion` (V1, V2) and a `seed` field. The deserializer detects the version from the leading 4 bytes; V2 reads an extra 4-byte seed. The serializer writes the layout matching the filter's version. `put_long` / `put_binary` / `might_contain_long` now branch on version for the bit-scattering algorithm and seed murmur3 with `self.seed` (always 0 for V1; configurable for V2).
   - `BloomFilterAgg` (`bloom_filter_agg.rs`): takes a `SparkBloomFilterVersion` so the aggregator emits the version matching Spark's output.
   - Proto (`expr.proto`): added `BloomFilterVersion` enum (V1 / V2 / Unspecified) and a `version` field on `BloomFilterAgg`.
   - JVM serde (`CometBloomFilterAggregate`): sets V2 on Spark 4.1+ and V1 on Spark <= 4.0 to match Spark's `BloomFilter.create` default.
   - New Rust unit tests cover V1 and V2 round-trips, that the two scattering schemes produce different bit patterns for the same input, and that the deserializer rejects unknown versions.
   - Removes the `assume(!isSpark41Plus, ...)` guards on `BloomFilterMightContain from random input` (CometExec3_4PlusSuite) and `bloom_filter_agg` (CometExecSuite); both now run on Spark 4.1.
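
To make the version detection concrete, here is a rough sketch of header parsing for the two layouts. It is illustrative only and not the code in this PR: `Version`, `Header`, `read_i32_be`, and `parse_header` are hypothetical names, the fields are assumed to be big-endian 32-bit integers (as written by Java's `DataOutputStream`), and the sketch returns an `Err` for unknown versions where the PR's deserializer is tested to panic.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Version {
    V1,
    V2,
}

#[derive(Debug, PartialEq, Eq)]
struct Header {
    version: Version,
    num_hash_functions: i32,
    seed: i32,
    num_words: i32,
}

/// Read a big-endian i32 at `offset`, failing if the buffer is too short.
fn read_i32_be(buf: &[u8], offset: usize) -> Result<i32, String> {
    let b = buf
        .get(offset..offset + 4)
        .ok_or_else(|| format!("buffer too short at offset {}", offset))?;
    Ok(i32::from_be_bytes([b[0], b[1], b[2], b[3]]))
}

/// Parse `[version][numHashFn][numWords]` (V1) or
/// `[version][numHashFn][seed][numWords]` (V2); the bit words follow the header.
fn parse_header(buf: &[u8]) -> Result<Header, String> {
    let version = match read_i32_be(buf, 0)? {
        1 => Version::V1,
        2 => Version::V2,
        other => return Err(format!("unsupported bloom filter version {}", other)),
    };
    let num_hash_functions = read_i32_be(buf, 4)?;
    // V1 has no seed field, so the seed is implicitly 0 and numWords comes next.
    let (seed, num_words_offset) = match version {
        Version::V1 => (0, 8),
        Version::V2 => (read_i32_be(buf, 8)?, 12),
    };
    let num_words = read_i32_be(buf, num_words_offset)?;
    Ok(Header { version, num_hash_functions, seed, num_words })
}
```

Keeping the seed in the parsed header for both versions (fixed at 0 for V1) lets later hashing code use one field regardless of version, which mirrors the `seed` field described above.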
   
   ## How are these changes tested?
   
   - 4 new Rust unit tests in `spark_bloom_filter::tests` (round-trip, V1≠V2 bit pattern, panic on unknown version), all of which pass.
   - `CometExec3_4PlusSuite` and `CometExecSuite` bloom filter tests pass locally on **both** Spark 4.0 (V1 path) and Spark 4.1 (V2 path).
   - Full `CometExecSuite` (124 tests) passes on Spark 4.0 with no regressions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

