On 3/6/2024 5:31 PM, Joel Fernandes wrote:
> 
> 
> On 3/5/2024 2:57 PM, Uladzislau Rezki (Sony) wrote:
>> Fix the race below by not releasing a wait-head from the
>> GP-kthread, as doing so can lead to the wait-head being reused
>> while a worker can still access it, thus executing newly added
>> callbacks too early.
>>
>> CPU 0                              CPU 1
>> -----                              -----
>>
>> // wait_tail == HEAD1
>> rcu_sr_normal_gp_cleanup() {
>>     // has passed SR_MAX_USERS_WAKE_FROM_GP
>>     wait_tail->next = next;
>>     // done_tail = HEAD1
>>     smp_store_release(&rcu_state.srs_done_tail, wait_tail);
>>     queue_work() {
>>         test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)
>>         __queue_work()
>>     }
>> }
>>
>>                                set_work_pool_and_clear_pending()
>>                                rcu_sr_normal_gp_cleanup_work() {
>> // new GP, wait_tail == HEAD2
>> rcu_sr_normal_gp_cleanup() {
>>     // executes all completion, but stop at HEAD1
>>     wait_tail->next = HEAD1;
>>     // done_tail = HEAD2
>>     smp_store_release(&rcu_state.srs_done_tail, wait_tail);
>>     queue_work() {
>>         test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)
>>         __queue_work()
>>     }
>> }
>>                                  // done = HEAD2
>>                                  done = 
>> smp_load_acquire(&rcu_state.srs_done_tail);
>>                                  // head = HEAD1
>>                                  head = done->next;
>>                                  done->next = NULL;
>>                                  llist_for_each_safe() {
>>                                  // completes all callbacks, release HEAD1
>>                                  }
>>                                }
>>                                // Process second queue
>>                                set_work_pool_and_clear_pending()
>>                                rcu_sr_normal_gp_cleanup_work() {
>>                                // done = HEAD2
>>                                done = 
>> smp_load_acquire(&rcu_state.srs_done_tail);
>>
>> // new GP, wait_tail == HEAD3
>> rcu_sr_normal_gp_cleanup() {
>>     // Finds HEAD2 with ->next == NULL at the end
>>     rcu_sr_put_wait_head(HEAD2)
>>     ...
>>
>> // A few more GPs later
>> rcu_sr_normal_gp_init() {
>>      HEAD2 = rcu_sr_get_wait_head();
>>      llist_add(HEAD2, &rcu_state.srs_next);
>>                                // head == rcu_state.srs_next
>>                                head = done->next;
>>                                done->next = NULL;
>>                                llist_for_each_safe() {
>>                                 // EXECUTE CALLBACKS TOO EARLY!!!
>>                                 }
>>                                }
>>
>> Reported-by: Frederic Weisbecker <[email protected]>
>> Fixes: 05a10b921000 ("rcu: Support direct wake-up of synchronize_rcu() 
>> users")
>> Signed-off-by: Uladzislau Rezki (Sony) <[email protected]>
>> ---
>>  kernel/rcu/tree.c | 22 ++++++++--------------
>>  1 file changed, 8 insertions(+), 14 deletions(-)
>>
>> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
>> index 31f3a61f9c38..475647620b12 100644
>> --- a/kernel/rcu/tree.c
>> +++ b/kernel/rcu/tree.c
>> @@ -1656,21 +1656,11 @@ static void rcu_sr_normal_gp_cleanup(void)
>>      WARN_ON_ONCE(!rcu_sr_is_wait_head(wait_tail));
>>  
>>      /*
>> -     * Process (a) and (d) cases. See an illustration. Apart of
>> -     * that it handles the scenario when all clients are done,
>> -     * wait-head is released if last. The worker is not kicked.
>> +     * Process (a) and (d) cases. See an illustration.
>>       */
>>      llist_for_each_safe(rcu, next, wait_tail->next) {
>> -            if (rcu_sr_is_wait_head(rcu)) {
>> -                    if (!rcu->next) {
>> -                            rcu_sr_put_wait_head(rcu);
>> -                            wait_tail->next = NULL;
>> -                    } else {
>> -                            wait_tail->next = rcu;
>> -                    }
>> -
>> +            if (rcu_sr_is_wait_head(rcu))
>>                      break;
>> -            }
>>  
>>              rcu_sr_normal_complete(rcu);
>>              // It can be last, update a next on this step.
>> @@ -1684,8 +1674,12 @@ static void rcu_sr_normal_gp_cleanup(void)
>>      smp_store_release(&rcu_state.srs_done_tail, wait_tail);
>>      ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_done_tail);
>>  
>> -    if (wait_tail->next)
>> -            queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
>> +    /*
>> +     * We schedule a work in order to perform a final processing
>> +     * of outstanding users(if still left) and releasing wait-heads
>> +     * added by rcu_sr_normal_gp_init() call.
>> +     */
>> +    queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
>>  }

One question: why do you need to queue_work() if wait_tail->next == NULL?

AFAICS, at this stage, if wait_tail->next == NULL, you are in CASE f, so the last
remaining HEAD stays. (And the llist_for_each_safe() in
rcu_sr_normal_gp_cleanup_work() becomes a NOOP.)

It could be something like this, but maybe I missed something:

@@ -1672,7 +1674,7 @@ static void rcu_sr_normal_gp_cleanup_work(struct
work_struct *work)
  */
 static void rcu_sr_normal_gp_cleanup(void)
 {
-       struct llist_node *wait_tail, *next, *rcu;
+       struct llist_node *wait_tail, *next = NULL, *rcu = NULL;
        int done = 0;

        wait_tail = rcu_state.srs_wait_tail;
@@ -1707,7 +1709,8 @@ static void rcu_sr_normal_gp_cleanup(void)
         * of outstanding users(if still left) and releasing wait-heads
         * added by rcu_sr_normal_gp_init() call.
         */
-       queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
+       if (rcu)
+               queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
 }

 /*

Reply via email to