This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-53 by this push:
new 2c0a38be0e [branch-53] ser/de fetch in FilterExec (#20738) (#20883)
2c0a38be0e is described below
commit 2c0a38be0e18b5e8a2a6416721673e16e6d250d5
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 08:53:13 2026 -0400
[branch-53] ser/de fetch in FilterExec (#20738) (#20883)
- Part of https://github.com/apache/datafusion/issues/19692
- Closes https://github.com/apache/datafusion/issues/20737 on branch-53
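This PR:
- Backports https://github.com/apache/datafusion/pull/20738 from
@haohuaijin to the branch-53 line. That change adds the missing
`ExecutionPlan::fetch` implementation for `FilterExec`. It also serializes the
filter's optional row limit (`fetch`) as a new `FilterExecNode.fetch` field
(tag 11), so the limit survives a physical-plan protobuf round trip.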
Co-authored-by: Huaijin <[email protected]>
---
datafusion/physical-plan/src/filter.rs | 4 ++++
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 19 +++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 2 ++
datafusion/proto/src/physical_plan/mod.rs | 2 ++
.../proto/tests/cases/roundtrip_physical_plan.rs | 13 +++++++++++++
6 files changed, 41 insertions(+)
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index af7bcc8e3b..fac6fa1e7c 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -718,6 +718,10 @@ impl ExecutionPlan for FilterExec {
})
}
+ fn fetch(&self) -> Option<usize> {
+ self.fetch
+ }
+
fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn
ExecutionPlan>> {
Some(Arc::new(Self {
predicate: Arc::clone(&self.predicate),
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 7c02688676..37b31a84de 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1030,6 +1030,7 @@ message FilterExecNode {
uint32 default_filter_selectivity = 3;
repeated uint32 projection = 9;
uint32 batch_size = 10;
+ optional uint32 fetch = 11;
}
message FileGroup {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 5b2b9133ce..419105c40c 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6894,6 +6894,9 @@ impl serde::Serialize for FilterExecNode {
if self.batch_size != 0 {
len += 1;
}
+ if self.fetch.is_some() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion.FilterExecNode", len)?;
if let Some(v) = self.input.as_ref() {
struct_ser.serialize_field("input", v)?;
@@ -6910,6 +6913,9 @@ impl serde::Serialize for FilterExecNode {
if self.batch_size != 0 {
struct_ser.serialize_field("batchSize", &self.batch_size)?;
}
+ if let Some(v) = self.fetch.as_ref() {
+ struct_ser.serialize_field("fetch", v)?;
+ }
struct_ser.end()
}
}
@@ -6927,6 +6933,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
"projection",
"batch_size",
"batchSize",
+ "fetch",
];
#[allow(clippy::enum_variant_names)]
@@ -6936,6 +6943,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
DefaultFilterSelectivity,
Projection,
BatchSize,
+ Fetch,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -6962,6 +6970,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
"defaultFilterSelectivity" |
"default_filter_selectivity" => Ok(GeneratedField::DefaultFilterSelectivity),
"projection" => Ok(GeneratedField::Projection),
"batchSize" | "batch_size" =>
Ok(GeneratedField::BatchSize),
+ "fetch" => Ok(GeneratedField::Fetch),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -6986,6 +6995,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
let mut default_filter_selectivity__ = None;
let mut projection__ = None;
let mut batch_size__ = None;
+ let mut fetch__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Input => {
@@ -7025,6 +7035,14 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
+ GeneratedField::Fetch => {
+ if fetch__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("fetch"));
+ }
+ fetch__ =
+
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
x.0)
+ ;
+ }
}
}
Ok(FilterExecNode {
@@ -7033,6 +7051,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode {
default_filter_selectivity:
default_filter_selectivity__.unwrap_or_default(),
projection: projection__.unwrap_or_default(),
batch_size: batch_size__.unwrap_or_default(),
+ fetch: fetch__,
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index d9602665c2..a0d4ef9e97 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1563,6 +1563,8 @@ pub struct FilterExecNode {
pub projection: ::prost::alloc::vec::Vec<u32>,
#[prost(uint32, tag = "10")]
pub batch_size: u32,
+ #[prost(uint32, optional, tag = "11")]
+ pub fetch: ::core::option::Option<u32>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileGroup {
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index bfba715b91..47fa1319c5 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -691,6 +691,7 @@ impl protobuf::PhysicalPlanNode {
let filter = FilterExecBuilder::new(predicate, input)
.apply_projection(projection)?
.with_batch_size(filter.batch_size as usize)
+ .with_fetch(filter.fetch.map(|f| f as usize))
.build()?;
match filter_selectivity {
Ok(filter_selectivity) => Ok(Arc::new(
@@ -2320,6 +2321,7 @@ impl protobuf::PhysicalPlanNode {
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
}),
batch_size: exec.batch_size() as u32,
+ fetch: exec.fetch().map(|f| f as u32),
},
))),
})
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 230727c8c1..2b8c1056f3 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -794,6 +794,19 @@ fn roundtrip_filter_with_not_and_in_list() -> Result<()> {
)?))
}
+#[test]
+fn roundtrip_filter_with_fetch() -> Result<()> {
+ let field_a = Field::new("a", DataType::Boolean, false);
+ let field_b = Field::new("b", DataType::Int64, false);
+ let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+ let predicate = col("a", &schema)?;
+ let filter = FilterExecBuilder::new(predicate,
Arc::new(EmptyExec::new(schema)))
+ .with_fetch(Some(10))
+ .build()?;
+ assert_eq!(filter.fetch(), Some(10));
+ roundtrip_test(Arc::new(filter))
+}
+
#[test]
fn roundtrip_sort() -> Result<()> {
let field_a = Field::new("a", DataType::Boolean, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]