gstvg commented on code in PR #21193:
URL: https://github.com/apache/datafusion/pull/21193#discussion_r3165586968
##########
datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs:
##########
@@ -481,6 +526,27 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
};
substrait_err!("Missing handler for user-defined literals {}",
type_ref)
}
+
+ // Lambda related methods
+
+ /// Returns a new instance of this consumer which includes the given
`lambda_parameters` and the names they got assigned
+ ///
+ /// Note for custom implementations it's possible to embed a
[DefaultSubstraitLambdaConsumer] and forward this method to it
+ fn with_lambda_parameters(
+ &self,
+ lambda_parameters: &[Type],
+ input_schema: &DFSchema,
+ ) -> datafusion::common::Result<(Vec<String>, Self)>;
Review Comment:
@benbellick I usually follow the pattern of existing methods, like
`push/pop_outer_schema` from https://github.com/apache/datafusion/pull/20439.
This is my first time dealing with substrait, so I may be wrong, but I didn't
follow this pattern (using `push/pop_lambda_parameter`) because it mutates
the consumer through `&self` via a `RwLock`, and I was concerned this could lead
to conflicts if the same consumer is used to consume two different plans at the
same time in different threads. If that's not a concern, I can change this to
use `push/pop_lambda_parameter` as well.
##########
datafusion/substrait/src/logical_plan/producer/expr/scalar_function.rs:
##########
@@ -35,7 +38,68 @@ pub fn from_higher_order_function(
fun: &expr::HigherOrderFunction,
schema: &DFSchemaRef,
) -> datafusion::common::Result<Expression> {
- from_function(producer, fun.name(), &fun.args, schema)
+ let mut lambda_parameters = fun.lambda_parameters(schema)?.into_iter();
+
+ let num_lambdas = fun
+ .args
+ .iter()
+ .filter(|arg| matches!(arg, Expr::Lambda(_)))
+ .count();
+
+ if lambda_parameters.len() != num_lambdas {
+ return substrait_err!(
+ "{} returned {} lambdas but {num_lambdas} expected",
+ fun.name(),
+ lambda_parameters.len()
+ );
+ }
+
+ let arguments = fun
+ .args
+ .iter()
+ .map(|arg| {
+ let arg = match arg {
+ Expr::Lambda(l) => {
+ let lambda_parameters =
+ lambda_parameters.next().ok_or_else(|| {
+ internal_datafusion_err!(
+ "lambda_parameters len should have been
checked above"
+ )
+ })?;
+
+ let named_lambda_parameters =
+ std::iter::zip(&l.params, lambda_parameters)
+ .map(|(name, parameter)| parameter.renamed(name))
+ .collect();
+
+ producer.push_lambda_parameters(named_lambda_parameters)?;
+
+ let arg = producer.handle_expr(arg, schema)?;
Review Comment:
Changed in
https://github.com/apache/datafusion/pull/21193/changes/9b6a8e1f18aadd976aaae9af40d7c7ca12095c96
##########
datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs:
##########
@@ -594,6 +662,124 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> {
let plan = plan.with_exprs_and_inputs(plan.expressions(), inputs)?;
Ok(LogicalPlan::Extension(Extension { node: plan }))
}
+
+ fn with_lambda_parameters(
+ &self,
+ lambda_parameters: &[Type],
+ input_schema: &DFSchema,
+ ) -> datafusion::common::Result<(Vec<String>, Self)> {
+ let (names, lambda_consumer) =
self.lambda_consumer.with_lambda_parameters(
+ self,
+ lambda_parameters,
+ input_schema,
+ )?;
+
+ Ok((
+ names,
+ Self {
+ extensions: self.extensions,
+ state: self.state,
+ outer_schemas:
RwLock::new(self.outer_schemas.read().unwrap().clone()),
+ lambda_consumer,
+ },
+ ))
+ }
+
+ fn lambda_variable(
+ &self,
+ steps_out: usize,
+ field_idx: usize,
+ ) -> datafusion::common::Result<Expr> {
+ self.lambda_consumer.lambda_variable(steps_out, field_idx)
+ }
+}
+
+/// Default implementation of lambda related methods of the
[SubstraitConsumer] trait
+///
+/// Can be embedded into a custom [SubstraitConsumer] to implement them
+pub struct DefaultSubstraitLambdaConsumer {
Review Comment:
The existing required methods of `trait SubstraitConsumer` are trivial to
implement, but that's not true for the newly added lambda methods. This is
just a convenience for custom implementations that don't want to customize the
default lambda handling. Should I remove it?
```rust
/// Example of a custom `SubstraitConsumer` that reuses the default lambda
/// handling by embedding a `DefaultSubstraitLambdaConsumer` and forwarding the
/// lambda-related trait methods to it.
struct CustomSubstraitConsumer {
extensions: Arc<Extensions>,
state: Arc<SessionState>,
// Embedded default lambda consumer: `with_lambda_parameters` and
// `lambda_variable` below forward to it, so this type does not need to
// reimplement lambda parameter naming/resolution itself.
lambda_consumer: DefaultSubstraitLambdaConsumer,
}
#[async_trait]
impl SubstraitConsumer for CustomSubstraitConsumer {
/// Resolves `table_ref` by looking up its schema in the session state and
/// then asking that schema for the table provider (which may be `None`).
async fn resolve_table_ref(
&self,
table_ref: &TableReference,
) -> Result<Option<Arc<dyn TableProvider>>> {
let table = table_ref.table().to_string();
let schema = self.state.schema_for_ref(table_ref.clone())?;
let table_provider = schema.table(&table).await?;
Ok(table_provider)
}
/// Returns the Substrait extensions held by this consumer.
fn get_extensions(&self) -> &Extensions {
self.extensions.as_ref()
}
/// Uses the session state as the function registry.
fn get_function_registry(&self) -> &impl FunctionRegistry {
self.state.as_ref()
}
/// Delegates to the embedded lambda consumer, which returns the names
/// assigned to `lambda_parameters` plus a new lambda consumer. The returned
/// `Self` shares `extensions` and `state` with `self` (cheap `Arc` clones)
/// and carries that new lambda consumer; `self` is left unmodified.
fn with_lambda_parameters(
&self,
lambda_parameters: &[Type],
input_schema: &DFSchema,
) -> datafusion::common::Result<(Vec<String>, Self)> {
let (names, lambda_consumer) =
self.lambda_consumer.with_lambda_parameters(
self,
lambda_parameters,
input_schema,
)?;
Ok((
names,
Self {
extensions: self.extensions.clone(),
state: self.state.clone(),
lambda_consumer,
},
))
}
/// Forwards lambda variable resolution to the embedded lambda consumer.
// NOTE(review): `steps_out` presumably counts enclosing lambda scopes to
// skip, and `field_idx` indexes that lambda's parameters — confirm against
// `DefaultSubstraitLambdaConsumer::lambda_variable`.
fn lambda_variable(
&self,
steps_out: usize,
field_idx: usize,
) -> datafusion::common::Result<Expr> {
self.lambda_consumer.lambda_variable(steps_out, field_idx)
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]