WencongLiu opened a new pull request, #56877:
URL: https://github.com/apache/doris/pull/56877
### Overview
This PR introduces full support for Python User-Defined Functions (UDF) and
User-Defined Table Functions (UDTF) in Doris, aligning with existing Java
UDF/UDTF syntax while addressing key requirements like high concurrency,
dependency isolation, and batch processing capabilities.
### Key Features & Implementations
#### 1. Syntax Design for Python UDF/UDTF
- **Python UDF**:
  - Follows the standard `CREATE FUNCTION` syntax, with `PROPERTIES` for configuration
  - Mandatory properties:
    - `file`: HTTP path to the compressed UDF package (e.g., Aliyun OSS/Toutiao TOS: `https://tos.byted.org/user/udf.zip`)
    - `symbol`: Entry point filename of the user code
    - `type`: Set to `PYTHON_UDF` to identify a Python implementation
  - Example:
```sql
CREATE FUNCTION my_python_udf(INT) RETURNS INT PROPERTIES (
"file"="https://tos.byted.org/user/udf.zip",
"symbol"="get_result",
"type"="PYTHON_UDF"
);
```
- **Python UDTF**:
  - Uses the `CREATE TABLES FUNCTION` keyword (aligned with Java UDTF)
  - Mandatory return type: `ARRAY<T>` (the element type `T` maps to the actual output type of the table function)
  - Shares the same `PROPERTIES` configuration as the UDF
  - Example:
```sql
CREATE TABLES FUNCTION my_python_udtf(STRING, STRING) RETURNS
ARRAY<STRING> PROPERTIES (
"file"="https://tos.byted.org/user/udf.zip",
"symbol"="get_result",
"type"="PYTHON_UDF"
);
```
#### 2. Python Code Specification
- **UDF Code Requirements**:
  - Entry function must be named `evaluate`
  - The number of `evaluate` parameters must match the SQL function's input count
  - Supports 1:1 (single input → single output) or M:1 (multiple inputs → single output) semantics
  - Example:
```python
def evaluate(arg1, arg2):
    # User-defined processing logic (placeholder: combine the two inputs)
    processed_result = arg1 + arg2
    return processed_result
```
- **UDTF Code Requirements**:
  - Entry function must be named `evaluate`
  - The number of `evaluate` parameters must match the SQL function's input count
  - Uses `yield` to return multiple results (1:M semantics)
  - Example:
```python
def evaluate(text, delimiter):
    # Split string by delimiter and return multiple results
    for item in text.split(delimiter):
        yield item
```
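To make the 1:M semantics concrete, the sketch below shows how a caller might drain the generator returned by a UDTF `evaluate` into a list, which matches the `ARRAY<T>` return type declared in SQL. The `run_udtf` helper is hypothetical and not part of this PR; it only illustrates the calling convention.

```python
def evaluate(text, delimiter):
    # UDTF entry point from the example above: one input row, many outputs.
    for item in text.split(delimiter):
        yield item


def run_udtf(func, rows):
    # Hypothetical driver (an assumption, not Doris code): call the UDTF once
    # per input row and materialize its generator into one list per row.
    return [list(func(*row)) for row in rows]


rows = [("a,b,c", ","), ("x|y", "|")]
print(run_udtf(evaluate, rows))  # [['a', 'b', 'c'], ['x', 'y']]
```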
#### 3. Type Mapping (SQL ↔ Python 3)
| SQL Type | Python 3 Type | Notes |
|-----------------------------|---------------|-------------------------------------|
| TINYINT/SMALLINT/INT/BIGINT | int | Numeric types |
| BOOLEAN | bool | Boolean type |
| FLOAT/DOUBLE | float | Floating-point types |
| CHAR/VARCHAR/STRING | str | String types |
| `ARRAY<T>` | list[T] | T supports numeric/string types |
| `MAP<K, V>` | dict[K, V] | K/V support numeric/string types |
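
As a rough illustration of the scalar rows in the table, the following sketch checks whether a Python value has the type expected for a given SQL type. The `SQL_TO_PYTHON` dictionary and `matches_sql_type` helper are hypothetical names transcribed from the table, not code from this PR, and the decision to reject `bool` for integer columns is an assumption of the sketch.

```python
# Assumed mapping, transcribed from the table above (scalar types only).
SQL_TO_PYTHON = {
    "TINYINT": int, "SMALLINT": int, "INT": int, "BIGINT": int,
    "BOOLEAN": bool,
    "FLOAT": float, "DOUBLE": float,
    "CHAR": str, "VARCHAR": str, "STRING": str,
}


def matches_sql_type(value, sql_type):
    # Hypothetical helper: True if `value` has the Python type listed for `sql_type`.
    expected = SQL_TO_PYTHON[sql_type]
    # bool is a subclass of int in Python, so this sketch rejects True/False
    # for integer columns explicitly.
    if expected is int and isinstance(value, bool):
        return False
    return isinstance(value, expected)


print(matches_sql_type(7, "BIGINT"))     # True
print(matches_sql_type(True, "INT"))     # False
print(matches_sql_type("abc", "STRING")) # True
```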
#### 4. Runtime Architecture
- **Multi-Process Model**:
  - Works around the Python GIL (Global Interpreter Lock) limitation (critical for Python 3.12 and earlier; the free-threaded mode in 3.13 is experimental)
  - Doris BE acts as **Client**; Python subprocesses act as **UDF Servers**
  - 1:1 mapping between BE execution threads and UDF Servers (ensures concurrency alignment)
  - UDF Servers are reused across multiple Pipeline Tasks within the same BE thread
- **Inter-Process Communication**:
  - Based on the C interface of Python's `marshal` module
  - Serializes Python objects to byte strings for pipe-based transmission
  - Ensures efficient and reliable data exchange between BE and UDF Servers
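
The marshal-over-pipe exchange described above might look roughly like the sketch below. It uses the Python-level `marshal.dumps`/`marshal.loads` on both ends for simplicity, whereas the PR drives `marshal` through its C interface from the BE side. The 4-byte length prefix and the helper names `send_obj`/`recv_obj` are assumptions made for illustration; the PR does not describe its framing.

```python
import marshal
import os
import struct


def send_obj(fd, obj):
    # Serialize with marshal and prepend a 4-byte big-endian length
    # (assumed framing, not taken from the PR).
    payload = marshal.dumps(obj)
    os.write(fd, struct.pack(">I", len(payload)) + payload)


def _read_exact(fd, n):
    # Pipes may return short reads, so loop until n bytes have arrived.
    buf = b""
    while len(buf) < n:
        chunk = os.read(fd, n - len(buf))
        if not chunk:
            raise EOFError("pipe closed before full message was read")
        buf += chunk
    return buf


def recv_obj(fd):
    # Read the length prefix, then the marshal payload, and deserialize it.
    (size,) = struct.unpack(">I", _read_exact(fd, 4))
    return marshal.loads(_read_exact(fd, size))


# Round trip inside one process; in the PR the two ends are BE and a UDF Server.
r, w = os.pipe()
send_obj(w, [("hello", 1.5), ("world", 2.0)])
print(recv_obj(r))  # [('hello', 1.5), ('world', 2.0)]
os.close(r)
os.close(w)
```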
#### 5. Batch Execution Capability
- **High-Concurrency Optimization**:
  - Introduces a batch processing interface that handles multiple input rows per `evaluate` call
  - Activated by adding the `"batch_size"` property (with a value > 1) to the UDF registration SQL
  - In batch mode, `evaluate` takes a single parameter, `batch_args` (a list of tuples, where each tuple holds the inputs of one row)
  - The SQL function signature remains unchanged (it still declares individual input/output types rather than list types)
  - Supports user-side multi-threading (via `concurrent.futures.ThreadPoolExecutor`) for parallel processing of batch data
  - Delivers a **multi-fold concurrency improvement**, particularly valuable for LLM processing and external API call scenarios
  - Example:
```python
from concurrent.futures import ThreadPoolExecutor


def handle(arg1, arg2):
    # Placeholder per-row logic; replace with the real work (e.g., an API call)
    return len(arg1) + int(arg2)


def evaluate(batch_args) -> list[int]:
    # Process each row of the batch on a thread pool
    with ThreadPoolExecutor(max_workers=32) as executor:
        futures = [executor.submit(handle, arg1, arg2)
                   for (arg1, arg2) in batch_args]
        # Collect results in input order so output i matches input row i
        return [f.result() for f in futures]
```
  - Corresponding SQL registration:
```sql
use {database_name};
CREATE FUNCTION batch_process(STRING, FLOAT) RETURNS INT PROPERTIES (
"file"="https://tosv.byted.org/obj/flow-byte-rag/pyudf.zip",
"symbol"="batch_process",
"type"="PYTHON_UDF",
"batch_size"="32" -- Triggers batch execution when value > 1
);
```
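For intuition about what `batch_size` changes on the calling side, here is a minimal sketch of a dispatcher that slices rows into batches and calls `evaluate` once per batch. `make_batches` and `run_batched` are hypothetical helpers, and the check that a batch UDF returns exactly one result per input row is an assumption inferred from the unchanged SQL signature, not something the PR states.

```python
def make_batches(rows, batch_size):
    # Slice the input rows into lists of at most batch_size tuples.
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def evaluate(batch_args):
    # Toy batch UDF for (STRING, FLOAT) -> INT: one output per input row, in order.
    return [len(text) + int(score) for (text, score) in batch_args]


def run_batched(func, rows, batch_size):
    # Hypothetical dispatcher: one func call per batch, results concatenated.
    results = []
    for batch in make_batches(rows, batch_size):
        out = func(batch)
        if len(out) != len(batch):  # assumed contract: one result per row
            raise ValueError("batch UDF must return one result per input row")
        results.extend(out)
    return results


rows = [("ab", 1.0), ("cde", 2.5), ("f", 0.0)]
print(run_batched(evaluate, rows, batch_size=2))  # [3, 5, 1]
```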
#### 6. Dependency Isolation
- Stores third-party dependencies of different UDFs in separate directories
- Eliminates dependency conflicts between UDFs
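
One way such per-UDF isolation could work inside a UDF Server process is sketched below: the UDF's own directory is placed first on `sys.path` before the entry module is loaded, so its bundled dependencies take precedence. The `load_udf` helper, the directory layout, and the assumption that `symbol` names a `<symbol>.py` file are all hypothetical; the PR does not spell out these details.

```python
import importlib.util
import os
import sys
import tempfile


def load_udf(udf_dir, symbol):
    # Hypothetical loader: prefer this UDF's private directory for imports,
    # then load <symbol>.py from it and return its `evaluate` function.
    sys.path.insert(0, udf_dir)
    path = os.path.join(udf_dir, symbol + ".py")
    spec = importlib.util.spec_from_file_location(symbol, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module.evaluate


# Demo with a throwaway directory standing in for an unpacked UDF package.
with tempfile.TemporaryDirectory() as udf_dir:
    with open(os.path.join(udf_dir, "get_result.py"), "w") as f:
        f.write("def evaluate(x):\n    return x + 1\n")
    evaluate = load_udf(udf_dir, "get_result")
    print(evaluate(41))  # 42
```

Because each UDF Server is a separate process in the architecture described earlier, changing `sys.path` this way would not leak into other UDFs' servers.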
### Compatibility Guarantee
- Built strictly on the CPython interpreter, so machine learning libraries that rely on C extensions remain compatible
- Aligns with existing Doris UDF/UDTF syntax patterns (minimizes the learning cost for users)