zhuqi-lucas commented on code in PR #22751:
URL: https://github.com/apache/datafusion/pull/22751#discussion_r3354417910
##########
datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs:
##########
@@ -898,168 +898,128 @@ macro_rules! instantiate_primitive {
};
}
+/// Build the [`GroupColumn`] builder for a single schema field.
+///
+/// Extracted from the inline match that used to live in
+/// [`GroupValuesColumn::intern`] so the per-field dispatch lives in one
+/// place. This factory is the single source of truth for which Arrow types
+/// map to which builder, and it is the function that future nested-type
+/// specializations (e.g. `Struct`, `List`, `LargeList`) plug into without
+/// having to enumerate every combination inline.
+///
+/// Returns a `not_impl_err!` error for any type outside the supported set;
+/// `GroupValuesColumn::intern` propagates it with `?`. NOTE(review): the
+/// `GroupValuesRows` fallback is presumably selected upstream -- confirm.
+fn make_group_column(field: &Field) -> Result<Box<dyn GroupColumn>> {
+ let nullable = field.is_nullable();
+ let data_type = field.data_type();
+ let mut v: Vec<Box<dyn GroupColumn>> = Vec::with_capacity(1);
+ match *data_type {
+ DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type,
data_type),
+ DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type,
data_type),
+ DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type,
data_type),
+ DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type,
data_type),
+ DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type,
data_type),
+ DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type,
data_type),
+ DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type,
data_type),
+ DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type,
data_type),
+ DataType::Float32 => {
+ instantiate_primitive!(v, nullable, Float32Type, data_type)
+ }
+ DataType::Float64 => {
+ instantiate_primitive!(v, nullable, Float64Type, data_type)
+ }
+ DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type,
data_type),
+ DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type,
data_type),
+ DataType::Time32(t) => match t {
+ TimeUnit::Second => {
+ instantiate_primitive!(v, nullable, Time32SecondType,
data_type)
+ }
+ TimeUnit::Millisecond => {
+ instantiate_primitive!(v, nullable, Time32MillisecondType,
data_type)
+ }
+ // Time32 with Microsecond / Nanosecond is not a valid Arrow type
+ // combination; reject explicitly, instead of silently pushing no builder,
+ // so supported_type and this dispatcher stay in lockstep (see consistency fuzz below).
+ _ => return not_impl_err!("{data_type} not supported in
GroupValuesColumn"),
+ },
+ DataType::Time64(t) => match t {
+ TimeUnit::Microsecond => {
+ instantiate_primitive!(v, nullable, Time64MicrosecondType,
data_type)
+ }
+ TimeUnit::Nanosecond => {
+ instantiate_primitive!(v, nullable, Time64NanosecondType,
data_type)
+ }
+ // Time64 with Second / Millisecond is not a valid Arrow type
+ // combination; reject explicitly.
+ _ => return not_impl_err!("{data_type} not supported in
GroupValuesColumn"),
+ },
+ DataType::Timestamp(t, _) => match t {
+ TimeUnit::Second => {
+ instantiate_primitive!(v, nullable, TimestampSecondType,
data_type)
+ }
+ TimeUnit::Millisecond => {
+ instantiate_primitive!(v, nullable, TimestampMillisecondType,
data_type)
+ }
+ TimeUnit::Microsecond => {
+ instantiate_primitive!(v, nullable, TimestampMicrosecondType,
data_type)
+ }
+ TimeUnit::Nanosecond => {
+ instantiate_primitive!(v, nullable, TimestampNanosecondType,
data_type)
+ }
+ },
+ DataType::Decimal128(_, _) => {
+ instantiate_primitive!(v, nullable, Decimal128Type, data_type)
+ }
+ DataType::Utf8 => {
+ v.push(Box::new(ByteGroupValueBuilder::<i32>::new(
+ OutputType::Utf8,
+ )));
+ }
+ DataType::LargeUtf8 => {
+ v.push(Box::new(ByteGroupValueBuilder::<i64>::new(
+ OutputType::Utf8,
+ )));
+ }
+ DataType::Binary => {
+ v.push(Box::new(ByteGroupValueBuilder::<i32>::new(
+ OutputType::Binary,
+ )));
+ }
+ DataType::LargeBinary => {
+ v.push(Box::new(ByteGroupValueBuilder::<i64>::new(
+ OutputType::Binary,
+ )));
+ }
+ DataType::Utf8View => {
+
v.push(Box::new(ByteViewGroupValueBuilder::<StringViewType>::new()));
+ }
+ DataType::BinaryView => {
+
v.push(Box::new(ByteViewGroupValueBuilder::<BinaryViewType>::new()));
+ }
+ DataType::Boolean => {
+ if nullable {
+ v.push(Box::new(BooleanGroupValueBuilder::<true>::new()));
+ } else {
+ v.push(Box::new(BooleanGroupValueBuilder::<false>::new()));
+ }
+ }
+ _ => return not_impl_err!("{data_type} not supported in
GroupValuesColumn"),
+ }
+ debug_assert_eq!(
+ v.len(),
+ 1,
+ "make_group_column must push exactly one builder"
+ );
+ Ok(v.into_iter().next().unwrap())
+}
+
impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) ->
Result<()> {
if self.group_values.is_empty() {
- let mut v = Vec::with_capacity(cols.len());
-
+ let mut v: Vec<Box<dyn GroupColumn>> =
Vec::with_capacity(cols.len());
for f in self.schema.fields().iter() {
- let nullable = f.is_nullable();
- let data_type = f.data_type();
- match data_type {
- &DataType::Int8 => {
- instantiate_primitive!(v, nullable, Int8Type,
data_type)
- }
- &DataType::Int16 => {
- instantiate_primitive!(v, nullable, Int16Type,
data_type)
- }
- &DataType::Int32 => {
- instantiate_primitive!(v, nullable, Int32Type,
data_type)
- }
- &DataType::Int64 => {
- instantiate_primitive!(v, nullable, Int64Type,
data_type)
- }
- &DataType::UInt8 => {
- instantiate_primitive!(v, nullable, UInt8Type,
data_type)
- }
- &DataType::UInt16 => {
- instantiate_primitive!(v, nullable, UInt16Type,
data_type)
- }
- &DataType::UInt32 => {
- instantiate_primitive!(v, nullable, UInt32Type,
data_type)
- }
- &DataType::UInt64 => {
- instantiate_primitive!(v, nullable, UInt64Type,
data_type)
- }
- &DataType::Float32 => {
- instantiate_primitive!(v, nullable, Float32Type,
data_type)
- }
- &DataType::Float64 => {
- instantiate_primitive!(v, nullable, Float64Type,
data_type)
- }
- &DataType::Date32 => {
- instantiate_primitive!(v, nullable, Date32Type,
data_type)
- }
- &DataType::Date64 => {
- instantiate_primitive!(v, nullable, Date64Type,
data_type)
- }
- &DataType::Time32(t) => match t {
- TimeUnit::Second => {
- instantiate_primitive!(
- v,
- nullable,
- Time32SecondType,
- data_type
- )
- }
- TimeUnit::Millisecond => {
- instantiate_primitive!(
- v,
- nullable,
- Time32MillisecondType,
- data_type
- )
- }
- _ => {}
- },
- &DataType::Time64(t) => match t {
- TimeUnit::Microsecond => {
- instantiate_primitive!(
- v,
- nullable,
- Time64MicrosecondType,
- data_type
- )
- }
- TimeUnit::Nanosecond => {
- instantiate_primitive!(
- v,
- nullable,
- Time64NanosecondType,
- data_type
- )
- }
- _ => {}
- },
- &DataType::Timestamp(t, _) => match t {
- TimeUnit::Second => {
- instantiate_primitive!(
- v,
- nullable,
- TimestampSecondType,
- data_type
- )
- }
- TimeUnit::Millisecond => {
- instantiate_primitive!(
- v,
- nullable,
- TimestampMillisecondType,
- data_type
- )
- }
- TimeUnit::Microsecond => {
- instantiate_primitive!(
- v,
- nullable,
- TimestampMicrosecondType,
- data_type
- )
- }
- TimeUnit::Nanosecond => {
- instantiate_primitive!(
- v,
- nullable,
- TimestampNanosecondType,
- data_type
- )
- }
- },
- &DataType::Decimal128(_, _) => {
- instantiate_primitive! {
- v,
- nullable,
- Decimal128Type,
- data_type
- }
- }
- &DataType::Utf8 => {
- let b =
ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
- v.push(Box::new(b) as _)
- }
- &DataType::LargeUtf8 => {
- let b =
ByteGroupValueBuilder::<i64>::new(OutputType::Utf8);
- v.push(Box::new(b) as _)
- }
- &DataType::Binary => {
- let b =
ByteGroupValueBuilder::<i32>::new(OutputType::Binary);
- v.push(Box::new(b) as _)
- }
- &DataType::LargeBinary => {
- let b =
ByteGroupValueBuilder::<i64>::new(OutputType::Binary);
- v.push(Box::new(b) as _)
- }
- &DataType::Utf8View => {
- let b =
ByteViewGroupValueBuilder::<StringViewType>::new();
- v.push(Box::new(b) as _)
- }
- &DataType::BinaryView => {
- let b =
ByteViewGroupValueBuilder::<BinaryViewType>::new();
- v.push(Box::new(b) as _)
- }
- &DataType::Boolean => {
- if nullable {
- let b = BooleanGroupValueBuilder::<true>::new();
- v.push(Box::new(b) as _)
- } else {
- let b = BooleanGroupValueBuilder::<false>::new();
- v.push(Box::new(b) as _)
- }
- }
- dt => {
- return not_impl_err!("{dt} not supported in
GroupValuesColumn");
- }
- }
+ v.push(make_group_column(f.as_ref())?);
Review Comment:
Good point @2010YOUY01, addressed. I moved the per-field builder construction
into a private helper `build_group_columns` and now call it from three places:
`try_new` (initial construction), the `EmitTo::All` branch of `emit` (replaces
the drained Vec with a fresh one via `mem::replace`), and `clear_shrink`
(rebuilds after the Vec is cleared). `intern` no longer has the lazy if-empty
check.
One consequence is that an unsupported schema now fails at `try_new` time
instead of at the first `intern` call. In production this changes nothing
because the `new_group_values` factory in `aggregates/group_values/mod.rs` only
calls `GroupValuesColumn::try_new` after `multi_group_by::supported_schema`
returns true, and unsupported schemas still fall back to `GroupValuesRows`. The
fail-fast just surfaces a programming error sooner for callers that bypass the
factory.
Renamed the unit test from
`intern_returns_not_impl_for_unsupported_top_level_type` to
`try_new_returns_not_impl_for_unsupported_top_level_type` and updated it to
match. All 30 group_values tests and the dhat test still pass.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]