LiaCastaneda commented on code in PR #21589: URL: https://github.com/apache/datafusion/pull/21589#discussion_r3109453265
########## datafusion/physical-plan/src/aggregates/group_values/single_group_by/dictionary.rs: ########## @@ -0,0 +1,3491 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::aggregates::group_values::GroupValues; +use crate::hash_utils::RandomState; +use arrow::array::{ + Array, ArrayRef, DictionaryArray, LargeStringArray, LargeStringBuilder, ListArray, + ListBuilder, PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder, + StringViewArray, StringViewBuilder, UInt64Array, +}; +use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType}; +use datafusion_common::Result; +use datafusion_common::hash_utils::create_hashes; +use datafusion_expr::EmitTo; +use std::borrow::Cow; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::Arc; + +macro_rules! 
decode_list { + ($raw:expr, $builder_type:ty) => {{ + let mut builder = ListBuilder::new(<$builder_type>::new()); + for raw_bytes in $raw { + match raw_bytes { + None => builder.append_null(), + Some(raw_vector) => { + let mut offset = 0; + while offset < raw_vector.len() { + let len = i32::from_ne_bytes( + raw_vector[offset..offset + 4] + .try_into() + .expect("slice of length 4"), + ); + offset += 4; + if len == -1 { + builder.values().append_null(); + } else { + let s = std::str::from_utf8( + &raw_vector[offset..offset + len as usize], + ) + .map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid utf8 in list element: {e}" + )) + })?; + builder.values().append_value(s); + offset += len as usize; + } + } + builder.append(true); + } + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; +} +macro_rules! decode_scalar_string { + ($raw:expr, $builder_type:ty) => {{ + let mut builder = <$builder_type>::new(); + for raw_bytes in $raw { + match raw_bytes { + Some(raw_vector) => { + let s = std::str::from_utf8(raw_vector).map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid utf8 in GroupValuesDictionary: {e}" + )) + })?; + builder.append_value(s); + } + None => builder.append_null(), + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; +} +type GroupEntry = (usize, Option<Vec<u8>>); +pub struct GroupValuesDictionary<K: ArrowDictionaryKeyType + Send> { + // stores the order new unique elements are seen for self.emit() + seen_elements: Vec<Option<Vec<u8>>>, // Box<dyn Builder> doesnt provide the flexibility of building partition arrays that wed need to support emit::First(N) + value_dt: DataType, + _phantom: PhantomData<K>, + // keeps track of which values weve already seen. 
stored as -> <unique_value_hash:(initial_group_id, raw_bytes)> + unique_dict_value_mapping: HashMap<u64, Vec<GroupEntry>>, + + random_state: RandomState, + + // cache the group id for nulls since they all map to the same group + null_group_id: Option<usize>, + // tracks if intern has ever been called. this is used to determine if we can skip phaase 1 of of intern. + // phrase one is where we build a hash -> group id mapping for all unique values in the dictionary to avoid repeated hashmap lookups + equality checks in the hot loop of phase 2. + // if intern has never been called, we know for certain that no insertions have been made and we can skip phase 1 entirely since the mapping will be empty and not match any values. + // after the first call to intern, we know that at least one insertion has been made and we have to do phase 1 on every subsequent call to intern to ensure correctness. + intern_called: bool, +} + +impl<K: ArrowDictionaryKeyType + Send> GroupValuesDictionary<K> { + pub fn new(data_type: &DataType) -> Self { + Self { + seen_elements: Vec::new(), + unique_dict_value_mapping: HashMap::new(), + value_dt: data_type.clone(), + _phantom: PhantomData, + random_state: RandomState::with_seed(0), + null_group_id: None, + intern_called: false, + } + } + fn compute_value_hashes(&mut self, values: &ArrayRef) -> Result<Vec<u64>> { + let mut hashes = vec![0u64; values.len()]; + create_hashes([Arc::clone(values)], &self.random_state, &mut hashes)?; + Ok(hashes) + } + + fn get_raw_bytes(values: &ArrayRef, index: usize) -> Cow<'_, [u8]> { + match values.data_type() { + DataType::Utf8 => Cow::Borrowed( + values + .as_any() + .downcast_ref::<StringArray>() + .expect("Expected StringArray") + .value(index) + .as_bytes(), + ), + DataType::LargeUtf8 => Cow::Borrowed( + values + .as_any() + .downcast_ref::<LargeStringArray>() + .expect("Expected LargeStringArray") + .value(index) + .as_bytes(), + ), + DataType::Utf8View => Cow::Borrowed( + values + .as_any() + 
.downcast_ref::<StringViewArray>() + .expect("Expected StringViewArray") + .value(index) + .as_bytes(), + ), + DataType::List(_) => { + let list_array = values + .as_any() + .downcast_ref::<ListArray>() + .expect("Expected ListArray"); + + if list_array.is_null(index) { + panic!() // this cannot happen. leaving this here as an invariant + } + + let start = list_array.value_offsets()[index] as usize; + let end = list_array.value_offsets()[index + 1] as usize; + let child = list_array.values(); + + let mut bytes = Vec::new(); + for i in start..end { + if child.is_null(i) { + // acts as a marker for transform_into_array to write a null + bytes.extend_from_slice(&(-1i32).to_ne_bytes()); + } else { + let raw = Self::get_raw_bytes(child, i); + bytes.extend_from_slice(&(raw.len() as i32).to_ne_bytes()); + bytes.extend_from_slice(&raw); + } + } + Cow::Owned(bytes) + } + other => unimplemented!("get_raw_bytes not implemented for {other:?}"), + } + } + + #[inline] + fn get_null_group_id(&mut self) -> usize { + if let Some(group_id) = self.null_group_id { + group_id + } else { + // first time we've seen a null + let new_group_id = self.seen_elements.len(); + self.seen_elements.push(None); + self.unique_dict_value_mapping + .insert((usize::MAX - 1) as u64, vec![(new_group_id, None)]); + self.null_group_id = Some(new_group_id); // never compute this again + new_group_id + } + } + fn transform_into_array(&self, raw: &[Option<Vec<u8>>]) -> Result<ArrayRef> { + match &self.value_dt { + DataType::Utf8 => decode_scalar_string!(raw, StringBuilder), + DataType::LargeUtf8 => decode_scalar_string!(raw, LargeStringBuilder), + DataType::Utf8View => decode_scalar_string!(raw, StringViewBuilder), + DataType::List(field) => match field.data_type() { + DataType::Utf8 => decode_list!(raw, StringBuilder), + DataType::LargeUtf8 => decode_list!(raw, LargeStringBuilder), + DataType::Utf8View => decode_list!(raw, StringViewBuilder), + other => Err(datafusion_common::DataFusionError::NotImplemented( 
+ format!("transform_into_array not implemented for List<{other:?}>"), + )), + }, + other => Err(datafusion_common::DataFusionError::NotImplemented(format!( + "transform_into_array not implemented for {other:?}" + ))), + } + } + fn normalize_dict_array( + values: &ArrayRef, + key_array: &PrimitiveArray<K>, + ) -> (ArrayRef, Vec<Option<usize>>) { + // maps old value index -> new canonical index + let mut old_to_new: Vec<Option<usize>> = vec![None; values.len()]; + let mut canonical_indices: Vec<usize> = Vec::new(); + + for (i, slot) in old_to_new.iter_mut().enumerate() { + if values.is_null(i) { + continue; + } + let raw = Self::get_raw_bytes(values, i); + let canonical = canonical_indices + .iter() + .position(|&j| Self::get_raw_bytes(values, j) == raw); + if let Some(idx) = canonical { + *slot = Some(idx); + } else { + *slot = Some(canonical_indices.len()); + canonical_indices.push(i); + } + } + // build new deduplicated values array using take Review Comment: I'm surprised Arrow does not enforce that dictionary values are unique; theoretically, when you insert a value into a dict that was already there, we only update the key array, no? ########## datafusion/physical-plan/src/aggregates/group_values/single_group_by/dictionary.rs: ########## @@ -0,0 +1,3491 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::aggregates::group_values::GroupValues; +use crate::hash_utils::RandomState; +use arrow::array::{ + Array, ArrayRef, DictionaryArray, LargeStringArray, LargeStringBuilder, ListArray, + ListBuilder, PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder, + StringViewArray, StringViewBuilder, UInt64Array, +}; +use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType}; +use datafusion_common::Result; +use datafusion_common::hash_utils::create_hashes; +use datafusion_expr::EmitTo; +use std::borrow::Cow; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::Arc; + +macro_rules! decode_list { + ($raw:expr, $builder_type:ty) => {{ + let mut builder = ListBuilder::new(<$builder_type>::new()); + for raw_bytes in $raw { + match raw_bytes { + None => builder.append_null(), + Some(raw_vector) => { + let mut offset = 0; + while offset < raw_vector.len() { + let len = i32::from_ne_bytes( + raw_vector[offset..offset + 4] + .try_into() + .expect("slice of length 4"), + ); + offset += 4; + if len == -1 { + builder.values().append_null(); + } else { + let s = std::str::from_utf8( + &raw_vector[offset..offset + len as usize], + ) + .map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid utf8 in list element: {e}" + )) + })?; + builder.values().append_value(s); + offset += len as usize; + } + } + builder.append(true); + } + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; +} +macro_rules! 
decode_scalar_string { + ($raw:expr, $builder_type:ty) => {{ + let mut builder = <$builder_type>::new(); + for raw_bytes in $raw { + match raw_bytes { + Some(raw_vector) => { + let s = std::str::from_utf8(raw_vector).map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid utf8 in GroupValuesDictionary: {e}" + )) + })?; + builder.append_value(s); + } + None => builder.append_null(), + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; +} +type GroupEntry = (usize, Option<Vec<u8>>); +pub struct GroupValuesDictionary<K: ArrowDictionaryKeyType + Send> { + // stores the order new unique elements are seen for self.emit() + seen_elements: Vec<Option<Vec<u8>>>, // Box<dyn Builder> doesnt provide the flexibility of building partition arrays that wed need to support emit::First(N) Review Comment: Looking at other implementations, I see they keep this in a separate field and use `MaybeNullBufferBuilder` ########## datafusion/physical-plan/src/aggregates/group_values/single_group_by/dictionary.rs: ########## @@ -0,0 +1,3491 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::aggregates::group_values::GroupValues; +use crate::hash_utils::RandomState; +use arrow::array::{ + Array, ArrayRef, DictionaryArray, LargeStringArray, LargeStringBuilder, ListArray, + ListBuilder, PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder, + StringViewArray, StringViewBuilder, UInt64Array, +}; +use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType}; +use datafusion_common::Result; +use datafusion_common::hash_utils::create_hashes; +use datafusion_expr::EmitTo; +use std::borrow::Cow; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::Arc; + +macro_rules! decode_list { + ($raw:expr, $builder_type:ty) => {{ + let mut builder = ListBuilder::new(<$builder_type>::new()); + for raw_bytes in $raw { + match raw_bytes { + None => builder.append_null(), + Some(raw_vector) => { + let mut offset = 0; + while offset < raw_vector.len() { + let len = i32::from_ne_bytes( + raw_vector[offset..offset + 4] + .try_into() + .expect("slice of length 4"), + ); + offset += 4; + if len == -1 { + builder.values().append_null(); + } else { + let s = std::str::from_utf8( + &raw_vector[offset..offset + len as usize], + ) + .map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid utf8 in list element: {e}" + )) + })?; + builder.values().append_value(s); + offset += len as usize; + } + } + builder.append(true); + } + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; +} +macro_rules! 
decode_scalar_string { + ($raw:expr, $builder_type:ty) => {{ + let mut builder = <$builder_type>::new(); + for raw_bytes in $raw { + match raw_bytes { + Some(raw_vector) => { + let s = std::str::from_utf8(raw_vector).map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid utf8 in GroupValuesDictionary: {e}" + )) + })?; + builder.append_value(s); + } + None => builder.append_null(), + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; +} +type GroupEntry = (usize, Option<Vec<u8>>); +pub struct GroupValuesDictionary<K: ArrowDictionaryKeyType + Send> { + // stores the order new unique elements are seen for self.emit() + seen_elements: Vec<Option<Vec<u8>>>, // Box<dyn Builder> doesnt provide the flexibility of building partition arrays that wed need to support emit::First(N) + value_dt: DataType, + _phantom: PhantomData<K>, + // keeps track of which values weve already seen. stored as -> <unique_value_hash:(initial_group_id, raw_bytes)> + unique_dict_value_mapping: HashMap<u64, Vec<GroupEntry>>, + + random_state: RandomState, + + // cache the group id for nulls since they all map to the same group + null_group_id: Option<usize>, + // tracks if intern has ever been called. this is used to determine if we can skip phaase 1 of of intern. + // phrase one is where we build a hash -> group id mapping for all unique values in the dictionary to avoid repeated hashmap lookups + equality checks in the hot loop of phase 2. + // if intern has never been called, we know for certain that no insertions have been made and we can skip phase 1 entirely since the mapping will be empty and not match any values. + // after the first call to intern, we know that at least one insertion has been made and we have to do phase 1 on every subsequent call to intern to ensure correctness. 
+ intern_called: bool, +} + +impl<K: ArrowDictionaryKeyType + Send> GroupValuesDictionary<K> { + pub fn new(data_type: &DataType) -> Self { + Self { + seen_elements: Vec::new(), + unique_dict_value_mapping: HashMap::new(), + value_dt: data_type.clone(), + _phantom: PhantomData, + random_state: RandomState::with_seed(0), + null_group_id: None, + intern_called: false, + } + } + fn compute_value_hashes(&mut self, values: &ArrayRef) -> Result<Vec<u64>> { + let mut hashes = vec![0u64; values.len()]; + create_hashes([Arc::clone(values)], &self.random_state, &mut hashes)?; + Ok(hashes) + } + + fn get_raw_bytes(values: &ArrayRef, index: usize) -> Cow<'_, [u8]> { + match values.data_type() { + DataType::Utf8 => Cow::Borrowed( + values + .as_any() + .downcast_ref::<StringArray>() + .expect("Expected StringArray") + .value(index) + .as_bytes(), + ), + DataType::LargeUtf8 => Cow::Borrowed( + values + .as_any() + .downcast_ref::<LargeStringArray>() + .expect("Expected LargeStringArray") + .value(index) + .as_bytes(), + ), + DataType::Utf8View => Cow::Borrowed( + values + .as_any() + .downcast_ref::<StringViewArray>() + .expect("Expected StringViewArray") + .value(index) + .as_bytes(), + ), + DataType::List(_) => { + let list_array = values + .as_any() + .downcast_ref::<ListArray>() + .expect("Expected ListArray"); + + if list_array.is_null(index) { + panic!() // this cannot happen. leaving this here as an invariant + } Review Comment: I see other places in the codebase follow this pattern: ```suggestion debug_assert!(!values.is_null(index)); ``` ########## datafusion/physical-plan/src/aggregates/group_values/single_group_by/dictionary.rs: ########## @@ -0,0 +1,3491 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::aggregates::group_values::GroupValues; +use crate::hash_utils::RandomState; +use arrow::array::{ + Array, ArrayRef, DictionaryArray, LargeStringArray, LargeStringBuilder, ListArray, + ListBuilder, PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder, + StringViewArray, StringViewBuilder, UInt64Array, +}; +use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType}; +use datafusion_common::Result; +use datafusion_common::hash_utils::create_hashes; +use datafusion_expr::EmitTo; +use std::borrow::Cow; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::Arc; + +macro_rules! 
decode_list { + ($raw:expr, $builder_type:ty) => {{ + let mut builder = ListBuilder::new(<$builder_type>::new()); + for raw_bytes in $raw { + match raw_bytes { + None => builder.append_null(), + Some(raw_vector) => { + let mut offset = 0; + while offset < raw_vector.len() { + let len = i32::from_ne_bytes( + raw_vector[offset..offset + 4] + .try_into() + .expect("slice of length 4"), + ); + offset += 4; + if len == -1 { + builder.values().append_null(); + } else { + let s = std::str::from_utf8( + &raw_vector[offset..offset + len as usize], + ) + .map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid utf8 in list element: {e}" + )) + })?; + builder.values().append_value(s); + offset += len as usize; + } + } + builder.append(true); + } + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; +} +macro_rules! decode_scalar_string { + ($raw:expr, $builder_type:ty) => {{ + let mut builder = <$builder_type>::new(); + for raw_bytes in $raw { + match raw_bytes { + Some(raw_vector) => { + let s = std::str::from_utf8(raw_vector).map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Invalid utf8 in GroupValuesDictionary: {e}" + )) + })?; + builder.append_value(s); + } + None => builder.append_null(), + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + }}; +} +type GroupEntry = (usize, Option<Vec<u8>>); +pub struct GroupValuesDictionary<K: ArrowDictionaryKeyType + Send> { + // stores the order new unique elements are seen for self.emit() + seen_elements: Vec<Option<Vec<u8>>>, // Box<dyn Builder> doesnt provide the flexibility of building partition arrays that wed need to support emit::First(N) + value_dt: DataType, + _phantom: PhantomData<K>, + // keeps track of which values weve already seen. 
stored as -> <unique_value_hash:(initial_group_id, raw_bytes)> + unique_dict_value_mapping: HashMap<u64, Vec<GroupEntry>>, + + random_state: RandomState, + + // cache the group id for nulls since they all map to the same group + null_group_id: Option<usize>, + // tracks if intern has ever been called. this is used to determine if we can skip phaase 1 of of intern. + // phrase one is where we build a hash -> group id mapping for all unique values in the dictionary to avoid repeated hashmap lookups + equality checks in the hot loop of phase 2. + // if intern has never been called, we know for certain that no insertions have been made and we can skip phase 1 entirely since the mapping will be empty and not match any values. + // after the first call to intern, we know that at least one insertion has been made and we have to do phase 1 on every subsequent call to intern to ensure correctness. + intern_called: bool, +} + +impl<K: ArrowDictionaryKeyType + Send> GroupValuesDictionary<K> { + pub fn new(data_type: &DataType) -> Self { + Self { + seen_elements: Vec::new(), + unique_dict_value_mapping: HashMap::new(), + value_dt: data_type.clone(), + _phantom: PhantomData, + random_state: RandomState::with_seed(0), + null_group_id: None, + intern_called: false, + } + } + fn compute_value_hashes(&mut self, values: &ArrayRef) -> Result<Vec<u64>> { + let mut hashes = vec![0u64; values.len()]; + create_hashes([Arc::clone(values)], &self.random_state, &mut hashes)?; + Ok(hashes) + } + + fn get_raw_bytes(values: &ArrayRef, index: usize) -> Cow<'_, [u8]> { + match values.data_type() { + DataType::Utf8 => Cow::Borrowed( + values + .as_any() + .downcast_ref::<StringArray>() + .expect("Expected StringArray") + .value(index) + .as_bytes(), + ), + DataType::LargeUtf8 => Cow::Borrowed( + values + .as_any() + .downcast_ref::<LargeStringArray>() + .expect("Expected LargeStringArray") + .value(index) + .as_bytes(), + ), + DataType::Utf8View => Cow::Borrowed( + values + .as_any() + 
.downcast_ref::<StringViewArray>() + .expect("Expected StringViewArray") + .value(index) + .as_bytes(), + ), + DataType::List(_) => { + let list_array = values + .as_any() + .downcast_ref::<ListArray>() + .expect("Expected ListArray"); + + if list_array.is_null(index) { + panic!() // this cannot happen. leaving this here as an invariant + } + + let start = list_array.value_offsets()[index] as usize; + let end = list_array.value_offsets()[index + 1] as usize; + let child = list_array.values(); + + let mut bytes = Vec::new(); + for i in start..end { + if child.is_null(i) { + // acts as a marker for transform_into_array to write a null + bytes.extend_from_slice(&(-1i32).to_ne_bytes()); + } else { + let raw = Self::get_raw_bytes(child, i); + bytes.extend_from_slice(&(raw.len() as i32).to_ne_bytes()); + bytes.extend_from_slice(&raw); + } + } + Cow::Owned(bytes) + } + other => unimplemented!("get_raw_bytes not implemented for {other:?}"), + } + } + + #[inline] + fn get_null_group_id(&mut self) -> usize { + if let Some(group_id) = self.null_group_id { + group_id + } else { + // first time we've seen a null + let new_group_id = self.seen_elements.len(); + self.seen_elements.push(None); + self.unique_dict_value_mapping + .insert((usize::MAX - 1) as u64, vec![(new_group_id, None)]); + self.null_group_id = Some(new_group_id); // never compute this again + new_group_id + } + } + fn transform_into_array(&self, raw: &[Option<Vec<u8>>]) -> Result<ArrayRef> { + match &self.value_dt { + DataType::Utf8 => decode_scalar_string!(raw, StringBuilder), + DataType::LargeUtf8 => decode_scalar_string!(raw, LargeStringBuilder), + DataType::Utf8View => decode_scalar_string!(raw, StringViewBuilder), + DataType::List(field) => match field.data_type() { + DataType::Utf8 => decode_list!(raw, StringBuilder), + DataType::LargeUtf8 => decode_list!(raw, LargeStringBuilder), + DataType::Utf8View => decode_list!(raw, StringViewBuilder), + other => Err(datafusion_common::DataFusionError::NotImplemented( 
+ format!("transform_into_array not implemented for List<{other:?}>"), + )), + }, + other => Err(datafusion_common::DataFusionError::NotImplemented(format!( + "transform_into_array not implemented for {other:?}" + ))), + } + } + fn normalize_dict_array( + values: &ArrayRef, + key_array: &PrimitiveArray<K>, + ) -> (ArrayRef, Vec<Option<usize>>) { + // maps old value index -> new canonical index + let mut old_to_new: Vec<Option<usize>> = vec![None; values.len()]; + let mut canonical_indices: Vec<usize> = Vec::new(); + + for (i, slot) in old_to_new.iter_mut().enumerate() { + if values.is_null(i) { + continue; + } + let raw = Self::get_raw_bytes(values, i); + let canonical = canonical_indices + .iter() + .position(|&j| Self::get_raw_bytes(values, j) == raw); + if let Some(idx) = canonical { + *slot = Some(idx); + } else { + *slot = Some(canonical_indices.len()); + canonical_indices.push(i); + } + } + // build new deduplicated values array using take + let indices = UInt64Array::from( + canonical_indices + .iter() + .map(|&i| i as u64) + .collect::<Vec<_>>(), + ); + let new_values = arrow::compute::take(values.as_ref(), &indices, None).unwrap(); + + // remap keys + let new_keys: Vec<Option<usize>> = (0..key_array.len()) + .map(|i| { + if key_array.is_null(i) { + None + } else { + let old_key = key_array.value(i).to_usize().unwrap(); + old_to_new[old_key] + } + }) + .collect(); + + (new_values, new_keys) + } +} + +impl<K: ArrowDictionaryKeyType + Send> GroupValues for GroupValuesDictionary<K> { + // not really sure how to return the size of strings and binary values so this is a best effort approach + fn size(&self) -> usize { + size_of::<Self>() + + self + .seen_elements + .iter() + .filter_map(|opt| opt.as_ref()) + .map(|inner| inner.capacity()) + .sum::<usize>() + + self.unique_dict_value_mapping.capacity() + * size_of::<(u64, Vec<(usize, Vec<u8>)>)>() + } + fn len(&self) -> usize { + self.seen_elements.len() + } + fn is_empty(&self) -> bool { + 
self.seen_elements.is_empty() + } + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> { + if cols.len() != 1 { + return Err(datafusion_common::DataFusionError::Internal( + "GroupValuesDictionary only supports single column group by".to_string(), + )); + } Review Comment: ```suggestion assert_eq!(cols.len(), 1); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
