featzhang opened a new pull request, #27590:
URL: https://github.com/apache/flink/pull/27590

   ## What is the purpose of the change
   
   This PR introduces a comprehensive AI inference framework for PyFlink, 
making it easy to perform machine learning inference on streaming data with 
automatic lifecycle management.
   
   Currently, PyFlink users face significant challenges when implementing AI 
inference:
   - Manual model management (loading, initialization, cleanup)
   - Manual batching logic for efficient inference  
   - Manual resource and concurrency control
   - Lack of lifecycle hooks for warmup and optimization
   
   This PR addresses these pain points with a simple, declarative API.
   
   ## Brief change log
   
   - Add `DataStream.infer()` method for easy AI inference
   - Implement `ModelLifecycleManager` for model loading and warmup
   - Implement `BatchInferenceExecutor` for efficient batch inference
   - Add support for multiple task types (embedding, classification, generation)
   - Integrate with HuggingFace Transformers ecosystem
   - Add comprehensive metrics tracking
   - Include examples and unit tests
   
   ## Example Usage
   
   ```python
   # Text embedding
   embeddings = data_stream.infer(
       model="sentence-transformers/all-MiniLM-L6-v2",
       input_col="text",
       output_col="embedding",
       batch_size=32,
       device="cuda:0"
   )
   
   # Sentiment classification
   sentiments = data_stream.infer(
       model="distilbert-base-uncased-finetuned-sst-2-english",
       input_col="text",
       output_col="sentiment",
       task_type="classification"
   )
   ```
   
   ## Verifying this change
   
   - Added unit tests for all core components (config, lifecycle, executor, 
metrics, function)
   - Added integration tests
   - Added example programs demonstrating different use cases
   - Tested with multiple HuggingFace models
   - Verified resource cleanup and memory management
   
   ## Does this pull request potentially affect one of the following parts:
   
   - [ ] Dependencies (does it add or upgrade a dependency)
   - [x] The public API (DataStream.infer() is a new public API)
   - [ ] The serializers
   - [ ] The runtime per-record code paths
   - [ ] Anything that affects deployment or recovery: JobManager, 
checkpointing, etc.
   - [ ] The S3 file system connector
   
   ## Documentation
   
   - [x] Does this pull request introduce a new feature? (yes)
   - [x] If yes, how is the feature documented? (JavaDocs/PyDocs and examples)
   
   ## Additional Context
   
   This is an initial MVP implementation focusing on core functionality. Future 
enhancements could include:
   - Integration with AsyncBatchFunction for true async batching
   - Support for additional model frameworks (TensorFlow, ONNX)
   - Model versioning and A/B testing
   - Advanced batching strategies (dynamic batch size)
   - Distributed model serving
   
   The implementation follows Flink's design principles and integrates 
naturally with existing PyFlink APIs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to