On Tue, Apr 19, 2016 at 9:12 AM, Sebastian Huber <sebastian.hu...@embedded-brains.de> wrote: > Replace _Thread_queue_Flush() with _Thread_queue_Flush_critical() and > add a filter function for customization of the thread queue flush > operation. > > Update #2555. > --- > cpukit/rtems/src/semdelete.c | 2 +- > cpukit/rtems/src/semflush.c | 2 +- > cpukit/score/include/rtems/score/corebarrierimpl.h | 28 +++++-- > cpukit/score/include/rtems/score/coremuteximpl.h | 36 +++++++-- > cpukit/score/include/rtems/score/coresemimpl.h | 46 ++++++++--- > cpukit/score/include/rtems/score/threadqimpl.h | 59 ++++++++++---- > cpukit/score/src/corebarrier.c | 12 ++- > cpukit/score/src/coremsgclose.c | 21 ++++- > cpukit/score/src/coremsgflush.c | 2 +- > cpukit/score/src/coremutex.c | 22 ++++++ > cpukit/score/src/coresem.c | 22 ++++++ > cpukit/score/src/threadqflush.c | 90 > +++++++++++++++------- > 12 files changed, 266 insertions(+), 76 deletions(-) > > diff --git a/cpukit/rtems/src/semdelete.c b/cpukit/rtems/src/semdelete.c > index f503cd9..48a9055 100644 > --- a/cpukit/rtems/src/semdelete.c > +++ b/cpukit/rtems/src/semdelete.c > @@ -92,7 +92,7 @@ rtems_status_code rtems_semaphore_delete( > if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) { > _CORE_mutex_Flush( > &the_semaphore->Core_control.mutex, > - CORE_MUTEX_WAS_DELETED, > + _CORE_mutex_Was_deleted, > _Semaphore_MP_Send_object_was_deleted, > id > ); > diff --git a/cpukit/rtems/src/semflush.c b/cpukit/rtems/src/semflush.c > index 64386b0..01c5c0d 100644 > --- a/cpukit/rtems/src/semflush.c > +++ b/cpukit/rtems/src/semflush.c > @@ -53,7 +53,7 @@ rtems_status_code rtems_semaphore_flush( > if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) { > _CORE_mutex_Flush( > &the_semaphore->Core_control.mutex, > - CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT, > + _CORE_mutex_Unsatisfied_nowait, > _Semaphore_MP_Send_object_was_deleted, > id > ); > diff --git a/cpukit/score/include/rtems/score/corebarrierimpl.h > 
b/cpukit/score/include/rtems/score/corebarrierimpl.h > index 1d77405..8e923df 100644 > --- a/cpukit/score/include/rtems/score/corebarrierimpl.h > +++ b/cpukit/score/include/rtems/score/corebarrierimpl.h > @@ -193,19 +193,33 @@ uint32_t _CORE_barrier_Do_release( > ) > #endif > > +Thread_Control *_CORE_barrier_Was_deleted( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +); > + > /* Must be a macro due to the multiprocessing dependent parameters */ > #define _CORE_barrier_Flush( \ > the_barrier, \ > mp_callout, \ > mp_id \ > ) \ > - _Thread_queue_Flush( \ > - &( the_barrier )->Wait_queue, \ > - CORE_BARRIER_TQ_OPERATIONS, \ > - CORE_BARRIER_WAS_DELETED, \ > - mp_callout, \ > - mp_id \ > - ) > + do { \ > + ISR_lock_Context _core_barrier_flush_lock_context; \ > + _Thread_queue_Acquire( \ > + &( the_barrier )->Wait_queue, \ > + &_core_barrier_flush_lock_context \ > + ); \ > + _Thread_queue_Flush_critical( \ > + &( the_barrier )->Wait_queue, \ > + CORE_BARRIER_TQ_OPERATIONS, \ > + _CORE_barrier_Was_deleted, \ > + mp_callout, \ > + mp_id, \ > + &_core_barrier_flush_lock_context \ > + ); \ > + } while ( 0 ) > > /** > * This function returns true if the automatic release attribute is > diff --git a/cpukit/score/include/rtems/score/coremuteximpl.h > b/cpukit/score/include/rtems/score/coremuteximpl.h > index eae6ef1..0f79923 100644 > --- a/cpukit/score/include/rtems/score/coremuteximpl.h > +++ b/cpukit/score/include/rtems/score/coremuteximpl.h > @@ -338,20 +338,40 @@ CORE_mutex_Status _CORE_mutex_Do_surrender( > ) > #endif > > +Thread_Control *_CORE_mutex_Was_deleted( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +); > + > +Thread_Control *_CORE_mutex_Unsatisfied_nowait( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +); > + > /* Must be a macro due to the multiprocessing dependent 
parameters */ > #define _CORE_mutex_Flush( \ > the_mutex, \ > - status, \ > + filter, \ > mp_callout, \ > mp_id \ > ) \ > - _Thread_queue_Flush( \ > - &( the_mutex )->Wait_queue, \ > - ( the_mutex )->operations, \ > - status, \ > - mp_callout, \ > - mp_id \ > - ) > + do { \ > + ISR_lock_Context _core_mutex_flush_lock_context; \ > + _Thread_queue_Acquire( \ > + &( the_mutex )->Wait_queue, \ > + &_core_mutex_flush_lock_context \ > + ); \ > + _Thread_queue_Flush_critical( \ > + &( the_mutex )->Wait_queue, \ > + ( the_mutex )->operations, \ > + filter, \ > + mp_callout, \ > + mp_id, \ > + &_core_mutex_flush_lock_context \ > + ); \ > + } while ( 0 ) > > /** > * @brief Is mutex locked. > diff --git a/cpukit/score/include/rtems/score/coresemimpl.h > b/cpukit/score/include/rtems/score/coresemimpl.h > index d68c3d7..a341f28 100644 > --- a/cpukit/score/include/rtems/score/coresemimpl.h > +++ b/cpukit/score/include/rtems/score/coresemimpl.h > @@ -86,19 +86,36 @@ void _CORE_semaphore_Initialize( > uint32_t initial_value > ); > > +Thread_Control *_CORE_semaphore_Was_deleted( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +); > + > +Thread_Control *_CORE_semaphore_Unsatisfied_nowait( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +); > + > #define _CORE_semaphore_Destroy( \ > the_semaphore, \ > mp_callout, \ > mp_id \ > ) \ > do { \ > - _Thread_queue_Flush( \ > + ISR_lock_Context _core_semaphore_destroy_lock_context; \ > + _Thread_queue_Acquire( \ > + &( the_semaphore )->Wait_queue, \ > + &_core_semaphore_destroy_lock_context \ > + ); \ > + _Thread_queue_Flush_critical( \ > &( the_semaphore )->Wait_queue, \ > ( the_semaphore )->operations, \ > - CORE_SEMAPHORE_WAS_DELETED, \ > + _CORE_semaphore_Was_deleted, \ > mp_callout, \ > mp_id, \ > - &_core_semaphore_flush_lock_context \ > + &_core_semaphore_destroy_lock_context \ > ); \ > 
_Thread_queue_Destroy( &( the_semaphore )->Wait_queue ); \ > } while ( 0 ) > @@ -190,17 +207,24 @@ RTEMS_INLINE_ROUTINE CORE_semaphore_Status > _CORE_semaphore_Do_surrender( > /* Must be a macro due to the multiprocessing dependent parameters */ > #define _CORE_semaphore_Flush( \ > the_semaphore, \ > - CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \ > mp_callout, \ > mp_id \ > ) \ > - _Thread_queue_Flush( \ > - &( the_semaphore )->Wait_queue, \ > - ( the_semaphore )->operations, \ > - CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \ > - mp_callout, \ > - mp_id \ > - ) > + do { \ > + ISR_lock_Context _core_semaphore_flush_lock_context; \ > + _Thread_queue_Acquire( \ > + &( the_semaphore )->Wait_queue, \ > + &_core_semaphore_flush_lock_context \ > + ); \ > + _Thread_queue_Flush_critical( \ > + &( the_semaphore )->Wait_queue, \ > + ( the_semaphore )->operations, \ > + _CORE_semaphore_Unsatisfied_nowait, \ > + mp_callout, \ > + mp_id, \ > + &_core_semaphore_flush_lock_context \ > + ); \ > + } while ( 0 ) > > /** > * This routine returns the current count associated with the semaphore. > diff --git a/cpukit/score/include/rtems/score/threadqimpl.h > b/cpukit/score/include/rtems/score/threadqimpl.h > index d56be79..3462b41 100644 > --- a/cpukit/score/include/rtems/score/threadqimpl.h > +++ b/cpukit/score/include/rtems/score/threadqimpl.h > @@ -590,15 +590,37 @@ Thread_Control *_Thread_queue_First( > const Thread_queue_Operations *operations > ); > > -void _Thread_queue_Do_flush( > +/** > + * @brief Thread queue flush filter function. > + * > + * @param the_thread The thread to extract. This is the first parameter to > + * optimize for architectures that use the same register for the first > + * parameter and the return value. > + * @param the_thread_queue The thread queue. > + * @param lock_context The lock context of the lock acquire. May be used to > + * pass additional data to the filter function via an overlay structure. 
> The > + * filter function should not release or acquire the thread queue lock. > + * > + * @retval the_thread Extract this thread. > + * @retval NULL Do not extract this thread and stop the thread queue flush > + * operation. Threads that are already extracted will complete the flush > + * operation. > + */ Can you please add a bit more description about the intent of the flush filter? It was not clear to me until I looked at the example of how the barrier uses the filter to set a flag in the TCB of each flushed thread. "Filter" is an unfortunately overloaded term.
> +typedef Thread_Control *( *Thread_queue_Flush_filter )( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +); > + > +size_t _Thread_queue_Do_flush_critical( > Thread_queue_Control *the_thread_queue, > const Thread_queue_Operations *operations, > - uint32_t status > + Thread_queue_Flush_filter filter, > #if defined(RTEMS_MULTIPROCESSING) > - , > Thread_queue_MP_callout mp_callout, > - Objects_Id mp_id > + Objects_Id mp_id, > #endif > + ISR_lock_Context *lock_context > ); > > /** > @@ -608,39 +630,44 @@ void _Thread_queue_Do_flush( > * > * @param the_thread_queue The thread queue. > * @param operations The thread queue operations. > - * @param status The return status for the threads. > + * @param filter The filter functions called for each thread to extract from > + * the thread queue. > * @param mp_callout Callout to extract the proxy of a remote thread. This > * parameter is only used on multiprocessing configurations. > * @param mp_id Object identifier of the object containing the thread queue. > * This parameter is only used on multiprocessing configurations. 
> */ > #if defined(RTEMS_MULTIPROCESSING) > - #define _Thread_queue_Flush( \ > + #define _Thread_queue_Flush_critical( \ > the_thread_queue, \ > operations, \ > - status, \ > + filter, \ > mp_callout, \ > - mp_id \ > + mp_id, \ > + lock_context \ > ) \ > - _Thread_queue_Do_flush( \ > + _Thread_queue_Do_flush_critical( \ > the_thread_queue, \ > operations, \ > - status, \ > + filter, \ > mp_callout, \ > - mp_id \ > + mp_id, \ > + lock_context \ > ) > #else > - #define _Thread_queue_Flush( \ > + #define _Thread_queue_Flush_critical( \ > the_thread_queue, \ > operations, \ > - status, \ > + filter, \ > mp_callout, \ > - mp_id \ > + mp_id, \ > + lock_context \ > ) \ > - _Thread_queue_Do_flush( \ > + _Thread_queue_Do_flush_critical( \ > the_thread_queue, \ > operations, \ > - status \ > + filter, \ > + lock_context \ > ) > #endif > > diff --git a/cpukit/score/src/corebarrier.c b/cpukit/score/src/corebarrier.c > index 5313a0f..555e3c9 100644 > --- a/cpukit/score/src/corebarrier.c > +++ b/cpukit/score/src/corebarrier.c > @@ -19,7 +19,6 @@ > #endif > > #include <rtems/score/corebarrierimpl.h> > -#include <rtems/score/threadqimpl.h> > > void _CORE_barrier_Initialize( > CORE_barrier_Control *the_barrier, > @@ -32,3 +31,14 @@ void _CORE_barrier_Initialize( > > _Thread_queue_Initialize( &the_barrier->Wait_queue ); > } > + > +Thread_Control *_CORE_barrier_Was_deleted( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +) > +{ > + the_thread->Wait.return_code = CORE_BARRIER_WAS_DELETED; > + > + return the_thread; > +} > diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c > index 1511f83..acdc9d7 100644 > --- a/cpukit/score/src/coremsgclose.c > +++ b/cpukit/score/src/coremsgclose.c > @@ -21,6 +21,17 @@ > #include <rtems/score/coremsgimpl.h> > #include <rtems/score/wkspace.h> > > +static Thread_Control *_CORE_message_queue_Was_deleted( > + Thread_Control *the_thread, > + Thread_queue_Control 
*the_thread_queue, > + ISR_lock_Context *lock_context > +) > +{ > + the_thread->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED; > + > + return the_thread; > +} > + > void _CORE_message_queue_Do_close( > CORE_message_queue_Control *the_message_queue > #if defined(RTEMS_MULTIPROCESSING) > @@ -30,17 +41,21 @@ void _CORE_message_queue_Do_close( > #endif > ) > { > + ISR_lock_Context lock_context; > + > /* > * This will flush blocked threads whether they were blocked on > * a send or receive. > */ > > - _Thread_queue_Flush( > + _CORE_message_queue_Acquire( the_message_queue, &lock_context ); > + _Thread_queue_Flush_critical( > &the_message_queue->Wait_queue, > the_message_queue->operations, > - CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED, > + _CORE_message_queue_Was_deleted, > mp_callout, > - mp_id > + mp_id, > + &lock_context > ); > > (void) _Workspace_Free( the_message_queue->message_buffers ); > diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c > index f67dcf2..38f26b7 100644 > --- a/cpukit/score/src/coremsgflush.c > +++ b/cpukit/score/src/coremsgflush.c > @@ -41,7 +41,7 @@ uint32_t _CORE_message_queue_Flush( > * > * (1) The thread queue of pending senders is a logical extension > * of the pending message queue. In this case, it should be > - * flushed using the _Thread_queue_Flush() service with a status > + * flushed using the _Thread_queue_Flush_critical() service with a status > * such as CORE_MESSAGE_QUEUE_SENDER_FLUSHED (which currently does > * not exist). This can be implemented without changing the "big-O" > * of the message flushing part of the routine. 
> diff --git a/cpukit/score/src/coremutex.c b/cpukit/score/src/coremutex.c > index 88d487c..0b07c89 100644 > --- a/cpukit/score/src/coremutex.c > +++ b/cpukit/score/src/coremutex.c > @@ -96,3 +96,25 @@ CORE_mutex_Status _CORE_mutex_Initialize( > > return CORE_MUTEX_STATUS_SUCCESSFUL; > } > + > +Thread_Control *_CORE_mutex_Was_deleted( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +) > +{ > + the_thread->Wait.return_code = CORE_MUTEX_WAS_DELETED; > + > + return the_thread; > +} > + > +Thread_Control *_CORE_mutex_Unsatisfied_nowait( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +) > +{ > + the_thread->Wait.return_code = CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT; > + > + return the_thread; > +} > diff --git a/cpukit/score/src/coresem.c b/cpukit/score/src/coresem.c > index 2bdd81c..a36a4f7 100644 > --- a/cpukit/score/src/coresem.c > +++ b/cpukit/score/src/coresem.c > @@ -36,3 +36,25 @@ void _CORE_semaphore_Initialize( > the_semaphore->operations = &_Thread_queue_Operations_FIFO; > } > } > + > +Thread_Control *_CORE_semaphore_Was_deleted( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +) > +{ > + the_thread->Wait.return_code = CORE_SEMAPHORE_WAS_DELETED; > + > + return the_thread; > +} > + > +Thread_Control *_CORE_semaphore_Unsatisfied_nowait( > + Thread_Control *the_thread, > + Thread_queue_Control *the_thread_queue, > + ISR_lock_Context *lock_context > +) > +{ > + the_thread->Wait.return_code = CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT; > + > + return the_thread; > +} > diff --git a/cpukit/score/src/threadqflush.c b/cpukit/score/src/threadqflush.c > index c508314..de64a49 100644 > --- a/cpukit/score/src/threadqflush.c > +++ b/cpukit/score/src/threadqflush.c > @@ -18,46 +18,82 @@ > #include "config.h" > #endif > > -#include <rtems/score/threadqimpl.h> > -#include 
<rtems/score/objectimpl.h> > +#include <rtems/score/threadimpl.h> > > -void _Thread_queue_Do_flush( > +size_t _Thread_queue_Do_flush_critical( > Thread_queue_Control *the_thread_queue, > const Thread_queue_Operations *operations, > - uint32_t status > + Thread_queue_Flush_filter filter, > #if defined(RTEMS_MULTIPROCESSING) > - , > Thread_queue_MP_callout mp_callout, > - Objects_Id mp_id > + Objects_Id mp_id, > #endif > + ISR_lock_Context *lock_context > ) > { > - ISR_lock_Context lock_context; > - Thread_Control *the_thread; > - > - _Thread_queue_Acquire( the_thread_queue, &lock_context ); > - > - while ( > - ( > - the_thread = _Thread_queue_First_locked( > - the_thread_queue, > - operations > - ) > - ) > - ) { > - the_thread->Wait.return_code = status; > - > - _Thread_queue_Extract_critical( > + size_t flushed; > + Chain_Control unblock; > + Chain_Node *node; > + Chain_Node *tail; > + > + flushed = 0; > + _Chain_Initialize_empty( &unblock ); > + > + while ( true ) { > + Thread_queue_Heads *heads; > + Thread_Control *first; > + bool do_unblock; > + > + heads = the_thread_queue->Queue.heads; > + if ( heads == NULL ) { > + break; > + } > + > + first = ( *operations->first )( heads ); > + first = ( *filter )( first, the_thread_queue, lock_context ); > + if ( first == NULL ) { > + break; > + } > + > + do_unblock = _Thread_queue_Extract_locked( > &the_thread_queue->Queue, > operations, > - the_thread, > + first, > mp_callout, > - mp_id, > - &lock_context > + mp_id > ); > + if ( do_unblock ) { > + _Chain_Append_unprotected( &unblock, &first->Wait.Node.Chain ); > + } > + > + ++flushed; > + } > + > + node = _Chain_First( &unblock ); > + tail = _Chain_Tail( &unblock ); > + > + if ( node != tail ) { > + Per_CPU_Control *cpu_self; > + > + cpu_self = _Thread_Dispatch_disable_critical( lock_context ); > + _Thread_queue_Release( the_thread_queue, lock_context ); > + > + do { > + Thread_Control *the_thread; > + Chain_Node *next; > + > + next = _Chain_Next( node ); > + the_thread 
= THREAD_CHAIN_NODE_TO_THREAD( node ); > + _Thread_Timer_remove( the_thread ); > + _Thread_Unblock( the_thread ); > + > + node = next; > + } while ( node != tail ); > > - _Thread_queue_Acquire( the_thread_queue, &lock_context ); > + _Thread_Dispatch_enable( cpu_self ); > + } else { > + _Thread_queue_Release( the_thread_queue, lock_context ); > } > > - _Thread_queue_Release( the_thread_queue, &lock_context ); > + return flushed; > } > -- > 1.8.4.5 > > _______________________________________________ > devel mailing list > devel@rtems.org > http://lists.rtems.org/mailman/listinfo/devel _______________________________________________ devel mailing list devel@rtems.org http://lists.rtems.org/mailman/listinfo/devel