Jefffrey commented on code in PR #21720:
URL: https://github.com/apache/datafusion/pull/21720#discussion_r3165868616
##########
datafusion/common/src/config.rs:
##########
@@ -692,6 +692,17 @@ config_namespace! {
/// `false` — ANSI SQL mode is disabled by default.
pub enable_ansi_mode: bool, default = false
+ /// Policy for handling duplicate keys in Spark-compatible map-construction
Review Comment:
I wonder if we should consider having a namespace for Spark-specific configs, cc @andygrove @comphead
##########
datafusion/common/src/config.rs:
##########
@@ -692,6 +692,17 @@ config_namespace! {
/// `false` — ANSI SQL mode is disabled by default.
pub enable_ansi_mode: bool, default = false
+ /// Policy for handling duplicate keys in Spark-compatible map-construction
+ /// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`).
+ ///
+ /// Mirrors Spark's
+ /// [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961):
+ /// - `"EXCEPTION"` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key.
+ /// - `"LAST_WIN"`: keep the last occurrence of each duplicate key.
+ ///
+ /// Values are case-insensitive. Only affects functions under `datafusion/spark`.
+ pub map_key_dedup_policy: String, default = "EXCEPTION".to_string()
Review Comment:
Would prefer having the enum value here along with the parsing, instead of
only being a string
Reference
- https://github.com/apache/datafusion/issues/17498
##########
datafusion/spark/src/function/map/utils.rs:
##########
@@ -193,43 +216,193 @@ fn map_deduplicate_keys(
return exec_err!(
"map_deduplicate_keys: keys and values lists in the same row must have equal lengths"
);
- } else if num_keys_entries != 0 {
- let mut seen_keys = HashSet::new();
-
- for cur_entry_idx in (0..num_keys_entries).rev() {
- let key = ScalarValue::try_from_array(
- &flat_keys,
- cur_keys_offset + cur_entry_idx,
- )?
- .compacted();
- if seen_keys.contains(&key) {
- // TODO: implement configuration and logic for spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config)
- // exec_err!("invalid argument: duplicate keys in map")
- // https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961
- } else {
- // This code implements deduplication logic for spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config)
- keys_mask_one[cur_entry_idx] = true;
- values_mask_one[cur_entry_idx] = true;
- seen_keys.insert(key);
- new_last_offset += 1;
+ }
+ key_to_output_idx.clear();
+ for cur_entry_idx in 0..num_keys_entries {
+ let key = ScalarValue::try_from_array(
+ &flat_keys,
+ cur_keys_offset + cur_entry_idx,
+ )?
+ .compacted();
+ let abs_value_idx = (cur_values_offset + cur_entry_idx) as i32;
+
+ if let Some(&output_idx) = key_to_output_idx.get(&key) {
+ if last_value_wins {
+ value_indices[output_idx] = abs_value_idx;
+ keys_mask_builder.append_value(false);
+ continue;
}
+ return exec_err!(
+                     "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \
+                     please check the input data. To allow duplicate keys with \
+                     last-value-wins semantics, set \
+                     `datafusion.execution.map_key_dedup_policy` to `LAST_WIN`."
+ );
}
+ keys_mask_builder.append_value(true);
+ key_to_output_idx.insert(key, value_indices.len());
+ value_indices.push(abs_value_idx);
+ new_last_offset += 1;
}
} else {
- // the result entry is NULL
- // both current row offsets are skipped
- // keys or values in the current row are marked false in the masks
+ // The result entry is NULL — no keys/values emitted. Still pad the
+ // mask so it stays aligned with `flat_keys`.
+ keys_mask_builder.append_n(num_keys_entries, false);
}
- keys_mask_builder.append_array(&keys_mask_one.into());
- values_mask_builder.append_array(&values_mask_one.into());
new_offsets.push(new_last_offset);
cur_keys_offset += num_keys_entries;
cur_values_offset += num_values_entries;
}
let keys_mask = keys_mask_builder.finish();
- let values_mask = values_mask_builder.finish();
let needed_keys = filter(&flat_keys, &keys_mask)?;
- let needed_values = filter(&flat_values, &values_mask)?;
+ let value_indices_array = Int32Array::from(value_indices);
+ let needed_values = take(&flat_values, &value_indices_array, None)?;
let offsets = OffsetBuffer::new(new_offsets.into());
Ok((needed_keys, needed_values, offsets))
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::array::{Int32Array, MapArray, StringArray};
+
+ fn as_map(array: &ArrayRef) -> &MapArray {
+ array.as_any().downcast_ref::<MapArray>().expect("MapArray")
+ }
+
+ fn int32_utf8_inputs(
+ keys: Vec<i32>,
+ values: Vec<Option<&str>>,
+ ) -> (ArrayRef, ArrayRef) {
+ let keys: ArrayRef = Arc::new(Int32Array::from(keys));
+ let values: ArrayRef = Arc::new(StringArray::from(values));
+ (keys, values)
+ }
+
+ #[test]
+ fn parse_policy_accepts_both_values_case_insensitively() {
+ assert!(!parse_map_key_dedup_policy("EXCEPTION").unwrap());
+ assert!(!parse_map_key_dedup_policy("exception").unwrap());
+ assert!(parse_map_key_dedup_policy("LAST_WIN").unwrap());
+ assert!(parse_map_key_dedup_policy("last_win").unwrap());
+ }
+
+ #[test]
+ fn parse_policy_rejects_unknown() {
+ let err = parse_map_key_dedup_policy("BOGUS").unwrap_err().to_string();
+ assert!(err.contains("Unknown map_key_dedup_policy"), "{err}");
+ }
+
+ #[test]
+ fn happy_path_two_rows_no_duplicates() {
+ let (keys, values) =
+             int32_utf8_inputs(vec![1, 2, 3], vec![Some("a"), Some("b"), Some("c")]);
+ let offsets = [0i32, 2, 3];
+
+ let result = map_from_keys_values_offsets_nulls(
+ &keys, &values, &offsets, &offsets, None, None, false,
+ )
+ .unwrap();
+
+ let map = as_map(&result);
+ assert_eq!(map.len(), 2);
+ assert_eq!(map.value_offsets(), &[0, 2, 3]);
+ }
+
+ #[test]
+ fn single_row_duplicate_errors_under_exception() {
+ let (keys, values) =
+             int32_utf8_inputs(vec![1, 2, 1], vec![Some("a"), Some("b"), Some("c")]);
+ let offsets = [0i32, 3];
+
+ let err = map_from_keys_values_offsets_nulls(
+ &keys, &values, &offsets, &offsets, None, None, false,
+ )
+ .unwrap_err()
+ .to_string();
+
+ assert!(err.contains("[DUPLICATED_MAP_KEY]"), "{err}");
+ assert!(err.contains("map_key_dedup_policy"), "{err}");
+ }
+
+ #[test]
+ fn last_win_keeps_final_occurrence() {
+ let (keys, values) = int32_utf8_inputs(
+ vec![1, 2, 1, 3, 2],
+ vec![Some("a"), Some("b"), Some("c"), Some("d"), Some("e")],
+ );
+ let offsets = [0i32, 5];
+
+ let result = map_from_keys_values_offsets_nulls(
+ &keys, &values, &offsets, &offsets, None, None, true,
+ )
+ .unwrap();
+
+ let map = as_map(&result);
+ assert_eq!(map.len(), 1);
+ // 5 entries in, 3 unique keys -> offsets [0, 3]
+ assert_eq!(map.value_offsets(), &[0, 3]);
+ }
+
+ #[test]
+ fn duplicate_in_later_row_still_errors() {
+ let (keys, values) = int32_utf8_inputs(
+ vec![1, 2, 1, 1],
+ vec![Some("a"), Some("b"), Some("x"), Some("y")],
+ );
+ let offsets = [0i32, 2, 4];
+
+ let err = map_from_keys_values_offsets_nulls(
+ &keys, &values, &offsets, &offsets, None, None, false,
+ )
+ .unwrap_err()
+ .to_string();
+
+ assert!(err.contains("[DUPLICATED_MAP_KEY]"), "{err}");
+ }
+
+ #[test]
+ fn empty_row_does_not_trigger_dedup() {
+ let (keys, values) = int32_utf8_inputs(vec![], vec![]);
+ let offsets = [0i32, 0];
+
+ let result = map_from_keys_values_offsets_nulls(
+ &keys, &values, &offsets, &offsets, None, None, false,
+ )
+ .unwrap();
+
+ let map = as_map(&result);
+ assert_eq!(map.len(), 1);
+ assert_eq!(map.value_offsets(), &[0, 0]);
+ }
+
+ #[test]
+ fn null_row_is_skipped_and_not_checked() {
+ // Row 0 is NULL (keys null). Its duplicate keys should be ignored;
+ // row 1 is a clean row.
+ let (keys, values) = int32_utf8_inputs(
+ vec![1, 1, 2, 3],
+ vec![Some("dup-a"), Some("dup-b"), Some("x"), Some("y")],
+ );
+ let offsets = [0i32, 2, 4];
+ let keys_nulls = NullBuffer::from(vec![false, true]);
+
+ let result = map_from_keys_values_offsets_nulls(
+ &keys,
+ &values,
+ &offsets,
+ &offsets,
+ Some(&keys_nulls),
+ None,
+ false,
+ )
+ .unwrap();
+
+ let map = as_map(&result);
Review Comment:
```suggestion
let map = result.as_map();
```
Using
https://docs.rs/arrow/latest/arrow/array/trait.AsArray.html#method.as_map
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]