================ @@ -133,6 +140,85 @@ void StdThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { } } +/// Main loop for worker threads when using a jobserver. +/// This function uses a two-level queue; it first acquires a job slot from the +/// external jobserver, then retrieves a task from the internal queue. +/// This allows the thread pool to cooperate with build systems like `make -j`. +void StdThreadPool::processTasksWithJobserver() { + while (true) { + // Acquire a job slot from the external jobserver. + // This polls for a slot and yields the thread to avoid a high-CPU wait. + JobSlot Slot; + while (true) { + // Return if the thread pool is shutting down. + { + std::unique_lock<std::mutex> LockGuard(QueueLock); + if (!EnableFlag) + return; + } + Slot = TheJobserver->tryAcquire(); + if (Slot.isValid()) + break; // Successfully acquired a slot. + + // If the jobserver is busy, yield to let other threads run. + std::this_thread::yield(); + } + + // `make_scope_exit` guarantees the job slot is released, even if the + // task throws or we exit early. This prevents deadlocking the build. + auto SlotReleaser = + make_scope_exit([&] { TheJobserver->release(std::move(Slot)); }); + + // With an external job slot secured, get a task from the internal queue. + std::function<void()> Task; + ThreadPoolTaskGroup *GroupOfTask = nullptr; + + { + std::unique_lock<std::mutex> LockGuard(QueueLock); + + // Wait until a task is available or the pool is shutting down. + QueueCondition.wait(LockGuard, + [&] { return !EnableFlag || !Tasks.empty(); }); + + // If shutting down and the queue is empty, the thread can terminate. + if (!EnableFlag && Tasks.empty()) + return; + + // Handle spurious wakeups by re-checking if the queue has tasks. + if (Tasks.empty()) + continue; + + // A task is available. Mark it as active before releasing the lock + // to prevent race conditions with `wait()`. 
+ ++ActiveThreads; + Task = std::move(Tasks.front().first); + GroupOfTask = Tasks.front().second; + if (GroupOfTask != nullptr) + ++ActiveGroups[GroupOfTask]; + Tasks.pop_front(); + } // The queue lock is released. + + // Run the task. The job slot remains acquired during execution. + Task(); + + // The task has finished. Update the active count and notify any waiters. + { + std::lock_guard<std::mutex> LockGuard(QueueLock); + --ActiveThreads; + if (GroupOfTask != nullptr) { + auto A = ActiveGroups.find(GroupOfTask); + if (--(A->second) == 0) + ActiveGroups.erase(A); + } + // If all tasks are complete, notify any waiting threads. + if (workCompletedUnlocked(nullptr)) + CompletionCondition.notify_all(); + } + + // The SlotReleaser is destroyed here, returning the token to the jobserver. + // The loop then repeats to process the next job. ---------------- yxsamliu wrote:
I will add an inner loop so the worker keeps running tasks while it holds the token. https://github.com/llvm/llvm-project/pull/145131 _______________________________________________ cfe-commits mailing list cfe-commits@lists.llvm.org https://lists.llvm.org/cgi-bin/mailman/listinfo/cfe-commits