WencongLiu opened a new pull request, #56877:
URL: https://github.com/apache/doris/pull/56877

   ### Overview
   This PR introduces full support for Python User-Defined Functions (UDF) and 
User-Defined Table Functions (UDTF) in Doris, aligning with existing Java 
UDF/UDTF syntax while addressing key requirements like high concurrency, 
dependency isolation, and batch processing capabilities.
   
   ### Key Features & Implementations
   #### 1. Syntax Design for Python UDF/UDTF
   - **Python UDF**:
     - Follows standard `CREATE FUNCTION` syntax with `PROPERTIES` for 
configuration
     - Mandatory properties:
       - `file`: HTTP URL of the compressed UDF package, hosted on object storage such as Aliyun OSS or Toutiao TOS (e.g., `https://tos.byted.org/user/udf.zip`)
       - `symbol`: Entry point filename of user code
       - `type`: Set to `PYTHON_UDF` to identify Python implementation
     - Example:
       ```sql
       CREATE FUNCTION my_python_udf(INT) RETURNS INT PROPERTIES (
           "file"="https://tos.byted.org/user/udf.zip",
           "symbol"="get_result",
           "type"="PYTHON_UDF"
       );
       ```
   
   - **Python UDTF**:
     - Uses `CREATE TABLES FUNCTION` keyword (aligns with Java UDTF)
     - Return type must be `ARRAY<T>`, where `T` is the type of each value the table function emits
     - Shares same `PROPERTIES` configuration as UDF
     - Example:
       ```sql
       CREATE TABLES FUNCTION my_python_udtf(STRING, STRING) RETURNS ARRAY<STRING> PROPERTIES (
           "file"="https://tos.byted.org/user/udf.zip",
           "symbol"="get_result",
           "type"="PYTHON_UDF"
       );
       ```
   
   #### 2. Python Code Specification
   - **UDF Code Requirements**:
     - Entry function must be named `evaluate`
     - Number of `evaluate` parameters must match SQL function's input count
     - Supports 1:1 (single input → single output) or M:1 (multiple inputs → 
single output) semantics
     - Example:
       ```python
       def evaluate(arg1, arg2):
           # User-defined processing logic goes here (placeholder: add the inputs)
           processed_result = arg1 + arg2
           return processed_result
       ```
   
   - **UDTF Code Requirements**:
     - Entry function must be named `evaluate`
     - Number of `evaluate` parameters must match SQL function's input count
     - Uses `yield` to return multiple results (1:M semantics)
     - Example:
       ```python
       def evaluate(text, delimiter):
           # Split string by delimiter and return multiple results
           for item in text.split(delimiter):
               yield item
       ```
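To make the 1:M semantics concrete, here is a minimal sketch of how a caller might drain the UDTF generator into the list that corresponds to the declared `ARRAY<STRING>` return type. The `collect_rows` helper is hypothetical and only illustrates the idea; it is not part of this PR.

```python
def evaluate(text, delimiter):
    # Same UDTF as above: one input row fans out into several output values.
    for item in text.split(delimiter):
        yield item


def collect_rows(udtf, *args):
    # Hypothetical helper: exhaust the generator and return its values as a
    # list, the Python-side shape of an ARRAY<STRING> result.
    return list(udtf(*args))


result = collect_rows(evaluate, "a,b,c", ",")
print(result)
```

Because `evaluate` is a generator, nothing runs until the caller iterates over it, so a UDTF can emit any number of values per input row, including none.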
    
   
   #### 3. Type Mapping (SQL ↔ Python 3)
   | SQL Type                    | Python 3 Type | Notes                              |
   |-----------------------------|---------------|------------------------------------|
   | TINYINT/SMALLINT/INT/BIGINT | int           | Numeric types                      |
   | BOOLEAN                     | bool          | Boolean type                       |
   | FLOAT/DOUBLE                | float         | Floating-point types               |
   | CHAR/VARCHAR/STRING         | str           | String types                       |
   | ARRAY<T>                    | list[T]       | T supports numeric/string types    |
   | MAP<K, V>                   | dict[K, V]    | K/V support numeric/string types   |
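The mapping above can be sketched as a lookup table plus a type check. The names `SQL_TO_PYTHON`, `matches_sql_type`, and `matches_array` are hypothetical, and rejecting `bool` values for integer columns is an assumption of this sketch rather than documented behavior. NULL handling is not covered by the table and is left out here.

```python
# Hypothetical lookup table mirroring the scalar rows of the mapping table.
SQL_TO_PYTHON = {
    "TINYINT": int, "SMALLINT": int, "INT": int, "BIGINT": int,
    "BOOLEAN": bool,
    "FLOAT": float, "DOUBLE": float,
    "CHAR": str, "VARCHAR": str, "STRING": str,
}


def matches_sql_type(value, sql_type):
    """Return True if `value` has the Python type paired with `sql_type`."""
    expected = SQL_TO_PYTHON[sql_type]
    # bool is a subclass of int in Python, so this sketch (an assumption)
    # rejects True/False where an integer type is declared.
    if expected is int and isinstance(value, bool):
        return False
    return isinstance(value, expected)


def matches_array(values, element_sql_type):
    """ARRAY<T> maps to a list whose elements all match T."""
    return isinstance(values, list) and all(
        matches_sql_type(v, element_sql_type) for v in values
    )
```

A check like this could be applied to a UDF's return value before it is handed back to the engine, so that a mismatch between the declared SQL return type and the Python result is caught early.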
   
   #### 4. Runtime Architecture
   - **Multi-Process Model**:
     - Works around the Python GIL (Global Interpreter Lock), which serializes threads in Python 3.12 and earlier; the free-threaded build introduced in 3.13 is still experimental
     - Doris BE acts as **Client**; Python subprocesses act as **UDF Servers**
     - 1:1 mapping between BE execution threads and UDF Servers (ensures 
concurrency alignment)
     - UDF Servers are reused across multiple Pipeline Tasks within the same BE 
thread
   
   - **Inter-Process Communication**:
     - Based on Python `marshal` module's C interface
     - Serializes Python objects to byte strings for pipe-based transmission
     - Ensures efficient and reliable data exchange between BE and UDF Servers
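As a rough illustration of marshal-based exchange over a pipe, the sketch below uses the Python-level `marshal` API rather than the C interface the PR relies on. The 4-byte length prefix and the helper names `send_obj`, `read_exact`, and `recv_obj` are assumptions made for this example; the PR does not describe its wire format.

```python
import marshal
import os
import struct


def send_obj(fd, obj):
    # marshal.dumps serializes a Python object to a byte string.
    payload = marshal.dumps(obj)
    # Assumed framing: 4-byte big-endian length prefix, then the payload.
    os.write(fd, struct.pack(">I", len(payload)) + payload)


def read_exact(fd, n):
    # os.read may return fewer bytes than requested, so loop until done.
    chunks = []
    while n > 0:
        chunk = os.read(fd, n)
        if not chunk:
            raise EOFError("pipe closed before a full message arrived")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def recv_obj(fd):
    # Read the length prefix first, then exactly that many payload bytes.
    (length,) = struct.unpack(">I", read_exact(fd, 4))
    return marshal.loads(read_exact(fd, length))


# Round trip through an in-process pipe; in the architecture described
# above the two ends would belong to the BE and a UDF Server process.
read_fd, write_fd = os.pipe()
send_obj(write_fd, [("hello", 1.5), ("world", 2.0)])
rows = recv_obj(read_fd)
os.close(read_fd)
os.close(write_fd)
```

Note that `marshal` only handles built-in types such as numbers, strings, tuples, lists, and dicts, which lines up with the types in the mapping table.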
   
   #### 5. Batch Execution Capability
   - **High-Concurrency Optimization**:
     - Introduces a batch processing interface that handles multiple input rows 
per `evaluate` call
     - Activated by adding the `"batch_size"` property (with value > 1) in the 
UDF registration SQL
     - Input parameter for batch processing becomes `batch_args` (a list of 
tuples, where each tuple represents one row of inputs)
     - SQL function signature remains unchanged (still declares individual 
input/output types rather than list types)
     - Supports user-side multi-threading (via 
`concurrent.futures.ThreadPoolExecutor`) for parallel processing of batch data
     - Delivers **multi-fold concurrency improvement**, particularly valuable 
for LLM processing and external API call scenarios
     - Example:
       ```python
       from concurrent.futures import ThreadPoolExecutor
   
       def handle(arg1, arg2):
           # Placeholder for user logic (e.g. an LLM or external API call)
           return len(arg1)
   
       def evaluate(batch_args) -> list[int]:
           # Submit one task per input row; results keep the input order
           with ThreadPoolExecutor(max_workers=32) as executor:
               futures = [executor.submit(handle, arg1, arg2) for (arg1, arg2) in batch_args]
               return [f.result() for f in futures]
       ```
   
     - Corresponding SQL registration:
       ```sql
       use {database_name};
       
       CREATE FUNCTION batch_process(STRING, FLOAT) RETURNS INT PROPERTIES (
           "file"="https://tosv.byted.org/obj/flow-byte-rag/pyudf.zip",
           "symbol"="batch_process",
           "type"="PYTHON_UDF",
           "batch_size"="32"  -- Triggers batch execution when value > 1
       );
       ```
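The switch between row mode and batch mode might look roughly like the sketch below on the UDF Server side. `run_udf` is a hypothetical dispatcher, and the requirement that a batch UDF returns exactly one result per input row, in input order, is an assumption of this example.

```python
def run_udf(evaluate, rows, batch_size=1):
    """Hypothetical dispatcher: per-row calls, or per-batch calls when batch_size > 1."""
    if batch_size <= 1:
        # Row mode: each row's columns are passed as positional arguments.
        return [evaluate(*row) for row in rows]
    results = []
    for start in range(0, len(rows), batch_size):
        batch_args = rows[start:start + batch_size]  # list of tuples
        batch_results = evaluate(batch_args)
        # Assumed contract: one output per input row, in the same order.
        if len(batch_results) != len(batch_args):
            raise ValueError("batch UDF must return one result per input row")
        results.extend(batch_results)
    return results


def row_udf(text, weight):
    # Row-mode UDF for (STRING, FLOAT) -> INT.
    return int(len(text) * weight)


def batch_udf(batch_args):
    # Batch-mode UDF computing the same thing for a list of row tuples.
    return [int(len(text) * weight) for (text, weight) in batch_args]


rows = [("ab", 1.0), ("abc", 2.0), ("", 3.0), ("abcd", 0.5), ("a", 4.0)]
row_results = run_udf(row_udf, rows)
batch_results = run_udf(batch_udf, rows, batch_size=2)
```

Both modes yield the same per-row results; batching only changes how many rows each `evaluate` call sees, which is what lets the UDF overlap slow calls in a thread pool.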
   
   #### 6. Dependency Isolation
     - Stores third-party dependencies of different UDFs in separate directories
     - Eliminates dependency conflicts between UDFs
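One way to realize per-UDF dependency directories is to give each UDF Server process its own `PYTHONPATH`. The sketch below assumes a `<base_dir>/<udf_name>/site-packages` layout; that layout and the helper names are hypothetical, and the PR may implement isolation differently.

```python
import os


def udf_dependency_dir(base_dir, udf_name):
    # Assumed layout: <base_dir>/<udf_name>/site-packages, one tree per UDF.
    return os.path.join(base_dir, udf_name, "site-packages")


def udf_server_env(base_dir, udf_name):
    """Build the environment for one UDF Server process."""
    env = dict(os.environ)
    # Entries in PYTHONPATH are placed on sys.path ahead of the default
    # site-packages, so this UDF's own packages take precedence.
    env["PYTHONPATH"] = udf_dependency_dir(base_dir, udf_name)
    return env


env_a = udf_server_env("/tmp/udfs", "udf_a")
env_b = udf_server_env("/tmp/udfs", "udf_b")
```

The resulting dictionary could be passed as the `env` argument of `subprocess.Popen` when launching a server, so two UDFs that pin different versions of the same library never share an import path.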
   
   ### Compatibility Guarantee
   - Strictly based on CPython interpreter (ensures 100% compatibility with 
machine learning libraries relying on C extensions)
   - Aligns with existing Doris UDF/UDTF syntax patterns (minimizes learning 
cost for users)

