martin-g commented on code in PR #20047:
URL: https://github.com/apache/datafusion/pull/20047#discussion_r3181769626
##########
docs/source/library-user-guide/upgrading/54.0.0.md:
##########
@@ -460,3 +460,42 @@ impl Default for MyTreeNode {
}
}
```
+
+[20047]: https://github.com/apache/datafusion/pull/20047
+
+### File statistics cache is now memory-limited and managed by the
`CacheManager`
+
+The file statistics cache used by `ListingTable` is now memory-limited and
+centrally managed through the `CacheManager`.
+
+To configure the cache size use the `file_statistics_cache_limit` setting:
+
+```sql
+SET datafusion.runtime.file_statistics_cache_limit = '10MB'
Review Comment:
```suggestion
SET datafusion.runtime.file_statistics_cache_limit = '10M'
```
##########
docs/source/library-user-guide/upgrading/54.0.0.md:
##########
@@ -460,3 +460,42 @@ impl Default for MyTreeNode {
}
}
```
+
+[20047]: https://github.com/apache/datafusion/pull/20047
+
+### File statistics cache is now memory-limited and managed by the
`CacheManager`
+
+The file statistics cache used by `ListingTable` is now memory-limited and
+centrally managed through the `CacheManager`.
+
+To configure the cache size use the `file_statistics_cache_limit` setting:
+
+```sql
+SET datafusion.runtime.file_statistics_cache_limit = '10MB'
+```
+
+To disable the file statistics cache, set the limit to 0.
+
+The file statistics cache is no longer created inside the `ListingTable`.
+Instead, it is created within the `CacheManager` and must be passed to the
`ListingTable`.
+
+**Who is affected:**
+
+- Users who want to limit the memory usage of the file statistics cache.
+- Users who want to disable the file statistics cache.
+- Users who want to create a `ListingTable` programmatically with a file
statistics cache.
+
+**Migration guide:**
+
+Disable the cache by setting the configuration value to 0:
+
+```sql
+SET datafusion.runtime.file_statistics_cache_limit = '0k'
Review Comment:
```suggestion
SET datafusion.runtime.file_statistics_cache_limit = '0K'
```
##########
datafusion/common/src/heap_size.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::stats::Precision;
+use crate::{ColumnStatistics, ScalarValue, Statistics, TableReference};
+use arrow::array::{
+ Array, FixedSizeListArray, LargeListArray, LargeListViewArray, ListArray,
+ ListViewArray, MapArray, StructArray,
+};
+use arrow::datatypes::{
+ DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano,
IntervalUnit,
+ TimeUnit, UnionFields, UnionMode, i256,
+};
+use chrono::{DateTime, Utc};
+use half::f16;
+use hashbrown::HashSet;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This is a temporary solution until
<https://github.com/apache/datafusion/pull/19599> and
+/// <https://github.com/apache/arrow-rs/pull/9138> are resolved.
+/// Trait for calculating the size of various containers
+pub trait DFHeapSize {
+ /// Return the size of any bytes allocated on the heap by this object,
+ /// including heap memory in those structures
+ ///
+ /// Note that the size of the type itself is not included in the result --
+ /// instead, that size is added by the caller (e.g. container).
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize;
+}
+
+#[derive(Default)]
+pub struct DFHeapSizeCtx {
+ seen: HashSet<usize>,
+}
+
+impl DFHeapSize for Statistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.num_rows.heap_size(ctx)
+ + self.total_byte_size.heap_size(ctx)
+ + self.column_statistics.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for TableReference {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ match self {
+ TableReference::Bare { table } => table.heap_size(ctx),
+ TableReference::Partial { schema, table } => {
+ schema.heap_size(ctx) + table.heap_size(ctx)
+ }
+ TableReference::Full {
+ catalog,
+ schema,
+ table,
+ } => catalog.heap_size(ctx) + schema.heap_size(ctx) +
table.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: Debug + Clone + PartialEq + Eq + PartialOrd + DFHeapSize> DFHeapSize
+ for Precision<T>
+{
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.get_value().map_or_else(|| 0, |v| v.heap_size(ctx))
+ }
+}
+
+impl DFHeapSize for ColumnStatistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.null_count.heap_size(ctx)
+ + self.max_value.heap_size(ctx)
+ + self.min_value.heap_size(ctx)
+ + self.sum_value.heap_size(ctx)
+ + self.distinct_count.heap_size(ctx)
+ + self.byte_size.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for ScalarValue {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use crate::scalar::ScalarValue::*;
+ match self {
+ Null => 0,
+ Boolean(b) => b.heap_size(ctx),
+ Float16(f) => f.heap_size(ctx),
+ Float32(f) => f.heap_size(ctx),
+ Float64(f) => f.heap_size(ctx),
+ Decimal32(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal64(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal128(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal256(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Int8(i) => i.heap_size(ctx),
+ Int16(i) => i.heap_size(ctx),
+ Int32(i) => i.heap_size(ctx),
+ Int64(i) => i.heap_size(ctx),
+ UInt8(u) => u.heap_size(ctx),
+ UInt16(u) => u.heap_size(ctx),
+ UInt32(u) => u.heap_size(ctx),
+ UInt64(u) => u.heap_size(ctx),
+ Utf8(u) => u.heap_size(ctx),
+ Utf8View(u) => u.heap_size(ctx),
+ LargeUtf8(l) => l.heap_size(ctx),
+ Binary(b) => b.heap_size(ctx),
+ BinaryView(b) => b.heap_size(ctx),
+ FixedSizeBinary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ LargeBinary(l) => l.heap_size(ctx),
+ FixedSizeList(f) => f.heap_size(ctx),
+ List(l) => l.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Map(m) => m.heap_size(ctx),
+ Date32(d) => d.heap_size(ctx),
+ Date64(d) => d.heap_size(ctx),
+ Time32Second(t) => t.heap_size(ctx),
+ Time32Millisecond(t) => t.heap_size(ctx),
+ Time64Microsecond(t) => t.heap_size(ctx),
+ Time64Nanosecond(t) => t.heap_size(ctx),
+ TimestampSecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMillisecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMicrosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampNanosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ IntervalYearMonth(i) => i.heap_size(ctx),
+ IntervalDayTime(i) => i.heap_size(ctx),
+ IntervalMonthDayNano(i) => i.heap_size(ctx),
+ DurationSecond(d) => d.heap_size(ctx),
+ DurationMillisecond(d) => d.heap_size(ctx),
+ DurationMicrosecond(d) => d.heap_size(ctx),
+ DurationNanosecond(d) => d.heap_size(ctx),
+ Union(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b, c) => {
+ a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx)
+ }
+ ListView(a) => a.heap_size(ctx),
+ LargeListView(a) => a.heap_size(ctx),
+ }
+ }
+}
+
+impl DFHeapSize for DataType {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use DataType::*;
+ match self {
+ Null => 0,
+ Boolean => 0,
+ Int8 => 0,
+ Int16 => 0,
+ Int32 => 0,
+ Int64 => 0,
+ UInt8 => 0,
+ UInt16 => 0,
+ UInt32 => 0,
+ UInt64 => 0,
+ Float16 => 0,
+ Float32 => 0,
+ Float64 => 0,
+ Timestamp(t, s) => t.heap_size(ctx) + s.heap_size(ctx),
+ Date32 => 0,
+ Date64 => 0,
+ Time32(t) => t.heap_size(ctx),
+ Time64(t) => t.heap_size(ctx),
+ Duration(t) => t.heap_size(ctx),
+ Interval(i) => i.heap_size(ctx),
+ Binary => 0,
+ FixedSizeBinary(i) => i.heap_size(ctx),
+ LargeBinary => 0,
+ BinaryView => 0,
+ Utf8 => 0,
+ LargeUtf8 => 0,
+ Utf8View => 0,
+ List(v) => v.heap_size(ctx),
+ ListView(v) => v.heap_size(ctx),
+ FixedSizeList(f, i) => f.heap_size(ctx) + i.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ LargeListView(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Union(u, m) => u.heap_size(ctx) + m.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ Decimal32(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal64(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal128(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal256(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Map(m, b) => m.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Vec<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let item_size = size_of::<T>();
+ // account for the contents of the Vec
+ (self.capacity() * item_size) +
+ // add any heap allocations by contents
+ self.iter().map(|t| t.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<K: DFHeapSize, V: DFHeapSize> DFHeapSize for HashMap<K, V> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let capacity = self.capacity();
+ if capacity == 0 {
+ return 0;
+ }
+
+ // HashMap doesn't provide a way to get its heap size, so this is an
approximation based on
+ // the behavior of hashbrown::HashMap as at version 0.16.0, and may
become inaccurate
+ // if the implementation changes.
+ let key_val_size = size_of::<(K, V)>();
+ // Overhead for the control tags group, which may be smaller depending
on architecture
+ let group_size = 16;
+ // 1 byte of metadata stored per bucket.
+ let metadata_size = 1;
+
+ // Compute the number of buckets for the capacity. Based on
hashbrown's capacity_to_buckets
+ let buckets = if capacity < 15 {
+ let min_cap = match key_val_size {
+ 0..=1 => 14,
+ 2..=3 => 7,
+ _ => 3,
+ };
+ let cap = min_cap.max(capacity);
+ if cap < 4 {
+ 4
+ } else if cap < 8 {
+ 8
+ } else {
+ 16
+ }
+ } else {
+ (capacity.saturating_mul(8) / 7).next_power_of_two()
+ };
+
+ group_size
+ + (buckets * (key_val_size + metadata_size))
+ + self.keys().map(|k| k.heap_size(ctx)).sum::<usize>()
+ + self.values().map(|v| v.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Arc<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
+
+ if !ctx.seen.insert(ptr) {
+ return 0;
+ }
+
+ // Arc stores weak and strong counts on the heap alongside an instance
of T
+ 2 * size_of::<usize>() + self.as_ref().heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for Arc<str> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
Review Comment:
```suggestion
let ptr = Arc::as_ptr(self) as usize;
```
##########
docs/source/library-user-guide/upgrading/54.0.0.md:
##########
@@ -460,3 +460,42 @@ impl Default for MyTreeNode {
}
}
```
+
+[20047]: https://github.com/apache/datafusion/pull/20047
+
+### File statistics cache is now memory-limited and managed by the
`CacheManager`
+
+The file statistics cache used by `ListingTable` is now memory-limited and
+centrally managed through the `CacheManager`.
+
+To configure the cache size use the `file_statistics_cache_limit` setting:
+
+```sql
+SET datafusion.runtime.file_statistics_cache_limit = '10MB'
+```
+
+To disable the file statistics cache, set the limit to 0.
+
+The file statistics cache is no longer created inside the `ListingTable`.
+Instead, it is created within the `CacheManager` and must be passed to the
`ListingTable`.
+
+**Who is affected:**
+
+- Users who want to limit the memory usage of the file statistics cache.
+- Users who want to disable the file statistics cache.
+- Users who want to create a `ListingTable` programmatically with a file
statistics cache.
+
+**Migration guide:**
+
+Disable the cache by setting the configuration value to 0:
Review Comment:
`0` is misleading here.
`parse_capacity_limit()` looks for a unit, so it really should be:
```suggestion
Disable the cache by setting the configuration value to '0K':
```
But now it looks like `OK` (Okay) :-/
Maybe `parse_capacity_limit()` should be improved to make the unit optional
?! Or at least the reading of file_statistics_cache_limit should normalize `0`
to `0K` ?!
##########
datafusion/common/src/heap_size.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::stats::Precision;
+use crate::{ColumnStatistics, ScalarValue, Statistics, TableReference};
+use arrow::array::{
+ Array, FixedSizeListArray, LargeListArray, LargeListViewArray, ListArray,
+ ListViewArray, MapArray, StructArray,
+};
+use arrow::datatypes::{
+ DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano,
IntervalUnit,
+ TimeUnit, UnionFields, UnionMode, i256,
+};
+use chrono::{DateTime, Utc};
+use half::f16;
+use hashbrown::HashSet;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This is a temporary solution until
<https://github.com/apache/datafusion/pull/19599> and
+/// <https://github.com/apache/arrow-rs/pull/9138> are resolved.
+/// Trait for calculating the size of various containers
+pub trait DFHeapSize {
+ /// Return the size of any bytes allocated on the heap by this object,
+ /// including heap memory in those structures
+ ///
+ /// Note that the size of the type itself is not included in the result --
+ /// instead, that size is added by the caller (e.g. container).
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize;
+}
+
+#[derive(Default)]
+pub struct DFHeapSizeCtx {
+ seen: HashSet<usize>,
+}
+
+impl DFHeapSize for Statistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.num_rows.heap_size(ctx)
+ + self.total_byte_size.heap_size(ctx)
+ + self.column_statistics.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for TableReference {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ match self {
+ TableReference::Bare { table } => table.heap_size(ctx),
+ TableReference::Partial { schema, table } => {
+ schema.heap_size(ctx) + table.heap_size(ctx)
+ }
+ TableReference::Full {
+ catalog,
+ schema,
+ table,
+ } => catalog.heap_size(ctx) + schema.heap_size(ctx) +
table.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: Debug + Clone + PartialEq + Eq + PartialOrd + DFHeapSize> DFHeapSize
+ for Precision<T>
+{
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.get_value().map_or_else(|| 0, |v| v.heap_size(ctx))
+ }
+}
+
+impl DFHeapSize for ColumnStatistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.null_count.heap_size(ctx)
+ + self.max_value.heap_size(ctx)
+ + self.min_value.heap_size(ctx)
+ + self.sum_value.heap_size(ctx)
+ + self.distinct_count.heap_size(ctx)
+ + self.byte_size.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for ScalarValue {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use crate::scalar::ScalarValue::*;
+ match self {
+ Null => 0,
+ Boolean(b) => b.heap_size(ctx),
+ Float16(f) => f.heap_size(ctx),
+ Float32(f) => f.heap_size(ctx),
+ Float64(f) => f.heap_size(ctx),
+ Decimal32(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal64(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal128(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal256(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Int8(i) => i.heap_size(ctx),
+ Int16(i) => i.heap_size(ctx),
+ Int32(i) => i.heap_size(ctx),
+ Int64(i) => i.heap_size(ctx),
+ UInt8(u) => u.heap_size(ctx),
+ UInt16(u) => u.heap_size(ctx),
+ UInt32(u) => u.heap_size(ctx),
+ UInt64(u) => u.heap_size(ctx),
+ Utf8(u) => u.heap_size(ctx),
+ Utf8View(u) => u.heap_size(ctx),
+ LargeUtf8(l) => l.heap_size(ctx),
+ Binary(b) => b.heap_size(ctx),
+ BinaryView(b) => b.heap_size(ctx),
+ FixedSizeBinary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ LargeBinary(l) => l.heap_size(ctx),
+ FixedSizeList(f) => f.heap_size(ctx),
+ List(l) => l.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Map(m) => m.heap_size(ctx),
+ Date32(d) => d.heap_size(ctx),
+ Date64(d) => d.heap_size(ctx),
+ Time32Second(t) => t.heap_size(ctx),
+ Time32Millisecond(t) => t.heap_size(ctx),
+ Time64Microsecond(t) => t.heap_size(ctx),
+ Time64Nanosecond(t) => t.heap_size(ctx),
+ TimestampSecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMillisecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMicrosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampNanosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ IntervalYearMonth(i) => i.heap_size(ctx),
+ IntervalDayTime(i) => i.heap_size(ctx),
+ IntervalMonthDayNano(i) => i.heap_size(ctx),
+ DurationSecond(d) => d.heap_size(ctx),
+ DurationMillisecond(d) => d.heap_size(ctx),
+ DurationMicrosecond(d) => d.heap_size(ctx),
+ DurationNanosecond(d) => d.heap_size(ctx),
+ Union(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b, c) => {
+ a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx)
+ }
+ ListView(a) => a.heap_size(ctx),
+ LargeListView(a) => a.heap_size(ctx),
+ }
+ }
+}
+
+impl DFHeapSize for DataType {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use DataType::*;
+ match self {
+ Null => 0,
+ Boolean => 0,
+ Int8 => 0,
+ Int16 => 0,
+ Int32 => 0,
+ Int64 => 0,
+ UInt8 => 0,
+ UInt16 => 0,
+ UInt32 => 0,
+ UInt64 => 0,
+ Float16 => 0,
+ Float32 => 0,
+ Float64 => 0,
+ Timestamp(t, s) => t.heap_size(ctx) + s.heap_size(ctx),
+ Date32 => 0,
+ Date64 => 0,
+ Time32(t) => t.heap_size(ctx),
+ Time64(t) => t.heap_size(ctx),
+ Duration(t) => t.heap_size(ctx),
+ Interval(i) => i.heap_size(ctx),
+ Binary => 0,
+ FixedSizeBinary(i) => i.heap_size(ctx),
+ LargeBinary => 0,
+ BinaryView => 0,
+ Utf8 => 0,
+ LargeUtf8 => 0,
+ Utf8View => 0,
+ List(v) => v.heap_size(ctx),
+ ListView(v) => v.heap_size(ctx),
+ FixedSizeList(f, i) => f.heap_size(ctx) + i.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ LargeListView(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Union(u, m) => u.heap_size(ctx) + m.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ Decimal32(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal64(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal128(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal256(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Map(m, b) => m.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Vec<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let item_size = size_of::<T>();
+ // account for the contents of the Vec
+ (self.capacity() * item_size) +
+ // add any heap allocations by contents
+ self.iter().map(|t| t.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<K: DFHeapSize, V: DFHeapSize> DFHeapSize for HashMap<K, V> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let capacity = self.capacity();
+ if capacity == 0 {
+ return 0;
+ }
+
+ // HashMap doesn't provide a way to get its heap size, so this is an
approximation based on
+ // the behavior of hashbrown::HashMap as at version 0.16.0, and may
become inaccurate
+ // if the implementation changes.
+ let key_val_size = size_of::<(K, V)>();
+ // Overhead for the control tags group, which may be smaller depending
on architecture
+ let group_size = 16;
+ // 1 byte of metadata stored per bucket.
+ let metadata_size = 1;
+
+ // Compute the number of buckets for the capacity. Based on
hashbrown's capacity_to_buckets
+ let buckets = if capacity < 15 {
+ let min_cap = match key_val_size {
+ 0..=1 => 14,
+ 2..=3 => 7,
+ _ => 3,
+ };
+ let cap = min_cap.max(capacity);
+ if cap < 4 {
+ 4
+ } else if cap < 8 {
+ 8
+ } else {
+ 16
+ }
+ } else {
+ (capacity.saturating_mul(8) / 7).next_power_of_two()
+ };
+
+ group_size
+ + (buckets * (key_val_size + metadata_size))
+ + self.keys().map(|k| k.heap_size(ctx)).sum::<usize>()
+ + self.values().map(|v| v.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Arc<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
+
+ if !ctx.seen.insert(ptr) {
+ return 0;
+ }
+
+ // Arc stores weak and strong counts on the heap alongside an instance
of T
+ 2 * size_of::<usize>() + self.as_ref().heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for Arc<str> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
+
+ if !ctx.seen.insert(ptr) {
+ return 0;
+ }
+
+ // Arc stores weak and strong counts on the heap alongside an instance
of T
+ 2 * size_of::<usize>() + self.as_ref().heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for Arc<dyn DFHeapSize> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
+
+ if !ctx.seen.insert(ptr) {
+ return 0;
+ }
+
+ // Arc stores weak and strong counts on the heap alongside an instance
of T
+ 2 * size_of::<usize>() + size_of_val(self.as_ref()) +
self.as_ref().heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for Fields {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.into_iter().map(|f| f.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl DFHeapSize for StructArray {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ self.get_array_memory_size()
+ }
+}
+
+impl DFHeapSize for LargeListArray {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ self.get_array_memory_size()
+ }
+}
+
+impl DFHeapSize for LargeListViewArray {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ self.get_array_memory_size()
+ }
+}
+
+impl DFHeapSize for ListArray {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ self.get_array_memory_size()
+ }
+}
+
+impl DFHeapSize for ListViewArray {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ self.get_array_memory_size()
+ }
+}
+
+impl DFHeapSize for FixedSizeListArray {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ self.get_array_memory_size()
+ }
+}
+impl DFHeapSize for MapArray {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ self.get_array_memory_size()
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Box<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ size_of::<T>() + self.as_ref().heap_size(ctx)
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Option<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.as_ref().map(|inner| inner.heap_size(ctx)).unwrap_or(0)
+ }
+}
+
+impl<A, B> DFHeapSize for (A, B)
+where
+ A: DFHeapSize,
+ B: DFHeapSize,
+{
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.0.heap_size(ctx) + self.1.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for String {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ self.capacity()
+ }
+}
+
+impl DFHeapSize for str {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ self.len()
+ }
+}
+
+impl DFHeapSize for UnionFields {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.iter()
+ .map(|f| f.0.heap_size(ctx) + f.1.heap_size(ctx))
+ .sum()
+ }
+}
+
+impl DFHeapSize for UnionMode {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for TimeUnit {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for IntervalUnit {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for Field {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.name().heap_size(ctx)
+ + self.data_type().heap_size(ctx)
+ + self.is_nullable().heap_size(ctx)
+ + self.dict_is_ordered().heap_size(ctx)
+ + self.metadata().heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for IntervalMonthDayNano {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.days.heap_size(ctx)
+ + self.months.heap_size(ctx)
+ + self.nanoseconds.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for IntervalDayTime {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.days.heap_size(ctx) + self.milliseconds.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for DateTime<Utc> {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for bool {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+impl DFHeapSize for u8 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for u16 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for u32 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for u64 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for i8 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for i16 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for i32 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+impl DFHeapSize for i64 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for i128 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for i256 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for f16 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for f32 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+impl DFHeapSize for f64 {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl DFHeapSize for usize {
+ fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_heap_size_arc_avoid_double_accounting() {
+ let a1 = Arc::new(vec![1, 2, 3]);
+ let mut ctx = DFHeapSizeCtx::default();
+ let heap_size = a1.heap_size(&mut ctx);
+
+ let a2 = Arc::clone(&a1);
+ let a3 = Arc::clone(&a1);
+ let a4 = Arc::clone(&a3);
+
+ let mut ctx = DFHeapSizeCtx::default();
+ let heap_size_with_clones = a1.heap_size(&mut ctx)
+ + a2.heap_size(&mut ctx)
+ + a3.heap_size(&mut ctx)
+ + a4.heap_size(&mut ctx);
+
+ assert_eq!(heap_size, heap_size_with_clones);
+ }
+
+ #[test]
+ fn test_heap_size_arc_str_avoid_double_accounting() {
+ let a1 = Arc::new("Hello".to_string());
Review Comment:
This tests `Arc<String>`, not `Arc<str>`.
```suggestion
let a1 = Arc::from("Hello");
```
##########
datafusion/execution/src/cache/file_statistics_cache.rs:
##########
@@ -0,0 +1,745 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cache::cache_manager::{
+ CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheEntry,
+};
+use crate::cache::{CacheAccessor, TableScopedPath};
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+pub use crate::cache::DefaultFilesMetadataCache;
+use crate::cache::lru_queue::LruQueue;
+use datafusion_common::TableReference;
+use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
+
+/// Default implementation of [`FileStatisticsCache`]
+///
+/// Stores cached file metadata (statistics and orderings) for files.
+///
+/// The typical usage pattern is:
+/// 1. Call `get(path)` to check for cached value
+/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)`
+/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
+///
+/// # Internal details
+///
+/// The `memory_limit` controls the maximum size of the cache, which uses a
+/// Least Recently Used eviction algorithm. When adding a new entry, if the
total
+/// size of the cached entries exceeds `memory_limit`, the least recently used
entries
+/// are evicted until the total size is lower than `memory_limit`.
+///
+///
+/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
+#[derive(Default)]
+pub struct DefaultFileStatisticsCache {
+ state: Mutex<DefaultFileStatisticsCacheState>,
+}
+
+impl DefaultFileStatisticsCache {
+ pub fn new(memory_limit: usize) -> Self {
+ Self {
+ state:
Mutex::new(DefaultFileStatisticsCacheState::new(memory_limit)),
+ }
+ }
+
+ /// Returns the size of the cached memory, in bytes.
+ pub fn memory_used(&self) -> usize {
+ let state = self.state.lock().unwrap();
+ state.memory_used
+ }
+}
+
+struct DefaultFileStatisticsCacheState {
+ lru_queue: LruQueue<TableScopedPath, CachedFileMetadata>,
+ memory_limit: usize,
+ memory_used: usize,
+}
+
+pub const DEFAULT_FILE_STATISTICS_MEMORY_LIMIT: usize = 20 * 1024 * 1024; //
20MiB
+
+impl Default for DefaultFileStatisticsCacheState {
+ fn default() -> Self {
+ Self {
+ lru_queue: LruQueue::new(),
+ memory_limit: DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
+ memory_used: 0,
+ }
+ }
+}
+
+impl DefaultFileStatisticsCacheState {
+ fn new(memory_limit: usize) -> Self {
+ Self {
+ lru_queue: LruQueue::new(),
+ memory_limit,
+ memory_used: 0,
+ }
+ }
+ fn get(&mut self, key: &TableScopedPath) -> Option<CachedFileMetadata> {
+ self.lru_queue.get(key).cloned()
+ }
+
+ fn put(
+ &mut self,
+ key: &TableScopedPath,
+ value: CachedFileMetadata,
+ ) -> Option<CachedFileMetadata> {
+ let mut ctx = DFHeapSizeCtx::default();
+ let key_size = key.heap_size(&mut ctx);
+ let entry_size = value.heap_size(&mut ctx);
+
+ if entry_size + key_size > self.memory_limit {
+ // Remove potential stale entry
+ self.remove(key);
+ return None;
Review Comment:
```suggestion
            return self.remove(key);
```
Return the old value if there was an entry.
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -448,14 +495,20 @@ impl Default for CacheManagerConfig {
}
impl CacheManagerConfig {
- /// Set the cache for files statistics.
+ /// Set the cache for file statistics.
///
/// Default is `None` (disabled).
Review Comment:
https://github.com/apache/datafusion/pull/20047/changes#diff-f9f5864c305467f36afedd7af56430f2f19cf70c51a8780459e41b7d2727d7daR364-R368
creates a cache depending on `file_statistics_cache_limit`
##########
datafusion/common/src/heap_size.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::stats::Precision;
+use crate::{ColumnStatistics, ScalarValue, Statistics, TableReference};
+use arrow::array::{
+ Array, FixedSizeListArray, LargeListArray, LargeListViewArray, ListArray,
+ ListViewArray, MapArray, StructArray,
+};
+use arrow::datatypes::{
+ DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano,
IntervalUnit,
+ TimeUnit, UnionFields, UnionMode, i256,
+};
+use chrono::{DateTime, Utc};
+use half::f16;
+use hashbrown::HashSet;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This is a temporary solution until
<https://github.com/apache/datafusion/pull/19599> and
+/// <https://github.com/apache/arrow-rs/pull/9138> are resolved.
+/// Trait for calculating the size of various containers
+pub trait DFHeapSize {
+ /// Return the size of any bytes allocated on the heap by this object,
+ /// including heap memory in those structures
+ ///
+ /// Note that the size of the type itself is not included in the result --
+ /// instead, that size is added by the caller (e.g. container).
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize;
+}
+
+#[derive(Default)]
+pub struct DFHeapSizeCtx {
+ seen: HashSet<usize>,
+}
+
+impl DFHeapSize for Statistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.num_rows.heap_size(ctx)
+ + self.total_byte_size.heap_size(ctx)
+ + self.column_statistics.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for TableReference {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ match self {
+ TableReference::Bare { table } => table.heap_size(ctx),
+ TableReference::Partial { schema, table } => {
+ schema.heap_size(ctx) + table.heap_size(ctx)
+ }
+ TableReference::Full {
+ catalog,
+ schema,
+ table,
+ } => catalog.heap_size(ctx) + schema.heap_size(ctx) +
table.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: Debug + Clone + PartialEq + Eq + PartialOrd + DFHeapSize> DFHeapSize
+ for Precision<T>
+{
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.get_value().map_or_else(|| 0, |v| v.heap_size(ctx))
+ }
+}
+
+impl DFHeapSize for ColumnStatistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.null_count.heap_size(ctx)
+ + self.max_value.heap_size(ctx)
+ + self.min_value.heap_size(ctx)
+ + self.sum_value.heap_size(ctx)
+ + self.distinct_count.heap_size(ctx)
+ + self.byte_size.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for ScalarValue {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use crate::scalar::ScalarValue::*;
+ match self {
+ Null => 0,
+ Boolean(b) => b.heap_size(ctx),
+ Float16(f) => f.heap_size(ctx),
+ Float32(f) => f.heap_size(ctx),
+ Float64(f) => f.heap_size(ctx),
+ Decimal32(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal64(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal128(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal256(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Int8(i) => i.heap_size(ctx),
+ Int16(i) => i.heap_size(ctx),
+ Int32(i) => i.heap_size(ctx),
+ Int64(i) => i.heap_size(ctx),
+ UInt8(u) => u.heap_size(ctx),
+ UInt16(u) => u.heap_size(ctx),
+ UInt32(u) => u.heap_size(ctx),
+ UInt64(u) => u.heap_size(ctx),
+ Utf8(u) => u.heap_size(ctx),
+ Utf8View(u) => u.heap_size(ctx),
+ LargeUtf8(l) => l.heap_size(ctx),
+ Binary(b) => b.heap_size(ctx),
+ BinaryView(b) => b.heap_size(ctx),
+ FixedSizeBinary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ LargeBinary(l) => l.heap_size(ctx),
+ FixedSizeList(f) => f.heap_size(ctx),
+ List(l) => l.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Map(m) => m.heap_size(ctx),
+ Date32(d) => d.heap_size(ctx),
+ Date64(d) => d.heap_size(ctx),
+ Time32Second(t) => t.heap_size(ctx),
+ Time32Millisecond(t) => t.heap_size(ctx),
+ Time64Microsecond(t) => t.heap_size(ctx),
+ Time64Nanosecond(t) => t.heap_size(ctx),
+ TimestampSecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMillisecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMicrosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampNanosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ IntervalYearMonth(i) => i.heap_size(ctx),
+ IntervalDayTime(i) => i.heap_size(ctx),
+ IntervalMonthDayNano(i) => i.heap_size(ctx),
+ DurationSecond(d) => d.heap_size(ctx),
+ DurationMillisecond(d) => d.heap_size(ctx),
+ DurationMicrosecond(d) => d.heap_size(ctx),
+ DurationNanosecond(d) => d.heap_size(ctx),
+ Union(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b, c) => {
+ a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx)
+ }
+ ListView(a) => a.heap_size(ctx),
+ LargeListView(a) => a.heap_size(ctx),
+ }
+ }
+}
+
+impl DFHeapSize for DataType {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use DataType::*;
+ match self {
+ Null => 0,
+ Boolean => 0,
+ Int8 => 0,
+ Int16 => 0,
+ Int32 => 0,
+ Int64 => 0,
+ UInt8 => 0,
+ UInt16 => 0,
+ UInt32 => 0,
+ UInt64 => 0,
+ Float16 => 0,
+ Float32 => 0,
+ Float64 => 0,
+ Timestamp(t, s) => t.heap_size(ctx) + s.heap_size(ctx),
+ Date32 => 0,
+ Date64 => 0,
+ Time32(t) => t.heap_size(ctx),
+ Time64(t) => t.heap_size(ctx),
+ Duration(t) => t.heap_size(ctx),
+ Interval(i) => i.heap_size(ctx),
+ Binary => 0,
+ FixedSizeBinary(i) => i.heap_size(ctx),
+ LargeBinary => 0,
+ BinaryView => 0,
+ Utf8 => 0,
+ LargeUtf8 => 0,
+ Utf8View => 0,
+ List(v) => v.heap_size(ctx),
+ ListView(v) => v.heap_size(ctx),
+ FixedSizeList(f, i) => f.heap_size(ctx) + i.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ LargeListView(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Union(u, m) => u.heap_size(ctx) + m.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ Decimal32(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal64(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal128(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal256(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Map(m, b) => m.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Vec<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let item_size = size_of::<T>();
+ // account for the contents of the Vec
+ (self.capacity() * item_size) +
+ // add any heap allocations by contents
+ self.iter().map(|t| t.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<K: DFHeapSize, V: DFHeapSize> DFHeapSize for HashMap<K, V> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let capacity = self.capacity();
+ if capacity == 0 {
+ return 0;
+ }
+
+ // HashMap doesn't provide a way to get its heap size, so this is an
approximation based on
+ // the behavior of hashbrown::HashMap as at version 0.16.0, and may
become inaccurate
+ // if the implementation changes.
+ let key_val_size = size_of::<(K, V)>();
+ // Overhead for the control tags group, which may be smaller depending
on architecture
+ let group_size = 16;
+ // 1 byte of metadata stored per bucket.
+ let metadata_size = 1;
+
+ // Compute the number of buckets for the capacity. Based on
hashbrown's capacity_to_buckets
+ let buckets = if capacity < 15 {
+ let min_cap = match key_val_size {
+ 0..=1 => 14,
+ 2..=3 => 7,
+ _ => 3,
+ };
+ let cap = min_cap.max(capacity);
+ if cap < 4 {
+ 4
+ } else if cap < 8 {
+ 8
+ } else {
+ 16
+ }
+ } else {
+ (capacity.saturating_mul(8) / 7).next_power_of_two()
+ };
+
+ group_size
+ + (buckets * (key_val_size + metadata_size))
+ + self.keys().map(|k| k.heap_size(ctx)).sum::<usize>()
+ + self.values().map(|v| v.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Arc<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
+
+ if !ctx.seen.insert(ptr) {
+ return 0;
+ }
+
+ // Arc stores weak and strong counts on the heap alongside an instance
of T
+ 2 * size_of::<usize>() + self.as_ref().heap_size(ctx)
Review Comment:
```suggestion
2 * size_of::<usize>() + size_of::<T>() +
self.as_ref().heap_size(ctx)
```
##########
datafusion/common/src/heap_size.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::stats::Precision;
+use crate::{ColumnStatistics, ScalarValue, Statistics, TableReference};
+use arrow::array::{
+ Array, FixedSizeListArray, LargeListArray, LargeListViewArray, ListArray,
+ ListViewArray, MapArray, StructArray,
+};
+use arrow::datatypes::{
+ DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano,
IntervalUnit,
+ TimeUnit, UnionFields, UnionMode, i256,
+};
+use chrono::{DateTime, Utc};
+use half::f16;
+use hashbrown::HashSet;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This is a temporary solution until
<https://github.com/apache/datafusion/pull/19599> and
+/// <https://github.com/apache/arrow-rs/pull/9138> are resolved.
+/// Trait for calculating the size of various containers
+pub trait DFHeapSize {
+ /// Return the size of any bytes allocated on the heap by this object,
+ /// including heap memory in those structures
+ ///
+ /// Note that the size of the type itself is not included in the result --
+ /// instead, that size is added by the caller (e.g. container).
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize;
+}
+
+#[derive(Default)]
+pub struct DFHeapSizeCtx {
+ seen: HashSet<usize>,
+}
+
+impl DFHeapSize for Statistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.num_rows.heap_size(ctx)
+ + self.total_byte_size.heap_size(ctx)
+ + self.column_statistics.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for TableReference {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ match self {
+ TableReference::Bare { table } => table.heap_size(ctx),
+ TableReference::Partial { schema, table } => {
+ schema.heap_size(ctx) + table.heap_size(ctx)
+ }
+ TableReference::Full {
+ catalog,
+ schema,
+ table,
+ } => catalog.heap_size(ctx) + schema.heap_size(ctx) +
table.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: Debug + Clone + PartialEq + Eq + PartialOrd + DFHeapSize> DFHeapSize
+ for Precision<T>
+{
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.get_value().map_or_else(|| 0, |v| v.heap_size(ctx))
+ }
+}
+
+impl DFHeapSize for ColumnStatistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.null_count.heap_size(ctx)
+ + self.max_value.heap_size(ctx)
+ + self.min_value.heap_size(ctx)
+ + self.sum_value.heap_size(ctx)
+ + self.distinct_count.heap_size(ctx)
+ + self.byte_size.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for ScalarValue {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use crate::scalar::ScalarValue::*;
+ match self {
+ Null => 0,
+ Boolean(b) => b.heap_size(ctx),
+ Float16(f) => f.heap_size(ctx),
+ Float32(f) => f.heap_size(ctx),
+ Float64(f) => f.heap_size(ctx),
+ Decimal32(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal64(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal128(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal256(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Int8(i) => i.heap_size(ctx),
+ Int16(i) => i.heap_size(ctx),
+ Int32(i) => i.heap_size(ctx),
+ Int64(i) => i.heap_size(ctx),
+ UInt8(u) => u.heap_size(ctx),
+ UInt16(u) => u.heap_size(ctx),
+ UInt32(u) => u.heap_size(ctx),
+ UInt64(u) => u.heap_size(ctx),
+ Utf8(u) => u.heap_size(ctx),
+ Utf8View(u) => u.heap_size(ctx),
+ LargeUtf8(l) => l.heap_size(ctx),
+ Binary(b) => b.heap_size(ctx),
+ BinaryView(b) => b.heap_size(ctx),
+ FixedSizeBinary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ LargeBinary(l) => l.heap_size(ctx),
+ FixedSizeList(f) => f.heap_size(ctx),
+ List(l) => l.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Map(m) => m.heap_size(ctx),
+ Date32(d) => d.heap_size(ctx),
+ Date64(d) => d.heap_size(ctx),
+ Time32Second(t) => t.heap_size(ctx),
+ Time32Millisecond(t) => t.heap_size(ctx),
+ Time64Microsecond(t) => t.heap_size(ctx),
+ Time64Nanosecond(t) => t.heap_size(ctx),
+ TimestampSecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMillisecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMicrosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampNanosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ IntervalYearMonth(i) => i.heap_size(ctx),
+ IntervalDayTime(i) => i.heap_size(ctx),
+ IntervalMonthDayNano(i) => i.heap_size(ctx),
+ DurationSecond(d) => d.heap_size(ctx),
+ DurationMillisecond(d) => d.heap_size(ctx),
+ DurationMicrosecond(d) => d.heap_size(ctx),
+ DurationNanosecond(d) => d.heap_size(ctx),
+ Union(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b, c) => {
+ a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx)
+ }
+ ListView(a) => a.heap_size(ctx),
+ LargeListView(a) => a.heap_size(ctx),
+ }
+ }
+}
+
+impl DFHeapSize for DataType {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use DataType::*;
+ match self {
+ Null => 0,
+ Boolean => 0,
+ Int8 => 0,
+ Int16 => 0,
+ Int32 => 0,
+ Int64 => 0,
+ UInt8 => 0,
+ UInt16 => 0,
+ UInt32 => 0,
+ UInt64 => 0,
+ Float16 => 0,
+ Float32 => 0,
+ Float64 => 0,
+ Timestamp(t, s) => t.heap_size(ctx) + s.heap_size(ctx),
+ Date32 => 0,
+ Date64 => 0,
+ Time32(t) => t.heap_size(ctx),
+ Time64(t) => t.heap_size(ctx),
+ Duration(t) => t.heap_size(ctx),
+ Interval(i) => i.heap_size(ctx),
+ Binary => 0,
+ FixedSizeBinary(i) => i.heap_size(ctx),
+ LargeBinary => 0,
+ BinaryView => 0,
+ Utf8 => 0,
+ LargeUtf8 => 0,
+ Utf8View => 0,
+ List(v) => v.heap_size(ctx),
+ ListView(v) => v.heap_size(ctx),
+ FixedSizeList(f, i) => f.heap_size(ctx) + i.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ LargeListView(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Union(u, m) => u.heap_size(ctx) + m.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ Decimal32(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal64(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal128(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal256(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Map(m, b) => m.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Vec<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let item_size = size_of::<T>();
+ // account for the contents of the Vec
+ (self.capacity() * item_size) +
+ // add any heap allocations by contents
+ self.iter().map(|t| t.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<K: DFHeapSize, V: DFHeapSize> DFHeapSize for HashMap<K, V> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let capacity = self.capacity();
+ if capacity == 0 {
+ return 0;
+ }
+
+ // HashMap doesn't provide a way to get its heap size, so this is an
approximation based on
+ // the behavior of hashbrown::HashMap as at version 0.16.0, and may
become inaccurate
+ // if the implementation changes.
+ let key_val_size = size_of::<(K, V)>();
+ // Overhead for the control tags group, which may be smaller depending
on architecture
+ let group_size = 16;
+ // 1 byte of metadata stored per bucket.
+ let metadata_size = 1;
+
+ // Compute the number of buckets for the capacity. Based on
hashbrown's capacity_to_buckets
+ let buckets = if capacity < 15 {
+ let min_cap = match key_val_size {
+ 0..=1 => 14,
+ 2..=3 => 7,
+ _ => 3,
+ };
+ let cap = min_cap.max(capacity);
+ if cap < 4 {
+ 4
+ } else if cap < 8 {
+ 8
+ } else {
+ 16
+ }
+ } else {
+ (capacity.saturating_mul(8) / 7).next_power_of_two()
+ };
+
+ group_size
+ + (buckets * (key_val_size + metadata_size))
+ + self.keys().map(|k| k.heap_size(ctx)).sum::<usize>()
+ + self.values().map(|v| v.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Arc<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
Review Comment:
```suggestion
let ptr = Arc::as_ptr(self) as usize;
```
I think the intermediate `as *const i32` is not needed.
Since `T` might be anything it should be `as *const ()` but again it is not
really needed here.
##########
datafusion/common/src/heap_size.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::stats::Precision;
+use crate::{ColumnStatistics, ScalarValue, Statistics, TableReference};
+use arrow::array::{
+ Array, FixedSizeListArray, LargeListArray, LargeListViewArray, ListArray,
+ ListViewArray, MapArray, StructArray,
+};
+use arrow::datatypes::{
+ DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano,
IntervalUnit,
+ TimeUnit, UnionFields, UnionMode, i256,
+};
+use chrono::{DateTime, Utc};
+use half::f16;
+use hashbrown::HashSet;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This is a temporary solution until
<https://github.com/apache/datafusion/pull/19599> and
+/// <https://github.com/apache/arrow-rs/pull/9138> are resolved.
+/// Trait for calculating the size of various containers
+pub trait DFHeapSize {
+ /// Return the size of any bytes allocated on the heap by this object,
+ /// including heap memory in those structures
+ ///
+ /// Note that the size of the type itself is not included in the result --
+ /// instead, that size is added by the caller (e.g. container).
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize;
+}
+
+#[derive(Default)]
+pub struct DFHeapSizeCtx {
+ seen: HashSet<usize>,
+}
+
+impl DFHeapSize for Statistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.num_rows.heap_size(ctx)
+ + self.total_byte_size.heap_size(ctx)
+ + self.column_statistics.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for TableReference {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ match self {
+ TableReference::Bare { table } => table.heap_size(ctx),
+ TableReference::Partial { schema, table } => {
+ schema.heap_size(ctx) + table.heap_size(ctx)
+ }
+ TableReference::Full {
+ catalog,
+ schema,
+ table,
+ } => catalog.heap_size(ctx) + schema.heap_size(ctx) +
table.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: Debug + Clone + PartialEq + Eq + PartialOrd + DFHeapSize> DFHeapSize
+ for Precision<T>
+{
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.get_value().map_or_else(|| 0, |v| v.heap_size(ctx))
+ }
+}
+
+impl DFHeapSize for ColumnStatistics {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ self.null_count.heap_size(ctx)
+ + self.max_value.heap_size(ctx)
+ + self.min_value.heap_size(ctx)
+ + self.sum_value.heap_size(ctx)
+ + self.distinct_count.heap_size(ctx)
+ + self.byte_size.heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for ScalarValue {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use crate::scalar::ScalarValue::*;
+ match self {
+ Null => 0,
+ Boolean(b) => b.heap_size(ctx),
+ Float16(f) => f.heap_size(ctx),
+ Float32(f) => f.heap_size(ctx),
+ Float64(f) => f.heap_size(ctx),
+ Decimal32(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal64(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal128(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Decimal256(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Int8(i) => i.heap_size(ctx),
+ Int16(i) => i.heap_size(ctx),
+ Int32(i) => i.heap_size(ctx),
+ Int64(i) => i.heap_size(ctx),
+ UInt8(u) => u.heap_size(ctx),
+ UInt16(u) => u.heap_size(ctx),
+ UInt32(u) => u.heap_size(ctx),
+ UInt64(u) => u.heap_size(ctx),
+ Utf8(u) => u.heap_size(ctx),
+ Utf8View(u) => u.heap_size(ctx),
+ LargeUtf8(l) => l.heap_size(ctx),
+ Binary(b) => b.heap_size(ctx),
+ BinaryView(b) => b.heap_size(ctx),
+ FixedSizeBinary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ LargeBinary(l) => l.heap_size(ctx),
+ FixedSizeList(f) => f.heap_size(ctx),
+ List(l) => l.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Map(m) => m.heap_size(ctx),
+ Date32(d) => d.heap_size(ctx),
+ Date64(d) => d.heap_size(ctx),
+ Time32Second(t) => t.heap_size(ctx),
+ Time32Millisecond(t) => t.heap_size(ctx),
+ Time64Microsecond(t) => t.heap_size(ctx),
+ Time64Nanosecond(t) => t.heap_size(ctx),
+ TimestampSecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMillisecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampMicrosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ TimestampNanosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ IntervalYearMonth(i) => i.heap_size(ctx),
+ IntervalDayTime(i) => i.heap_size(ctx),
+ IntervalMonthDayNano(i) => i.heap_size(ctx),
+ DurationSecond(d) => d.heap_size(ctx),
+ DurationMillisecond(d) => d.heap_size(ctx),
+ DurationMicrosecond(d) => d.heap_size(ctx),
+ DurationNanosecond(d) => d.heap_size(ctx),
+ Union(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) +
c.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b, c) => {
+ a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx)
+ }
+ ListView(a) => a.heap_size(ctx),
+ LargeListView(a) => a.heap_size(ctx),
+ }
+ }
+}
+
+impl DFHeapSize for DataType {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ use DataType::*;
+ match self {
+ Null => 0,
+ Boolean => 0,
+ Int8 => 0,
+ Int16 => 0,
+ Int32 => 0,
+ Int64 => 0,
+ UInt8 => 0,
+ UInt16 => 0,
+ UInt32 => 0,
+ UInt64 => 0,
+ Float16 => 0,
+ Float32 => 0,
+ Float64 => 0,
+ Timestamp(t, s) => t.heap_size(ctx) + s.heap_size(ctx),
+ Date32 => 0,
+ Date64 => 0,
+ Time32(t) => t.heap_size(ctx),
+ Time64(t) => t.heap_size(ctx),
+ Duration(t) => t.heap_size(ctx),
+ Interval(i) => i.heap_size(ctx),
+ Binary => 0,
+ FixedSizeBinary(i) => i.heap_size(ctx),
+ LargeBinary => 0,
+ BinaryView => 0,
+ Utf8 => 0,
+ LargeUtf8 => 0,
+ Utf8View => 0,
+ List(v) => v.heap_size(ctx),
+ ListView(v) => v.heap_size(ctx),
+ FixedSizeList(f, i) => f.heap_size(ctx) + i.heap_size(ctx),
+ LargeList(l) => l.heap_size(ctx),
+ LargeListView(l) => l.heap_size(ctx),
+ Struct(s) => s.heap_size(ctx),
+ Union(u, m) => u.heap_size(ctx) + m.heap_size(ctx),
+ Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ Decimal32(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal64(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal128(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Decimal256(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
+ Map(m, b) => m.heap_size(ctx) + b.heap_size(ctx),
+ RunEndEncoded(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
+ }
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Vec<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let item_size = size_of::<T>();
+ // account for the contents of the Vec
+ (self.capacity() * item_size) +
+ // add any heap allocations by contents
+ self.iter().map(|t| t.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<K: DFHeapSize, V: DFHeapSize> DFHeapSize for HashMap<K, V> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let capacity = self.capacity();
+ if capacity == 0 {
+ return 0;
+ }
+
+ // HashMap doesn't provide a way to get its heap size, so this is an
approximation based on
+ // the behavior of hashbrown::HashMap as at version 0.16.0, and may
become inaccurate
+ // if the implementation changes.
+ let key_val_size = size_of::<(K, V)>();
+ // Overhead for the control tags group, which may be smaller depending
on architecture
+ let group_size = 16;
+ // 1 byte of metadata stored per bucket.
+ let metadata_size = 1;
+
+ // Compute the number of buckets for the capacity. Based on
hashbrown's capacity_to_buckets
+ let buckets = if capacity < 15 {
+ let min_cap = match key_val_size {
+ 0..=1 => 14,
+ 2..=3 => 7,
+ _ => 3,
+ };
+ let cap = min_cap.max(capacity);
+ if cap < 4 {
+ 4
+ } else if cap < 8 {
+ 8
+ } else {
+ 16
+ }
+ } else {
+ (capacity.saturating_mul(8) / 7).next_power_of_two()
+ };
+
+ group_size
+ + (buckets * (key_val_size + metadata_size))
+ + self.keys().map(|k| k.heap_size(ctx)).sum::<usize>()
+ + self.values().map(|v| v.heap_size(ctx)).sum::<usize>()
+ }
+}
+
+impl<T: DFHeapSize> DFHeapSize for Arc<T> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
+
+ if !ctx.seen.insert(ptr) {
+ return 0;
+ }
+
+ // Arc stores weak and strong counts on the heap alongside an instance
of T
+ 2 * size_of::<usize>() + self.as_ref().heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for Arc<str> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
+
+ if !ctx.seen.insert(ptr) {
+ return 0;
+ }
+
+ // Arc stores weak and strong counts on the heap alongside an instance
of T
+ 2 * size_of::<usize>() + self.as_ref().heap_size(ctx)
+ }
+}
+
+impl DFHeapSize for Arc<dyn DFHeapSize> {
+ fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
+ let ptr = Arc::as_ptr(self) as *const i32 as usize;
Review Comment:
```suggestion
let ptr = Arc::as_ptr(self) as usize;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]