haohuaijin opened a new issue, #21526:
URL: https://github.com/apache/datafusion/issues/21526
### Describe the bug
for the physical plan like below
```
FilterExec: log@1 LIKE %datafusion%, projection=[_timestamp@0]
SortExec: expr=[_timestamp@0 DESC NULLS LAST], preserve_partitioning=[true]
DataSourceExec: partitions=1, partition_sizes=[1]
```
then i try use `FilterPushdown` to pushdown the filter under the sort, but
current not work
After FilterPushdown:
```
FilterExec: log@1 LIKE %datafusion%, projection=[_timestamp@0]
SortExec: expr=[_timestamp@0 DESC NULLS LAST], preserve_partitioning=[true]
DataSourceExec: partitions=1, partition_sizes=[1]
```
### To Reproduce
```rust
use std::sync::Arc;
use arrow::array::{RecordBatch, StringArray};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::memory::MemorySourceConfig;
use datafusion::common::config::ConfigOptions;
use datafusion::physical_expr::expressions::{col, like, lit};
use datafusion::physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::filter::FilterExecBuilder;
use datafusion::physical_plan::sorts::sort::SortExec;
/// Constructs and executes a physical plan equivalent to:
///
/// ```text
/// FilterExec: log@1 LIKE %datafusion%, projection=[_timestamp@0]
/// SortExec: expr=[_timestamp@0 DESC NULLS LAST],
preserve_partitioning=[true]
/// DataSourceExec (MemoryExec): projection=[_timestamp, log]
/// ```
#[tokio::main]
async fn main() {
let schema = Arc::new(Schema::new(vec![
Field::new("_timestamp", DataType::Utf8, false),
Field::new("log", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["2026-04-09T02:00:00Z"])),
Arc::new(StringArray::from(vec!["starting datafusion query
engine"])),
],
)
.unwrap();
// DataSourceExec (MemoryExec): 1 partitions, projection=[_timestamp,
log]
let memory_exec =
MemorySourceConfig::try_new_exec(&[vec![batch1]], schema.clone(),
None).unwrap();
// SortExec: expr=[_timestamp@0 DESC NULLS LAST],
preserve_partitioning=[true]
let sort_expr = PhysicalSortExpr::new(
col("_timestamp", &schema).unwrap(),
SortOptions {
descending: true,
nulls_first: false,
},
);
let lex_ordering = LexOrdering::new(vec![sort_expr]).unwrap();
let sort_exec = Arc::new(
SortExec::new(lex_ordering,
memory_exec).with_preserve_partitioning(true),
);
// FilterExec: log@1 LIKE %datafusion%, projection=[_timestamp@0],
fetch=10
let like_predicate = like(
false, // not negated
false, // case sensitive
col("log", &schema).unwrap(),
lit("%datafusion%"),
&schema,
)
.unwrap();
let filter_exec = Arc::new(
FilterExecBuilder::new(like_predicate, sort_exec)
.apply_projection(Some(vec![0])) // projection=[_timestamp@0]
.unwrap()
.build()
.unwrap(),
);
// Print the physical plan
let display = displayable(filter_exec.as_ref());
println!("Physical plan:\n{}", display.indent(true));
// Run filter pushdown optimization and print the optimized plan
let config = ConfigOptions::default();
let optimized_plan = FilterPushdown::new()
.optimize(filter_exec.clone(), &config)
.unwrap();
let optimized_display = displayable(optimized_plan.as_ref());
println!("After FilterPushdown:\n{}", optimized_display.indent(true));
}
```
### Expected behavior
_No response_
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]