Control: tag -1 patch

Please find attached a modification of the backport of 0ac4aa41.

Notice that `Q_EMIT started()` should be placed at the end of the implementation
of PipeWireProduce::setupStream --- as stated in both the original commit
and the backported patch.

However, the context around that change causes that Q_EMIT to actually
be placed at the end of PipeWireProduce::deactivate.  This is obviously
incorrect: the signal that kicks off the asynchronous startup is not
emitted until the stream is no longer in use.

I can additionally confirm that this resolves the krdp input issue that
spurred this bug report initially.

Hope this can get in before trixie is released.

Best,
Antonio Russo

From 0ac4aa418b6a7665f3d9e811c7f6632687651e6b Mon Sep 17 00:00:00 2001
From: Arjen Hiemstra <ahiems...@heimr.nl>
Date: Tue, 8 Oct 2024 14:03:20 +0200
Subject: [PATCH] encodedstream: Deprecate setActive() and replace with an
 explicit API

setActive() currently implies starting/stopping the recording process.
However, this is somewhat awkward as everything is rather asynchronous
with multiple threads involved, which means calling setActive() may mean
things are not actually active or may still be active.

To avoid this awkwardness, deprecate setActive() and replace it with
explicit start() and stop() methods that are clearly documented to be
purely requests, with the real active/inactive state matching the stream
state.

This also makes the "Rendering" state more explicit: when stop() is
called we immediately switch to the "Rendering" state to indicate we may
still be processing frames but are no longer receiving new frames.
---
 src/pipewirebaseencodedstream.cpp | 111 +++++++++++++++++++-----------
 src/pipewirebaseencodedstream.h   |  40 ++++++++++-
 src/pipewireproduce.cpp           |   2 +
 src/pipewireproduce_p.h           |   2 +
 src/pipewirerecord.cpp            |   1 -
 tests/HeadlessTest.cpp            |  20 ++++--
 6 files changed, 127 insertions(+), 49 deletions(-)

diff --git a/src/pipewirebaseencodedstream.cpp b/src/pipewirebaseencodedstream.cpp
index 67c5445..27ef198 100644
--- a/src/pipewirebaseencodedstream.cpp
+++ b/src/pipewirebaseencodedstream.cpp
@@ -17,6 +17,8 @@ extern "C" {
 }
 #include <unistd.h>
 
+#include <QThread>
+
 #include "pipewireproduce_p.h"
 #include "vaapiutils_p.h"
 
@@ -29,6 +31,7 @@ struct PipeWireEncodedStreamPrivate {
     PipeWireBaseEncodedStream::Encoder m_encoder;
     std::optional<quint8> m_quality;
     PipeWireBaseEncodedStream::EncodingPreference m_encodingPreference;
+    PipeWireBaseEncodedStream::State m_state = PipeWireBaseEncodedStream::Idle;
 
     std::unique_ptr<QThread> m_produceThread;
     std::unique_ptr<PipeWireProduce> m_produce;
@@ -36,13 +39,7 @@ struct PipeWireEncodedStreamPrivate {
 
 PipeWireBaseEncodedStream::State PipeWireBaseEncodedStream::state() const
 {
-    if (isActive()) {
-        return Recording;
-    } else if (d->m_produceThread && d->m_produce->m_deactivated && d->m_produceThread->isRunning()) {
-        return Rendering;
-    }
-
-    return Idle;
+    return d->m_state;
 }
 
 PipeWireBaseEncodedStream::PipeWireBaseEncodedStream(QObject *parent)
@@ -65,10 +62,10 @@ PipeWireBaseEncodedStream::PipeWireBaseEncodedStream(QObject *parent)
 
 PipeWireBaseEncodedStream::~PipeWireBaseEncodedStream()
 {
-    setActive(false);
+    stop();
 
-    if (d->m_fd) {
-        close(*d->m_fd);
+    if (d->m_produceThread) {
+        d->m_produceThread->wait();
     }
 }
 
@@ -78,7 +75,6 @@ void PipeWireBaseEncodedStream::setNodeId(uint nodeId)
         return;
 
     d->m_nodeId = nodeId;
-    refresh();
     Q_EMIT nodeIdChanged(nodeId);
 }
 
@@ -91,7 +87,6 @@ void PipeWireBaseEncodedStream::setFd(uint fd)
         close(*d->m_fd);
     }
     d->m_fd = fd;
-    refresh();
     Q_EMIT fdChanged(fd);
 }
 
@@ -141,50 +136,84 @@ int PipeWireBaseEncodedStream::maxBufferSize() const
 
 void PipeWireBaseEncodedStream::setActive(bool active)
 {
-    if (d->m_active == active)
-        return;
+    if (active) {
+        start();
+    } else {
+        stop();
 
-    d->m_active = active;
-    refresh();
-    Q_EMIT activeChanged(active);
+        if (d->m_produceThread) {
+            d->m_produceThread->wait();
+        }
+    }
 }
 
-std::optional<quint8> PipeWireBaseEncodedStream::quality() const
+void PipeWireBaseEncodedStream::start()
 {
-    return d->m_quality;
-}
+    if (d->m_nodeId == 0) {
+        qCWarning(PIPEWIRERECORD_LOGGING) << "Cannot start recording on a stream without a node ID";
+        return;
+    }
 
-void PipeWireBaseEncodedStream::setQuality(quint8 quality)
-{
-    d->m_quality = quality;
-    if (d->m_produce) {
-        d->m_produce->setQuality(d->m_quality);
+    if (d->m_produceThread || d->m_state != Idle) {
+        return;
     }
+
+    d->m_produceThread = std::make_unique<QThread>();
+    d->m_produceThread->setObjectName("PipeWireProduce::input");
+    d->m_produce = makeProduce();
+    d->m_produce->setQuality(d->m_quality);
+    d->m_produce->setMaxPendingFrames(d->m_maxPendingFrames);
+    d->m_produce->setEncodingPreference(d->m_encodingPreference);
+    d->m_produce->moveToThread(d->m_produceThread.get());
+    d->m_produceThread->start();
+    QMetaObject::invokeMethod(d->m_produce.get(), &PipeWireProduce::initialize, Qt::QueuedConnection);
+
+    connect(d->m_produce.get(), &PipeWireProduce::started, this, [this]() {
+        d->m_active = true;
+        Q_EMIT activeChanged(true);
+        d->m_state = Recording;
+        Q_EMIT stateChanged();
+    });
+
+    connect(d->m_produce.get(), &PipeWireProduce::finished, this, [this]() {
+        d->m_active = false;
+        Q_EMIT activeChanged(false);
+        d->m_state = Idle;
+        Q_EMIT stateChanged();
+    });
+
+    connect(d->m_produceThread.get(), &QThread::finished, this, [this]() {
+        d->m_produce.reset();
+        d->m_produceThread.reset();
+        d->m_nodeId = 0;
+
+        if (d->m_fd) {
+            close(d->m_fd.value());
+        }
+    });
 }
 
-void PipeWireBaseEncodedStream::refresh()
+void PipeWireBaseEncodedStream::stop()
 {
     if (d->m_produceThread) {
         QMetaObject::invokeMethod(d->m_produce.get(), &PipeWireProduce::deactivate, Qt::QueuedConnection);
-        d->m_produceThread->wait();
-
-        d->m_produce.reset();
-        d->m_produceThread.reset();
     }
 
-    if (d->m_active && d->m_nodeId > 0) {
-        d->m_produceThread = std::make_unique<QThread>();
-        d->m_produceThread->setObjectName("PipeWireProduce::input");
-        d->m_produce = makeProduce();
+    d->m_state = PipeWireBaseEncodedStream::Rendering;
+    Q_EMIT stateChanged();
+}
+
+std::optional<quint8> PipeWireBaseEncodedStream::quality() const
+{
+    return d->m_quality;
+}
+
+void PipeWireBaseEncodedStream::setQuality(quint8 quality)
+{
+    d->m_quality = quality;
+    if (d->m_produce) {
         d->m_produce->setQuality(d->m_quality);
-        d->m_produce->setMaxPendingFrames(d->m_maxPendingFrames);
-        d->m_produce->setEncodingPreference(d->m_encodingPreference);
-        d->m_produce->moveToThread(d->m_produceThread.get());
-        d->m_produceThread->start();
-        QMetaObject::invokeMethod(d->m_produce.get(), &PipeWireProduce::initialize, Qt::QueuedConnection);
     }
-
-    Q_EMIT stateChanged();
 }
 
 void PipeWireBaseEncodedStream::setEncoder(Encoder encoder)
diff --git a/src/pipewirebaseencodedstream.h b/src/pipewirebaseencodedstream.h
index 9f4c0fd..8728252 100644
--- a/src/pipewirebaseencodedstream.h
+++ b/src/pipewirebaseencodedstream.h
@@ -25,7 +25,7 @@ class KPIPEWIRE_EXPORT PipeWireBaseEncodedStream : public QObject
      * Transfers the ownership of the fd, will close it when it's done with it.
      */
     Q_PROPERTY(uint fd READ fd WRITE setFd NOTIFY fdChanged)
-    Q_PROPERTY(bool active READ isActive WRITE setActive NOTIFY activeChanged)
+    Q_PROPERTY(bool active READ isActive NOTIFY activeChanged)
     Q_PROPERTY(State state READ state NOTIFY stateChanged)
     Q_PROPERTY(Encoder encoder READ encoder WRITE setEncoder NOTIFY encoderChanged)
 
@@ -67,7 +67,42 @@ public:
     int maxBufferSize() const;
 
     bool isActive() const;
-    void setActive(bool active);
+    /**
+     * Set the active state of recording.
+     *
+     * @deprecated Since 6.4, use the separate `start()`/`stop()` calls instead.
+     * This function now just calls `start()`/`stop()`.
+     *
+     * @note When calling `setActive(false)`, unlike `stop()`, this function will
+     * block until the internal encoding threads are finished.
+     */
+    KPIPEWIRE_DEPRECATED void setActive(bool active);
+
+    /**
+     * Request to start recording.
+     *
+     * This will create everything required to perform recording, like a PipeWire
+     * stream and an encoder, then start receiving frames from the stream and
+     * encoding those.
+     *
+     * This requires a valid node ID to be set and that the current state is Idle.
+     *
+     * Note that recording all happens on separate threads, this method only
+     * starts the process, only when state() returns Recording is recording
+     * actually happening.
+     */
+    Q_INVOKABLE void start();
+    /**
+     * Request to stop recording.
+     *
+     * This will terminate receiving frames from PipeWire and do any cleanup
+     * necessary to fully terminate recording after that.
+     *
+     * Note that after this request, there may still be some processing required
+     * due to internal queues. As long as state() does not return Idle processing
+     * is still happening and teardown has not been completed.
+     */
+    Q_INVOKABLE void stop();
 
     /**
      * The quality used for encoding.
@@ -127,6 +162,5 @@ protected:
     virtual std::unique_ptr<PipeWireProduce> makeProduce() = 0;
     EncodingPreference encodingPreference();
 
-    void refresh();
     QScopedPointer<PipeWireEncodedStreamPrivate> d;
 };
diff --git a/src/pipewireproduce.cpp b/src/pipewireproduce.cpp
index aa6acbb..e5b05cc 100644
--- a/src/pipewireproduce.cpp
+++ b/src/pipewireproduce.cpp
@@ -183,6 +183,7 @@ void PipeWireProduce::setupStream()
         }
     });
     pthread_setname_np(m_outputThread.native_handle(), "PipeWireProduce::output");
+    Q_EMIT started();
 }
 
 void PipeWireProduce::deactivate()
@@ -251,6 +252,7 @@ void PipeWireProduce::destroy()
 
     qCDebug(PIPEWIRERECORD_LOGGING) << "finished";
     cleanup();
+    Q_EMIT finished();
     QThread::currentThread()->quit();
 }
 
diff --git a/src/pipewireproduce_p.h b/src/pipewireproduce_p.h
index 71ca34b..7cc49df 100644
--- a/src/pipewireproduce_p.h
+++ b/src/pipewireproduce_p.h
@@ -154,6 +154,8 @@ public:
 
 Q_SIGNALS:
     void producedFrames();
+    void started();
+    void finished();
 
 private:
     void initFiltersVaapi();
diff --git a/src/pipewirerecord.cpp b/src/pipewirerecord.cpp
index 7293b57..cff7ade 100644
--- a/src/pipewirerecord.cpp
+++ b/src/pipewirerecord.cpp
@@ -69,7 +69,6 @@ void PipeWireRecord::setOutput(const QString &_output)
         return;
 
     d->m_output = output;
-    refresh();
     Q_EMIT outputChanged(output);
 }
 
diff --git a/tests/HeadlessTest.cpp b/tests/HeadlessTest.cpp
index 33db742..6d31f77 100644
--- a/tests/HeadlessTest.cpp
+++ b/tests/HeadlessTest.cpp
@@ -55,16 +55,29 @@ void createStream(int nodeId, std::optional<int> fd = {})
             }
             encoded->setEncoder(enc);
         }
-        encoded->setActive(true);
+        encoded->start();
         QObject::connect(encoded, &PipeWireEncodedStream::newPacket, qGuiApp, [](const PipeWireEncodedStream::Packet &packet) {
             qDebug() << "packet received" << packet.data().size() << "key:" << packet.isKeyFrame();
         });
         QObject::connect(encoded, &PipeWireEncodedStream::cursorChanged, qGuiApp, [](const PipeWireCursor &cursor) {
             qDebug() << "cursor received. position:" << cursor.position << "hotspot:" << cursor.hotspot << "image:" << cursor.texture;
         });
+        QObject::connect(encoded, &PipeWireEncodedStream::stateChanged, qGuiApp, [encoded]() {
+            switch (encoded->state()) {
+            case PipeWireEncodedStream::Recording:
+                qDebug() << "Started recording";
+                break;
+            case PipeWireEncodedStream::Rendering:
+                qDebug() << "Stopped recording, flushing remaining frames";
+                break;
+            case PipeWireEncodedStream::Idle:
+                qDebug() << "Recording finished, quitting";
+                exit(0);
+                break;
+            }
+        });
         QObject::connect(KSignalHandler::self(), &KSignalHandler::signalReceived, encoded, [encoded] {
-            encoded->setActive(false);
-            exit(0);
+            encoded->stop();
         });
         return;
     }
@@ -96,7 +109,6 @@ void createStream(int nodeId, std::optional<int> fd = {})
     });
     QObject::connect(KSignalHandler::self(), &KSignalHandler::signalReceived, pwStream, [pwStream] {
         pwStream->setActive(false);
-        exit(0);
     });
 }
 
-- 
GitLab

Reply via email to