GitHub user kinthaiofficial closed the discussion with a comment: How do I 
create a UDF in datafusion rust?

Creating UDFs in DataFusion is well supported. Here is a complete example of a 
scalar UDF written against the trait-based `ScalarUDFImpl` API:

```rust
use datafusion::prelude::*;
use datafusion::logical_expr::{ScalarUDF, ScalarUDFImpl, Signature, Volatility};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result;
use std::sync::Arc;

// Implement ScalarUDFImpl for your function
/// Scalar UDF that shortens a string argument to a maximum length.
///
/// The struct only carries the function's [`Signature`], which is handed to
/// DataFusion through `ScalarUDFImpl::signature`.
#[derive(Debug)]
struct TruncateText {
    // Accepted argument types for `truncate_text`.
    signature: Signature,
}

impl TruncateText {
    /// Creates the UDF with an exact `(Utf8, Int64)` argument signature.
    ///
    /// `Volatility::Immutable` declares that identical inputs always produce
    /// identical output.
    fn new() -> Self {
        let arg_types = vec![DataType::Utf8, DataType::Int64];
        Self {
            signature: Signature::exact(arg_types, Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for TruncateText {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        "truncate_text"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result is always a `Utf8` string, whatever the argument types.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Utf8)
    }

    /// Truncates each `text` value to at most `max_len` characters.
    ///
    /// `invoke_with_args` replaces the deprecated `invoke` method in recent
    /// DataFusion releases and supplies the batch row count.
    ///
    /// Either argument may be a column or a literal. A NULL in either
    /// argument yields NULL; a negative `max_len` yields the empty string.
    /// Truncation counts characters, never splitting a multi-byte UTF-8
    /// sequence.
    ///
    /// # Errors
    /// Returns an error if the arguments are not a `Utf8` array and an
    /// `Int64` array.
    fn invoke_with_args(
        &self,
        args: datafusion::logical_expr::ScalarFunctionArgs,
    ) -> Result<datafusion::logical_expr::ColumnarValue> {
        use datafusion::arrow::array::StringArray;
        use datafusion::common::cast::{as_int64_array, as_string_array};
        use datafusion::logical_expr::ColumnarValue;

        // Expand every argument to the batch size. Converting a literal with
        // `into_array(1)` would give a 1-row array, and `zip` would then cut
        // the output down to one row for calls like `truncate_text(name, 10)`.
        let rows = args.number_rows;
        let arrays = args
            .args
            .into_iter()
            .map(|arg| arg.into_array(rows))
            .collect::<Result<Vec<_>>>()?;

        // Checked downcasts: a type mismatch becomes an error, not a panic.
        let texts = as_string_array(&arrays[0])?;
        let lens = as_int64_array(&arrays[1])?;

        let truncated: StringArray = texts
            .iter()
            .zip(lens.iter())
            .map(|(text, max_len)| {
                // NULL in either input propagates as NULL.
                let (text, max_len) = (text?, max_len?);
                // Negative lengths clamp to 0; lengths beyond usize mean "keep all".
                let max_chars = usize::try_from(max_len.max(0)).unwrap_or(usize::MAX);
                // Byte offset of the first character past the limit, so the
                // slice always ends on a char boundary.
                let end = text
                    .char_indices()
                    .nth(max_chars)
                    .map_or(text.len(), |(idx, _)| idx);
                Some(&text[..end])
            })
            .collect();

        Ok(ColumnarValue::Array(Arc::new(truncated)))
    }
}

// Register and use
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_udf(ScalarUDF::from(TruncateText::new()));
    
    let result = ctx.sql(
        "SELECT truncate_text(name, 10) FROM my_table"
    ).await?.collect().await?;
    
    Ok(())
}
```

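For aggregate UDFs (`AggregateUDFImpl`) and window UDFs (`WindowUDFImpl`), the 
pattern is similar, but the computation lives in a separate state object. An 
aggregate UDF's `accumulator` method returns an `Accumulator` (which implements 
`update_batch`, `merge_batch`, `state`, and `evaluate`). A window UDF's 
`partition_evaluator` method returns a `PartitionEvaluator`. Register them with 
`SessionContext::register_udaf` and `SessionContext::register_udwf`.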

GitHub link: 
https://github.com/apache/datafusion/discussions/2980#discussioncomment-16750330

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: 
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to