Mryange opened a new pull request, #62040:
URL: https://github.com/apache/doris/pull/62040

   ### What problem does this PR solve?
   
   Problem Summary:
   
   ```
   exchange_sink_buffer.cpp:569:24: error: reading variable '_queue_deps' requires holding mutex '_m' [-Werror,-Wthread-safety-analysis]
     569 |         for (auto& dep : _queue_deps) {
         |                        ^
   ```
   
   
   
   `ExchangeSinkBuffer` has two member fields `_queue_deps` and `_parents` that 
are protected by mutex `_m`. However, multiple code paths access these fields 
without holding `_m`, leading to data races.
   
   Specifically, `add_block()`, `_send_rpc()`, and `_set_receiver_eof()` iterate `_queue_deps` without holding `_m`; at most they hold the per-`RpcInstance` mutex (`instance_data.mutex`). `_turn_off_channel()` likewise iterates `_parents` without holding `_m`. Concurrently, `set_dependency()` acquires `_m` and pushes to both `_queue_deps` and `_parents`. Because the two lock domains are disjoint, a new sink registering via `set_dependency()` while another thread is iterating `_queue_deps` can make the `std::vector` reallocate underneath the iterator, leading to a use-after-free or corrupted iteration. This is undefined behavior.
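
To illustrate the reallocation hazard described above, here is a minimal single-threaded sketch. The function name `push_back_reallocates` and the `deps` vector are hypothetical and not taken from the PR; the sketch only shows that a `push_back` on a full `std::vector` moves its storage, which is what invalidates an iterator held by a concurrent reader.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: returns true if push_back on a full vector
// moved the underlying storage to a new address.
bool push_back_reallocates() {
    std::vector<int> deps;
    deps.push_back(0);
    // Fill the vector until size == capacity, so the next push_back must grow it.
    while (deps.size() < deps.capacity()) {
        deps.push_back(0);
    }
    // Record the address as an integer so that we never touch a dangling pointer.
    const auto before = reinterpret_cast<std::uintptr_t>(deps.data());
    const auto old_capacity = deps.capacity();

    deps.push_back(0); // exceeds capacity, so the vector reallocates

    const auto after = reinterpret_cast<std::uintptr_t>(deps.data());
    assert(deps.capacity() > old_capacity);
    return before != after;
}
```

Any iterator or reference obtained before the final `push_back` would point into the old storage, which is why the readers need to hold the same mutex as the writer.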
   
   Beyond the data race on the vector itself, there is also a **missed-wakeup** problem: the check of `_total_queue_size` (a `std::atomic<int>`) and the subsequent block/unblock iteration over `_queue_deps` do not form a single atomic step. Two threads holding different instance mutexes can interleave so that one thread, acting on a stale check, blocks all dependencies after another thread has already checked and unblocked them, leaving the pipeline permanently blocked (a hang).
   
   ### Approach
   
   This PR introduces Clang's `-Wthread-safety` static analysis on 
`exchange_sink_buffer.cpp` to mechanically detect all unprotected accesses, 
then fixes all reported violations.
   
   #### 1. Thread Safety Annotation Infrastructure
   
   Added `be/src/common/thread_safety_annotations.h` which provides:
   - `TSA_GUARDED_BY`, `TSA_REQUIRES`, `TSA_ACQUIRE`, `TSA_RELEASE`, etc.: standard Clang thread safety annotation macros
   - `AnnotatedMutex`: a `std::mutex` wrapper annotated with `TSA_CAPABILITY("mutex")`
   - `AnnotatedLockGuard<MutexType>`: an RAII lock guard annotated with `TSA_SCOPED_CAPABILITY`
   
   These are no-ops when compiling with non-Clang compilers.
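
As a rough sketch of what such a header can look like (this is not the PR's actual file; the `TSA_ATTR` helper, the member names, and the `GuardedCounter` example are assumptions made for illustration), the macros can expand to Clang's documented thread-safety attributes and to nothing elsewhere:

```cpp
#include <cassert>
#include <mutex>

// Assumed helper macro: expands to a Clang attribute, or to nothing on other compilers.
#if defined(__clang__)
#define TSA_ATTR(x) __attribute__((x))
#else
#define TSA_ATTR(x)
#endif

#define TSA_CAPABILITY(x) TSA_ATTR(capability(x))
#define TSA_SCOPED_CAPABILITY TSA_ATTR(scoped_lockable)
#define TSA_GUARDED_BY(x) TSA_ATTR(guarded_by(x))
#define TSA_REQUIRES(...) TSA_ATTR(requires_capability(__VA_ARGS__))
#define TSA_ACQUIRE(...) TSA_ATTR(acquire_capability(__VA_ARGS__))
#define TSA_RELEASE(...) TSA_ATTR(release_capability(__VA_ARGS__))

// A std::mutex wrapper that the analysis treats as a capability.
class TSA_CAPABILITY("mutex") AnnotatedMutex {
public:
    void lock() TSA_ACQUIRE() { _mutex.lock(); }
    void unlock() TSA_RELEASE() { _mutex.unlock(); }

private:
    std::mutex _mutex;
};

// RAII guard: acquires in the constructor, releases in the destructor.
template <typename MutexType>
class TSA_SCOPED_CAPABILITY AnnotatedLockGuard {
public:
    explicit AnnotatedLockGuard(MutexType& mu) TSA_ACQUIRE(mu) : _mu(mu) { _mu.lock(); }
    ~AnnotatedLockGuard() TSA_RELEASE() { _mu.unlock(); }
    AnnotatedLockGuard(const AnnotatedLockGuard&) = delete;
    AnnotatedLockGuard& operator=(const AnnotatedLockGuard&) = delete;

private:
    MutexType& _mu;
};

// Hypothetical example: `value` may only be accessed while `mu` is held.
struct GuardedCounter {
    AnnotatedMutex mu;
    int value TSA_GUARDED_BY(mu) = 0;

    int increment() {
        AnnotatedLockGuard<AnnotatedMutex> guard(mu);
        return ++value;
    }
};

// Usage: two guarded increments, returning the final value.
inline int demo_guarded_counter() {
    GuardedCounter c;
    const int first = c.increment();
    assert(first == 1);
    return c.increment();
}
```

With `-Wthread-safety` enabled under Clang, reading or writing `value` outside a scope that holds `mu` would be expected to produce a diagnostic of the same kind as the one quoted in the problem summary.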
   
   #### 2. Annotations Applied to ExchangeSinkBuffer
   
   In `exchange_sink_buffer.h`:
   - `std::mutex _m` → `AnnotatedMutex _m`
   - `_queue_deps` marked with `TSA_GUARDED_BY(_m)`
   - `_parents` marked with `TSA_GUARDED_BY(_m)`
   - `set_dependency()` uses `AnnotatedLockGuard` instead of `std::lock_guard`
   
   In `be/src/exec/CMakeLists.txt`:
   - Added `set_source_files_properties(... PROPERTIES COMPILE_FLAGS "-Wthread-safety")` for `exchange_sink_buffer.cpp`, so the analysis is enabled per-file without affecting the rest of the build.
   
   #### 3. Compilation Errors Detected
   
   With `-Wthread-safety` enabled, Clang reported **8 errors at 4 locations**:
   
   | Location | Line | Field | Access Pattern |
   |---|---|---|---|
   | `add_block()` | ~189 | `_queue_deps` | Iterates to call `dep->block()`; only holds `instance_data.mutex`, not `_m` |
   | `_send_rpc()` | ~389 | `_queue_deps` | Iterates to call `dep->set_ready()`; only holds `instance_data.mutex`, not `_m` |
   | `_set_receiver_eof()` | ~569 | `_queue_deps` | Iterates to call `dep->set_ready()`; holds no lock at all |
   | `_turn_off_channel()` | ~590 | `_parents` | Iterates to call `parent->on_channel_finished()`; holds `instance_data.mutex`, not `_m` |
   
   #### 4. Fixes Applied
   
   All four sites now acquire `_m` (via `AnnotatedLockGuard l(_m)`) before 
iterating the guarded fields:
   
   - **`add_block()`**: Wrapped the `_total_queue_size > _queue_capacity` check and `_queue_deps` iteration in `AnnotatedLockGuard l(_m)`.
   - **`_send_rpc()`**: Wrapped the `_total_queue_size <= _queue_capacity` check and `_queue_deps` iteration in `AnnotatedLockGuard l(_m)`.
   - **`_set_receiver_eof()`**: Wrapped the `_total_queue_size <= _queue_capacity` check and `_queue_deps` iteration in `AnnotatedLockGuard l(_m)`.
   - **`_turn_off_channel()`**: Wrapped the `_parents` iteration in `AnnotatedLockGuard l(_m)`.
   
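   `_m` is always the innermost lock: it is taken either on its own or after `instance_data.mutex` (order `instance_data.mutex` → `_m`), never the other way around, so the new acquisitions cannot introduce a lock-order deadlock.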
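
The fix pattern and lock order described above can be sketched as follows. This is a simplified, hypothetical model rather than the PR's code: `Dependency`, `SinkBufferSketch`, `on_block_sent()`, and the point at which the counter is updated are assumptions, and plain `std::mutex` / `std::lock_guard` stand in for `AnnotatedMutex` / `AnnotatedLockGuard`.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a pipeline dependency.
struct Dependency {
    bool ready = true;
    void block() { ready = false; }
    void set_ready() { ready = true; }
};

class SinkBufferSketch {
public:
    explicit SinkBufferSketch(int capacity) : _queue_capacity(capacity) {}

    // Writer: registers a dependency while holding _m.
    void set_dependency(std::shared_ptr<Dependency> dep) {
        std::lock_guard<std::mutex> l(_m);
        _queue_deps.push_back(std::move(dep));
    }

    // Producer path: instance mutex first (outer), then _m (inner).
    void add_block(std::mutex& instance_mutex) {
        std::lock_guard<std::mutex> instance_lock(instance_mutex);
        _total_queue_size++;
        std::lock_guard<std::mutex> l(_m);
        // Check and iteration happen under the same lock as set_dependency().
        if (_total_queue_size > _queue_capacity) {
            for (auto& dep : _queue_deps) dep->block();
        }
    }

    // Consumer path (assumed name): same lock order, opposite action.
    void on_block_sent(std::mutex& instance_mutex) {
        std::lock_guard<std::mutex> instance_lock(instance_mutex);
        _total_queue_size--;
        std::lock_guard<std::mutex> l(_m);
        if (_total_queue_size <= _queue_capacity) {
            for (auto& dep : _queue_deps) dep->set_ready();
        }
    }

private:
    std::mutex _m; // guards _queue_deps
    std::vector<std::shared_ptr<Dependency>> _queue_deps;
    std::atomic<int> _total_queue_size{0};
    const int _queue_capacity;
};

// Usage: fill past capacity, then drain back; returns the final ready state.
inline bool run_sketch() {
    SinkBufferSketch buf(1);
    auto dep = std::make_shared<Dependency>();
    buf.set_dependency(dep);
    std::mutex instance_mutex;
    buf.add_block(instance_mutex); // size 1, not above capacity
    buf.add_block(instance_mutex); // size 2, above capacity: blocked
    assert(!dep->ready);
    buf.on_block_sent(instance_mutex); // size 1, within capacity: ready again
    return dep->ready;
}
```

Because every check-and-act sequence runs under `_m`, the decisions to block and unblock are serialized, and whichever thread takes `_m` last observes the most recent queue size.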
   
   After applying the fixes, the file compiles cleanly with zero thread-safety 
violations.
   
   
   ### Check List (For Author)
   
   - Test <!-- At least one of them must be included. -->
       - [ ] Regression test
       - [ ] Unit Test
       - [ ] Manual test (add detailed scripts or steps below)
       - [ ] No need to test or manual test. Explain why:
           - [ ] This is a refactor/code format and no logic has been changed.
           - [ ] Previous test can cover this change.
           - [ ] No code files have been changed.
           - [ ] Other reason <!-- Add your reason?  -->
   
   - Behavior changed:
       - [ ] No.
       - [ ] Yes. <!-- Explain the behavior change -->
   
   - Does this need documentation?
       - [ ] No.
       - [ ] Yes. <!-- Add document PR link here. eg: 
https://github.com/apache/doris-website/pull/1214 -->
   
   ### Check List (For Reviewer who merge this PR)
   
   - [ ] Confirm the release note
   - [ ] Confirm test cases
   - [ ] Confirm document
   - [ ] Add branch pick label <!-- Add branch pick label that this PR should 
merge into -->
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

