I wanted some advice from the brain trust on the best means of terminating threads that are synchronizing using QMutex/QWaitCondition. In my working code, I'm getting error messages that look like:

    QWaitCondition: cv destroy failure: Device or resource busy
    QWaitCondition: mutex destroy failure: Device or resource busy

when I terminate the program, so I'm guessing there's probably a better way that I'm missing. The following code demonstrates the mechanism I'm using to signal the threads to stop. I should note that this demonstration code doesn't give me the above errors, even though it is stopping the threads in the same fashion as my production code:

--------------------------
test.h header:
--------------------------

    #include <atomic>

    #include <QMutex>
    #include <QWaitCondition>
    #include <QSharedPointer>
    #include <QWeakPointer>

    /// State shared between the producer and the consumer thread.
    struct ThreadData
    {
        /// Set to true by the producer when it hands work over and reset to
        /// false by the consumer once that work has been processed.
        /// Initialised in-class because a default-constructed
        /// std::atomic<bool> holds an indeterminate value before C++20.
        std::atomic<bool>    sending{false};
    };

    // Owning and non-owning handles to the shared ThreadData instance.
    using ThreadDataPointer = QSharedPointer<ThreadData>;
    using ThreadDataWeakPointer = QWeakPointer<ThreadData>;

    /// Worker object whose slot_thread() consumes what the producer hands over.
    ///
    /// It keeps only a weak handle to the shared ThreadData. The wait condition
    /// and the mutex are stored by reference, so both must outlive this object.
    class Consumer : public QObject
    {
        Q_OBJECT

    public:
        /// @param thread_data  weak handle to the shared state
        /// @param wait         condition the consumer blocks on
        /// @param mutex        mutex used together with @p wait
        /// @param parent       optional QObject parent
        Consumer(
            ThreadDataWeakPointer thread_data,
            QWaitCondition& wait,
            QMutex& mutex,
            QObject* parent = nullptr);

    signals:
        /// Emitted by slot_thread() right before it returns.
        void signal_finished();

    public slots:
        /// Body of the consumer thread; loops until the shared data is gone.
        void slot_thread();

    protected:
        ThreadDataWeakPointer thread_data;      ///< weak handle to the shared state
        QWaitCondition&       consumer_wait;    ///< condition signalled by the producer
        QMutex&               consumer_mutex;   ///< mutex paired with consumer_wait
    };

    /// Worker object whose slot_thread() hands work to the consumer.
    ///
    /// It keeps only a weak handle to the shared ThreadData. The wait condition
    /// is stored by reference, so it must outlive this object.
    class Producer : public QObject
    {
        Q_OBJECT

    public:
        /// @param thread_data  weak handle to the shared state
        /// @param wait         condition used to wake the consumer
        /// @param parent       optional QObject parent
        Producer(
            ThreadDataWeakPointer thread_data,
            QWaitCondition& wait,
            QObject* parent = nullptr);

    public slots:
        /// Body of the producer thread; loops until the shared data is gone.
        void slot_thread();

    signals:
        /// Emitted by slot_thread() right before it returns.
        void signal_finished();

    protected:
        ThreadDataWeakPointer thread_data;      ///< weak handle to the shared state
        QWaitCondition&       consumer_wait;    ///< condition the consumer blocks on
    };

--------------------------
test.cpp module:
--------------------------

    #include <iostream>
    #include <chrono>
    #include <thread>

    #include <signal.h>

    #include <QCoreApplication>
    #include <QThread>
    #include <QTimer>

    #include "test.h"

//-------------------------------------------------------------------
    // Consumer class methods

    // Stores the weak data handle and binds the references to the caller's
    // wait condition and mutex. No thread work is started here.
    Consumer::Consumer(ThreadDataWeakPointer data, QWaitCondition& wait, QMutex& mutex, QObject* parent)
        : QObject(parent)
        , thread_data(data)
        , consumer_wait(wait)
        , consumer_mutex(mutex)
    {
    }

    void Consumer::slot_thread()
    {
        while(true)
        {
            // wait for the producer to send us data
            consumer_mutex.lock();
            consumer_wait.wait(&consumer_mutex);

            // invalid thread data is a signal to us to terminate

            if(thread_data.isNull())
                break;

            ThreadDataPointer n_dp;
            n_dp = thread_data;
            if(!n_dp)
                break;

            // do something for a few milliseconds
std::this_thread::sleep_for(std::chrono::milliseconds(10));

            n_dp->sending = false;
            consumer_mutex.unlock();
        }

        std::cout << "Terminating Consumer\n";

        consumer_mutex.unlock();    // release the mutex
        emit signal_finished();
    }

//-------------------------------------------------------------------
    // Producer class methods

// Stores the weak data handle and binds the reference to the caller's wait
// condition. No thread work is started here.
Producer::Producer(ThreadDataWeakPointer data,
                   QWaitCondition& wait,
                   QObject* parent)
        : QObject(parent)
        , thread_data(data)
        , consumer_wait(wait)
    {
    }

    // Producer thread body.
    //
    // Each iteration flags pending work (ThreadData::sending = true), wakes the
    // consumer and polls until the consumer has cleared the flag. The loop ends
    // as soon as the shared ThreadData no longer exists, which is the
    // termination signal; signal_finished() is emitted before returning.
    void Producer::slot_thread()
    {
        while(true)
        {
            // thread_data remains valid as long as we hold n_dp;
            // a null result means the data is gone and we must terminate
            ThreadDataPointer n_dp = thread_data.toStrongRef();
            if(!n_dp)
                break;

            n_dp->sending = true;

            // Wake the consumer and wait for it to consume our output.
            // The notification is repeated on every poll: a single wakeAll()
            // is lost when the consumer is not yet blocked in wait(), and this
            // loop would then spin forever on a flag nobody clears.
            while(n_dp->sending)
            {
                consumer_wait.wakeAll();
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }

        std::cout << "Terminating Producer\n";

        // wake consumer so it can detect bad thread data and terminate
        consumer_wait.wakeAll();
        emit signal_finished();
    }

    // Shared state handed to both workers; clearing this pointer is the
    // signal for the threads to terminate.
    ThreadDataPointer thread_data;

    // Synchronisation objects the workers hold references to.
    QMutex            consumer_mutex;
    QWaitCondition    consumer_wait;

    // Handles for the two threads and their worker objects.
    QThread*  producer_thread = nullptr;
    Producer* producer_worker = nullptr;
    QThread*  consumer_thread = nullptr;
    Consumer* consumer_worker = nullptr;

    void signal_handler(int /*sig*/)
    {
        thread_data.clear();        // signal threads to terminate
        QTimer::singleShot(100, qApp, &QCoreApplication::quit);
    }

    int main(int argc, char* argv[])
    {
        QCoreApplication app(argc, argv);

        for(int sig : {SIGQUIT, SIGINT, SIGTERM, SIGHUP})   // C++11
            signal(sig, signal_handler);

        thread_data = ThreadDataPointer(new ThreadData);
        thread_data->sending = false;

        // create the producer thread

        QThread* producer_thread = new QThread;
Producer* producer_worker = new Producer(thread_data.toWeakRef(), consumer_wait);

        producer_worker->moveToThread(producer_thread);

QObject::connect(producer_thread, &QThread::started, producer_worker, &Producer::slot_thread); QObject::connect(producer_worker, &Producer::signal_finished, producer_thread, &QThread::quit); QObject::connect(producer_worker, &Producer::signal_finished, producer_worker, &Producer::deleteLater); QObject::connect(producer_thread, &QThread::finished, producer_thread, &QThread::deleteLater);

        // create the consumer thread

        QThread* consumer_thread = new QThread;
consumer_worker = new Consumer(thread_data.toWeakRef(), consumer_wait, consumer_mutex);

        consumer_worker->moveToThread(consumer_thread);

QObject::connect(consumer_thread, &QThread::started, consumer_worker, &Consumer::slot_thread); QObject::connect(consumer_worker, &Consumer::signal_finished, consumer_thread, &QThread::quit); QObject::connect(consumer_worker, &Consumer::signal_finished, consumer_worker, &Consumer::deleteLater); QObject::connect(consumer_thread, &QThread::finished, consumer_thread, &QThread::deleteLater);

        consumer_thread->start();
        producer_thread->start();

        return app.exec();
    }

Thanks for any insights.

_______________________________________________
Interest mailing list
Interest@qt-project.org
http://lists.qt-project.org/mailman/listinfo/interest

Reply via email to