Replace _Thread_queue_Flush() with _Thread_queue_Flush_critical() and add a filter function for customization of the thread queue flush operation.
Update #2555. --- cpukit/rtems/src/semdelete.c | 2 +- cpukit/rtems/src/semflush.c | 2 +- cpukit/score/include/rtems/score/corebarrierimpl.h | 28 +++++-- cpukit/score/include/rtems/score/coremuteximpl.h | 36 +++++++-- cpukit/score/include/rtems/score/coresemimpl.h | 46 ++++++++--- cpukit/score/include/rtems/score/threadqimpl.h | 59 ++++++++++---- cpukit/score/src/corebarrier.c | 12 ++- cpukit/score/src/coremsgclose.c | 21 ++++- cpukit/score/src/coremsgflush.c | 2 +- cpukit/score/src/coremutex.c | 22 ++++++ cpukit/score/src/coresem.c | 22 ++++++ cpukit/score/src/threadqflush.c | 90 +++++++++++++++------- 12 files changed, 266 insertions(+), 76 deletions(-) diff --git a/cpukit/rtems/src/semdelete.c b/cpukit/rtems/src/semdelete.c index f503cd9..48a9055 100644 --- a/cpukit/rtems/src/semdelete.c +++ b/cpukit/rtems/src/semdelete.c @@ -92,7 +92,7 @@ rtems_status_code rtems_semaphore_delete( if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) { _CORE_mutex_Flush( &the_semaphore->Core_control.mutex, - CORE_MUTEX_WAS_DELETED, + _CORE_mutex_Was_deleted, _Semaphore_MP_Send_object_was_deleted, id ); diff --git a/cpukit/rtems/src/semflush.c b/cpukit/rtems/src/semflush.c index 64386b0..01c5c0d 100644 --- a/cpukit/rtems/src/semflush.c +++ b/cpukit/rtems/src/semflush.c @@ -53,7 +53,7 @@ rtems_status_code rtems_semaphore_flush( if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) { _CORE_mutex_Flush( &the_semaphore->Core_control.mutex, - CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT, + _CORE_mutex_Unsatisfied_nowait, _Semaphore_MP_Send_object_was_deleted, id ); diff --git a/cpukit/score/include/rtems/score/corebarrierimpl.h b/cpukit/score/include/rtems/score/corebarrierimpl.h index 1d77405..8e923df 100644 --- a/cpukit/score/include/rtems/score/corebarrierimpl.h +++ b/cpukit/score/include/rtems/score/corebarrierimpl.h @@ -193,19 +193,33 @@ uint32_t _CORE_barrier_Do_release( ) #endif +Thread_Control *_CORE_barrier_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +); + /* Must be a macro due to the multiprocessing dependent parameters */ #define _CORE_barrier_Flush( \ the_barrier, \ mp_callout, \ mp_id \ ) \ - _Thread_queue_Flush( \ - &( the_barrier )->Wait_queue, \ - CORE_BARRIER_TQ_OPERATIONS, \ - CORE_BARRIER_WAS_DELETED, \ - mp_callout, \ - mp_id \ - ) + do { \ + ISR_lock_Context _core_barrier_flush_lock_context; \ + _Thread_queue_Acquire( \ + &( the_barrier )->Wait_queue, \ + &_core_barrier_flush_lock_context \ + ); \ + _Thread_queue_Flush_critical( \ + &( the_barrier )->Wait_queue, \ + CORE_BARRIER_TQ_OPERATIONS, \ + _CORE_barrier_Was_deleted, \ + mp_callout, \ + mp_id, \ + &_core_barrier_flush_lock_context \ + ); \ + } while ( 0 ) /** * This function returns true if the automatic release attribute is diff --git a/cpukit/score/include/rtems/score/coremuteximpl.h b/cpukit/score/include/rtems/score/coremuteximpl.h index eae6ef1..0f79923 100644 --- a/cpukit/score/include/rtems/score/coremuteximpl.h +++ b/cpukit/score/include/rtems/score/coremuteximpl.h @@ -338,20 +338,40 @@ CORE_mutex_Status _CORE_mutex_Do_surrender( ) #endif +Thread_Control *_CORE_mutex_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +); + +Thread_Control *_CORE_mutex_Unsatisfied_nowait( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +); + /* Must be a macro due to the multiprocessing dependent parameters */ #define _CORE_mutex_Flush( \ the_mutex, \ - status, \ + filter, \ mp_callout, \ mp_id \ ) \ - _Thread_queue_Flush( \ - &( the_mutex )->Wait_queue, \ - ( the_mutex )->operations, \ - status, \ - mp_callout, \ - mp_id \ - ) + do { \ + ISR_lock_Context _core_mutex_flush_lock_context; \ + _Thread_queue_Acquire( \ + &( the_mutex )->Wait_queue, \ + &_core_mutex_flush_lock_context \ + ); \ + _Thread_queue_Flush_critical( \ + &( the_mutex )->Wait_queue, \ + ( the_mutex )->operations, \ + filter, \ + mp_callout, \ + mp_id, \ + &_core_mutex_flush_lock_context \ + ); \ + } while ( 0 ) /** * @brief Is mutex locked. diff --git a/cpukit/score/include/rtems/score/coresemimpl.h b/cpukit/score/include/rtems/score/coresemimpl.h index d68c3d7..a341f28 100644 --- a/cpukit/score/include/rtems/score/coresemimpl.h +++ b/cpukit/score/include/rtems/score/coresemimpl.h @@ -86,19 +86,36 @@ void _CORE_semaphore_Initialize( uint32_t initial_value ); +Thread_Control *_CORE_semaphore_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +); + +Thread_Control *_CORE_semaphore_Unsatisfied_nowait( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +); + #define _CORE_semaphore_Destroy( \ the_semaphore, \ mp_callout, \ mp_id \ ) \ do { \ - _Thread_queue_Flush( \ + ISR_lock_Context _core_semaphore_destroy_lock_context; \ + _Thread_queue_Acquire( \ + &( the_semaphore )->Wait_queue, \ + &_core_semaphore_destroy_lock_context \ + ); \ + _Thread_queue_Flush_critical( \ &( the_semaphore )->Wait_queue, \ ( the_semaphore )->operations, \ - CORE_SEMAPHORE_WAS_DELETED, \ + _CORE_semaphore_Was_deleted, \ mp_callout, \ mp_id, \ - &_core_semaphore_flush_lock_context \ + &_core_semaphore_destroy_lock_context \ ); \ _Thread_queue_Destroy( &( the_semaphore )->Wait_queue ); \ } while ( 0 ) @@ -190,17 +207,24 @@ RTEMS_INLINE_ROUTINE CORE_semaphore_Status _CORE_semaphore_Do_surrender( /* Must be a macro due to the multiprocessing dependent parameters */ #define _CORE_semaphore_Flush( \ the_semaphore, \ - CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \ mp_callout, \ mp_id \ ) \ - _Thread_queue_Flush( \ - &( the_semaphore )->Wait_queue, \ - ( the_semaphore )->operations, \ - CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \ - mp_callout, \ - mp_id \ - ) + do { \ + ISR_lock_Context _core_semaphore_flush_lock_context; \ + _Thread_queue_Acquire( \ + &( the_semaphore )->Wait_queue, \ + &_core_semaphore_flush_lock_context \ + ); \ + _Thread_queue_Flush_critical( \ + &( the_semaphore )->Wait_queue, \ + ( the_semaphore )->operations, \ + _CORE_semaphore_Unsatisfied_nowait, \ + mp_callout, \ + mp_id, \ + &_core_semaphore_flush_lock_context \ + ); \ + } while ( 0 ) /** * This routine returns the current count associated with the semaphore. diff --git a/cpukit/score/include/rtems/score/threadqimpl.h b/cpukit/score/include/rtems/score/threadqimpl.h index d56be79..3462b41 100644 --- a/cpukit/score/include/rtems/score/threadqimpl.h +++ b/cpukit/score/include/rtems/score/threadqimpl.h @@ -590,15 +590,37 @@ Thread_Control *_Thread_queue_First( const Thread_queue_Operations *operations ); -void _Thread_queue_Do_flush( +/** + * @brief Thread queue flush filter function. + * + * @param the_thread The thread to extract. This is the first parameter to + * optimize for architectures that use the same register for the first + * parameter and the return value. + * @param the_thread_queue The thread queue. + * @param lock_context The lock context of the lock acquire. May be used to + * pass additional data to the filter function via an overlay structure. The + * filter function should not release or acquire the thread queue lock. + * + * @retval the_thread Extract this thread. + * @retval NULL Do not extract this thread and stop the thread queue flush + * operation. Threads that are already extracted will complete the flush + * operation. + */ +typedef Thread_Control *( *Thread_queue_Flush_filter )( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +); + +size_t _Thread_queue_Do_flush_critical( Thread_queue_Control *the_thread_queue, const Thread_queue_Operations *operations, - uint32_t status + Thread_queue_Flush_filter filter, #if defined(RTEMS_MULTIPROCESSING) - , Thread_queue_MP_callout mp_callout, - Objects_Id mp_id + Objects_Id mp_id, #endif + ISR_lock_Context *lock_context ); /** @@ -608,39 +630,44 @@ void _Thread_queue_Do_flush( * * @param the_thread_queue The thread queue. * @param operations The thread queue operations. - * @param status The return status for the threads. + * @param filter The filter functions called for each thread to extract from + * the thread queue. * @param mp_callout Callout to extract the proxy of a remote thread. This * parameter is only used on multiprocessing configurations. * @param mp_id Object identifier of the object containing the thread queue. * This parameter is only used on multiprocessing configurations. */ #if defined(RTEMS_MULTIPROCESSING) - #define _Thread_queue_Flush( \ + #define _Thread_queue_Flush_critical( \ the_thread_queue, \ operations, \ - status, \ + filter, \ mp_callout, \ - mp_id \ + mp_id, \ + lock_context \ ) \ - _Thread_queue_Do_flush( \ + _Thread_queue_Do_flush_critical( \ the_thread_queue, \ operations, \ - status, \ + filter, \ mp_callout, \ - mp_id \ + mp_id, \ + lock_context \ ) #else - #define _Thread_queue_Flush( \ + #define _Thread_queue_Flush_critical( \ the_thread_queue, \ operations, \ - status, \ + filter, \ mp_callout, \ - mp_id \ + mp_id, \ + lock_context \ ) \ - _Thread_queue_Do_flush( \ + _Thread_queue_Do_flush_critical( \ the_thread_queue, \ operations, \ - status \ + filter, \ + lock_context \ ) #endif diff --git a/cpukit/score/src/corebarrier.c b/cpukit/score/src/corebarrier.c index 5313a0f..555e3c9 100644 --- a/cpukit/score/src/corebarrier.c +++ b/cpukit/score/src/corebarrier.c @@ -19,7 +19,6 @@ #endif #include <rtems/score/corebarrierimpl.h> -#include <rtems/score/threadqimpl.h> void _CORE_barrier_Initialize( CORE_barrier_Control *the_barrier, @@ -32,3 +31,14 @@ void _CORE_barrier_Initialize( _Thread_queue_Initialize( &the_barrier->Wait_queue ); } + +Thread_Control *_CORE_barrier_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_BARRIER_WAS_DELETED; + + return the_thread; +} diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c index 1511f83..acdc9d7 100644 --- a/cpukit/score/src/coremsgclose.c +++ b/cpukit/score/src/coremsgclose.c @@ -21,6 +21,17 @@ #include <rtems/score/coremsgimpl.h> #include <rtems/score/wkspace.h> +static Thread_Control *_CORE_message_queue_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED; + + return the_thread; +} + void _CORE_message_queue_Do_close( CORE_message_queue_Control *the_message_queue #if defined(RTEMS_MULTIPROCESSING) @@ -30,17 +41,21 @@ void _CORE_message_queue_Do_close( #endif ) { + ISR_lock_Context lock_context; + /* * This will flush blocked threads whether they were blocked on * a send or receive. */ - _Thread_queue_Flush( + _CORE_message_queue_Acquire( the_message_queue, &lock_context ); + _Thread_queue_Flush_critical( &the_message_queue->Wait_queue, the_message_queue->operations, - CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED, + _CORE_message_queue_Was_deleted, mp_callout, - mp_id + mp_id, + &lock_context ); (void) _Workspace_Free( the_message_queue->message_buffers ); diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c index f67dcf2..38f26b7 100644 --- a/cpukit/score/src/coremsgflush.c +++ b/cpukit/score/src/coremsgflush.c @@ -41,7 +41,7 @@ uint32_t _CORE_message_queue_Flush( * * (1) The thread queue of pending senders is a logical extension * of the pending message queue. In this case, it should be - * flushed using the _Thread_queue_Flush() service with a status + * flushed using the _Thread_queue_Flush_critical() service with a status * such as CORE_MESSAGE_QUEUE_SENDER_FLUSHED (which currently does * not exist). This can be implemented without changing the "big-O" * of the message flushing part of the routine. diff --git a/cpukit/score/src/coremutex.c b/cpukit/score/src/coremutex.c index 88d487c..0b07c89 100644 --- a/cpukit/score/src/coremutex.c +++ b/cpukit/score/src/coremutex.c @@ -96,3 +96,25 @@ CORE_mutex_Status _CORE_mutex_Initialize( return CORE_MUTEX_STATUS_SUCCESSFUL; } + +Thread_Control *_CORE_mutex_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_MUTEX_WAS_DELETED; + + return the_thread; +} + +Thread_Control *_CORE_mutex_Unsatisfied_nowait( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT; + + return the_thread; +} diff --git a/cpukit/score/src/coresem.c b/cpukit/score/src/coresem.c index 2bdd81c..a36a4f7 100644 --- a/cpukit/score/src/coresem.c +++ b/cpukit/score/src/coresem.c @@ -36,3 +36,25 @@ void _CORE_semaphore_Initialize( the_semaphore->operations = &_Thread_queue_Operations_FIFO; } } + +Thread_Control *_CORE_semaphore_Was_deleted( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_SEMAPHORE_WAS_DELETED; + + return the_thread; +} + +Thread_Control *_CORE_semaphore_Unsatisfied_nowait( + Thread_Control *the_thread, + Thread_queue_Control *the_thread_queue, + ISR_lock_Context *lock_context +) +{ + the_thread->Wait.return_code = CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT; + + return the_thread; +} diff --git a/cpukit/score/src/threadqflush.c b/cpukit/score/src/threadqflush.c index c508314..de64a49 100644 --- a/cpukit/score/src/threadqflush.c +++ b/cpukit/score/src/threadqflush.c @@ -18,46 +18,82 @@ #include "config.h" #endif -#include <rtems/score/threadqimpl.h> -#include <rtems/score/objectimpl.h> +#include <rtems/score/threadimpl.h> -void _Thread_queue_Do_flush( +size_t _Thread_queue_Do_flush_critical( Thread_queue_Control *the_thread_queue, const Thread_queue_Operations *operations, - uint32_t status + Thread_queue_Flush_filter filter, #if defined(RTEMS_MULTIPROCESSING) - , Thread_queue_MP_callout mp_callout, - Objects_Id mp_id + Objects_Id mp_id, #endif + ISR_lock_Context *lock_context ) { - ISR_lock_Context lock_context; - Thread_Control *the_thread; - - _Thread_queue_Acquire( the_thread_queue, &lock_context ); - - while ( - ( - the_thread = _Thread_queue_First_locked( - the_thread_queue, - operations - ) - ) - ) { - the_thread->Wait.return_code = status; - - _Thread_queue_Extract_critical( + size_t flushed; + Chain_Control unblock; + Chain_Node *node; + Chain_Node *tail; + + flushed = 0; + _Chain_Initialize_empty( &unblock ); + + while ( true ) { + Thread_queue_Heads *heads; + Thread_Control *first; + bool do_unblock; + + heads = the_thread_queue->Queue.heads; + if ( heads == NULL ) { + break; + } + + first = ( *operations->first )( heads ); + first = ( *filter )( first, the_thread_queue, lock_context ); + if ( first == NULL ) { + break; + } + + do_unblock = _Thread_queue_Extract_locked( &the_thread_queue->Queue, operations, - the_thread, + first, mp_callout, - mp_id, - &lock_context + mp_id ); + if ( do_unblock ) { + _Chain_Append_unprotected( &unblock, &first->Wait.Node.Chain ); + } + + ++flushed; + } + + node = _Chain_First( &unblock ); + tail = _Chain_Tail( &unblock ); + + if ( node != tail ) { + Per_CPU_Control *cpu_self; + + cpu_self = _Thread_Dispatch_disable_critical( lock_context ); + _Thread_queue_Release( the_thread_queue, lock_context ); + + do { + Thread_Control *the_thread; + Chain_Node *next; + + next = _Chain_Next( node ); + the_thread = THREAD_CHAIN_NODE_TO_THREAD( node ); + _Thread_Timer_remove( the_thread ); + _Thread_Unblock( the_thread ); + + node = next; + } while ( node != tail ); - _Thread_queue_Acquire( the_thread_queue, &lock_context ); + _Thread_Dispatch_enable( cpu_self ); + } else { + _Thread_queue_Release( the_thread_queue, lock_context ); } - _Thread_queue_Release( the_thread_queue, &lock_context ); + return flushed; } -- 1.8.4.5 _______________________________________________ devel mailing list devel@rtems.org http://lists.rtems.org/mailman/listinfo/devel