sammccall created this revision. sammccall added a reviewer: ilya-biryukov. Herald added subscribers: cfe-commits, ioeric, jkorous-apple, klimek.
LSP has asynchronous semantics, being able to block on an async operation completing is unneccesary and leads to tighter coupling with the threading. In practice only tests depend on this, so we add a general-purpose "block until idle" function to the scheduler which will work for all operations. To get this working, fix a latent condition-variable bug in ASTWorker, and make AsyncTaskRunner const-correct. Repository: rCTE Clang Tools Extra https://reviews.llvm.org/D43127 Files: clangd/ClangdServer.cpp clangd/ClangdServer.h clangd/TUScheduler.cpp clangd/TUScheduler.h clangd/Threading.cpp clangd/Threading.h unittests/clangd/ClangdTests.cpp unittests/clangd/CodeCompleteTests.cpp
Index: unittests/clangd/CodeCompleteTests.cpp =================================================================== --- unittests/clangd/CodeCompleteTests.cpp +++ unittests/clangd/CodeCompleteTests.cpp @@ -120,7 +120,8 @@ /*StorePreamblesInMemory=*/true); auto File = getVirtualTestFilePath("foo.cpp"); Annotations Test(Text); - Server.addDocument(File, Test.code()).wait(); + Server.addDocument(File, Test.code()); + EXPECT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for preamble"; auto CompletionList = Server.codeComplete(File, Test.point(), Opts).get().Value; // Sanity-check that filterText is valid. @@ -553,21 +554,24 @@ void f() { ns::^; } void f() { ns::preamble().$2^; } )cpp"); - Server.addDocument(File, Test.code()).wait(); + Server.addDocument(File, Test.code()); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for preamble"; clangd::CodeCompleteOptions Opts = {}; - auto WithoutIndex = Server.codeComplete(File, Test.point(), Opts).get().Value; - EXPECT_THAT(WithoutIndex.items, - UnorderedElementsAre(Named("local"), Named("preamble"))); - auto I = memIndex({var("ns::index")}); Opts.Index = I.get(); auto WithIndex = Server.codeComplete(File, Test.point(), Opts).get().Value; EXPECT_THAT(WithIndex.items, UnorderedElementsAre(Named("local"), Named("index"))); auto ClassFromPreamble = Server.codeComplete(File, Test.point("2"), Opts).get().Value; EXPECT_THAT(ClassFromPreamble.items, Contains(Named("member"))); + + Opts.Index = nullptr; + auto WithoutIndex = Server.codeComplete(File, Test.point(), Opts).get().Value; + EXPECT_THAT(WithoutIndex.items, + UnorderedElementsAre(Named("local"), Named("preamble"))); + } TEST(CompletionTest, DynamicIndexMultiFile) { @@ -578,11 +582,10 @@ /*StorePreamblesInMemory=*/true, /*BuildDynamicSymbolIndex=*/true); - Server - .addDocument(getVirtualTestFilePath("foo.cpp"), R"cpp( + Server.addDocument(getVirtualTestFilePath("foo.cpp"), R"cpp( namespace ns { class XYZ {}; void foo(int x) {} } - )cpp") - .wait(); + )cpp"); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for preamble"; auto File = getVirtualTestFilePath("bar.cpp"); Annotations Test(R"cpp( @@ -593,7 +596,8 @@ } void f() { ns::^ } )cpp"); - Server.addDocument(File, Test.code()).wait(); + Server.addDocument(File, Test.code()); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for preamble"; auto Results = Server.codeComplete(File, Test.point(), {}).get().Value; // "XYZ" and "foo" are not included in the file being completed but are still @@ -623,6 +627,7 @@ auto File = getVirtualTestFilePath("foo.cpp"); Annotations Test(Text); Server.addDocument(File, Test.code()); + EXPECT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for preamble"; auto R = Server.signatureHelp(File, Test.point()); assert(R); return R.get().Value; Index: unittests/clangd/ClangdTests.cpp =================================================================== --- unittests/clangd/ClangdTests.cpp +++ unittests/clangd/ClangdTests.cpp @@ -38,11 +38,6 @@ namespace { -// Don't wait for async ops in clangd test more than that to avoid blocking -// indefinitely in case of bugs. -static const std::chrono::seconds DefaultFutureTimeout = - std::chrono::seconds(10); - static bool diagsContainErrors(ArrayRef<DiagWithFixIts> Diagnostics) { for (const auto &DiagAndFixIts : Diagnostics) { // FIXME: severities returned by clangd should have a descriptive @@ -140,15 +135,9 @@ FS.ExpectedFile = SourceFilename; - // Have to sync reparses because requests are processed on the calling - // thread. - auto AddDocFuture = Server.addDocument(SourceFilename, SourceContents); - + Server.addDocument(SourceFilename, SourceContents); auto Result = dumpASTWithoutMemoryLocs(Server, SourceFilename); - - // Wait for reparse to finish before checking for errors. - EXPECT_EQ(AddDocFuture.wait_for(DefaultFutureTimeout), - std::future_status::ready); + EXPECT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for diagnostics"; EXPECT_EQ(ExpectErrors, DiagConsumer.hadErrorInLastDiags()); return Result; } @@ -208,25 +197,19 @@ FS.Files[FooCpp] = SourceContents; FS.ExpectedFile = FooCpp; - // To sync reparses before checking for errors. - std::future<void> ParseFuture; - - ParseFuture = Server.addDocument(FooCpp, SourceContents); + Server.addDocument(FooCpp, SourceContents); auto DumpParse1 = dumpASTWithoutMemoryLocs(Server, FooCpp); - ASSERT_EQ(ParseFuture.wait_for(DefaultFutureTimeout), - std::future_status::ready); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for diagnostics"; EXPECT_FALSE(DiagConsumer.hadErrorInLastDiags()); - ParseFuture = Server.addDocument(FooCpp, ""); + Server.addDocument(FooCpp, ""); auto DumpParseEmpty = dumpASTWithoutMemoryLocs(Server, FooCpp); - ASSERT_EQ(ParseFuture.wait_for(DefaultFutureTimeout), - std::future_status::ready); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for diagnostics"; EXPECT_FALSE(DiagConsumer.hadErrorInLastDiags()); - ParseFuture = Server.addDocument(FooCpp, SourceContents); + Server.addDocument(FooCpp, SourceContents); auto DumpParse2 = dumpASTWithoutMemoryLocs(Server, FooCpp); - ASSERT_EQ(ParseFuture.wait_for(DefaultFutureTimeout), - std::future_status::ready); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for diagnostics"; EXPECT_FALSE(DiagConsumer.hadErrorInLastDiags()); EXPECT_EQ(DumpParse1, DumpParse2); @@ -253,27 +236,21 @@ FS.Files[FooCpp] = SourceContents; FS.ExpectedFile = FooCpp; - // To sync reparses before checking for errors. - std::future<void> ParseFuture; - - ParseFuture = Server.addDocument(FooCpp, SourceContents); + Server.addDocument(FooCpp, SourceContents); auto DumpParse1 = dumpASTWithoutMemoryLocs(Server, FooCpp); - ASSERT_EQ(ParseFuture.wait_for(DefaultFutureTimeout), - std::future_status::ready); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for diagnostics"; EXPECT_FALSE(DiagConsumer.hadErrorInLastDiags()); FS.Files[FooH] = ""; - ParseFuture = Server.forceReparse(FooCpp); + Server.forceReparse(FooCpp); auto DumpParseDifferent = dumpASTWithoutMemoryLocs(Server, FooCpp); - ASSERT_EQ(ParseFuture.wait_for(DefaultFutureTimeout), - std::future_status::ready); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for diagnostics"; EXPECT_TRUE(DiagConsumer.hadErrorInLastDiags()); FS.Files[FooH] = "int a;"; - ParseFuture = Server.forceReparse(FooCpp); + Server.forceReparse(FooCpp); auto DumpParse2 = dumpASTWithoutMemoryLocs(Server, FooCpp); - EXPECT_EQ(ParseFuture.wait_for(DefaultFutureTimeout), - std::future_status::ready); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for diagnostics"; EXPECT_FALSE(DiagConsumer.hadErrorInLastDiags()); EXPECT_EQ(DumpParse1, DumpParse2); @@ -769,29 +746,17 @@ TEST_F(ClangdThreadingTest, NoConcurrentDiagnostics) { class NoConcurrentAccessDiagConsumer : public DiagnosticsConsumer { - public: - NoConcurrentAccessDiagConsumer(std::promise<void> StartSecondReparse) - : StartSecondReparse(std::move(StartSecondReparse)) {} - - void onDiagnosticsReady( - PathRef File, - Tagged<std::vector<DiagWithFixIts>> Diagnostics) override { + std::atomic<bool> InCallback = {false}; - std::unique_lock<std::mutex> Lock(Mutex, std::try_to_lock_t()); - ASSERT_TRUE(Lock.owns_lock()) + public: + void onDiagnosticsReady(PathRef, + Tagged<std::vector<DiagWithFixIts>>) override { + ASSERT_FALSE(InCallback.exchange(true)) << "Detected concurrent onDiagnosticsReady calls for the same file."; - if (FirstRequest) { - FirstRequest = false; - StartSecondReparse.set_value(); - // Sleep long enough for the second request to be processed. - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } + // Sleep long enough for the other request to be processed. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_TRUE(InCallback.exchange(false)); } - - private: - std::mutex Mutex; - bool FirstRequest = true; - std::promise<void> StartSecondReparse; }; const auto SourceContentsWithoutErrors = R"cpp( @@ -809,24 +774,15 @@ )cpp"; auto FooCpp = getVirtualTestFilePath("foo.cpp"); - llvm::StringMap<std::string> FileContents; - FileContents[FooCpp] = ""; - ConstantFSProvider FS(buildTestFS(FileContents)); - - std::promise<void> StartSecondReparsePromise; - std::future<void> StartSecondReparse = StartSecondReparsePromise.get_future(); - - NoConcurrentAccessDiagConsumer DiagConsumer( - std::move(StartSecondReparsePromise)); - + MockFSProvider FS; + FS.Files[FooCpp] = ""; + NoConcurrentAccessDiagConsumer DiagConsumer; MockCompilationDatabase CDB; - ClangdServer Server(CDB, DiagConsumer, FS, 4, + ClangdServer Server(CDB, DiagConsumer, FS, /*AsyncThreadsCount=*/4, /*StorePreamblesInMemory=*/true); Server.addDocument(FooCpp, SourceContentsWithErrors); - StartSecondReparse.wait(); - - auto Future = Server.addDocument(FooCpp, SourceContentsWithoutErrors); - Future.wait(); + Server.addDocument(FooCpp, SourceContentsWithoutErrors); + ASSERT_TRUE(Server.blockUntilIdleForTest()) << "Waiting for diagnostics"; } } // namespace clangd Index: clangd/Threading.h =================================================================== --- clangd/Threading.h +++ clangd/Threading.h @@ -56,20 +56,36 @@ std::size_t FreeSlots; }; +/// A Deadline is a point in time we may wait for, or None to wait forever. +/// (We use Optional because buggy implementations of std::chrono overflow...) +using Deadline = llvm::Optional<std::chrono::steady_clock::time_point>; +/// Makes a deadline from a timeout in seconds. +Deadline timeoutSeconds(llvm::Optional<double> Seconds); +/// Waits on a condition variable until F() is true or D expires. +template <typename Func> +LLVM_NODISCARD bool wait(std::mutex &Mutex, std::condition_variable &CV, + Deadline D, Func F) { + std::unique_lock<std::mutex> Lock(Mutex); + if (D) + return CV.wait_until(Lock, *D, F); + CV.wait(Lock, F); + return true; +} + /// Runs tasks on separate (detached) threads and wait for all tasks to finish. /// Objects that need to spawn threads can own an AsyncTaskRunner to ensure they /// all complete on destruction. class AsyncTaskRunner { public: /// Destructor waits for all pending tasks to finish. ~AsyncTaskRunner(); - void waitForAll(); + bool waitForAll(Deadline D = llvm::None) const; void runAsync(UniqueFunction<void()> Action); private: - std::mutex Mutex; - std::condition_variable TasksReachedZero; + mutable std::mutex Mutex; + mutable std::condition_variable TasksReachedZero; std::size_t InFlightTasks = 0; }; } // namespace clangd Index: clangd/Threading.cpp =================================================================== --- clangd/Threading.cpp +++ clangd/Threading.cpp @@ -28,14 +28,13 @@ AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); } -void AsyncTaskRunner::waitForAll() { - std::unique_lock<std::mutex> Lock(Mutex); - TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; }); +bool AsyncTaskRunner::waitForAll(Deadline D) const { + return wait(Mutex, TasksReachedZero, D, [&] { return InFlightTasks == 0; }); } void AsyncTaskRunner::runAsync(UniqueFunction<void()> Action) { { - std::unique_lock<std::mutex> Lock(Mutex); + std::lock_guard<std::mutex> Lock(Mutex); ++InFlightTasks; } @@ -59,5 +58,14 @@ std::move(Action), std::move(CleanupTask)) .detach(); } + +Deadline timeoutSeconds(llvm::Optional<double> Seconds) { + using namespace std::chrono; + if (!Seconds) + return llvm::None; + return steady_clock::now() + + duration_cast<steady_clock::duration>(duration<double>(*Seconds)); +} + } // namespace clangd } // namespace clang Index: clangd/TUScheduler.h =================================================================== --- clangd/TUScheduler.h +++ clangd/TUScheduler.h @@ -77,6 +77,10 @@ PathRef File, UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action); + /// Wait until there are no scheduled or running tasks. + /// Mostly useful for synchronizing tests. + bool blockUntilIdle(Deadline D) const; + private: /// This class stores per-file data in the Files map. struct FileData; @@ -88,7 +92,8 @@ llvm::StringMap<std::unique_ptr<FileData>> Files; // None when running tasks synchronously and non-None when running tasks // asynchronously. - llvm::Optional<AsyncTaskRunner> Tasks; + llvm::Optional<AsyncTaskRunner> PreambleTasks; + llvm::Optional<AsyncTaskRunner> WorkerThreads; }; } // namespace clangd } // namespace clang Index: clangd/TUScheduler.cpp =================================================================== --- clangd/TUScheduler.cpp +++ clangd/TUScheduler.cpp @@ -82,6 +82,7 @@ UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)> OnUpdated); void runWithAST(UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action); + bool blockUntilIdle(Deadline Timeout) const; std::shared_ptr<const PreambleData> getPossiblyStalePreamble() const; std::size_t getUsedBytes() const; @@ -113,7 +114,7 @@ // Only set when last request is an update. This allows us to cancel an update // that was never read, if a subsequent update comes in. llvm::Optional<CancellationFlag> LastUpdateCF; /* GUARDED_BY(Mutex) */ - std::condition_variable RequestsCV; + mutable std::condition_variable RequestsCV; }; /// A smart-pointer-like class that points to an active ASTWorker. @@ -243,7 +244,7 @@ assert(!Done && "stop() called twice"); Done = true; } - RequestsCV.notify_one(); + RequestsCV.notify_all(); } void ASTWorker::startTask(UniqueFunction<void()> Task, bool isUpdate, @@ -272,7 +273,7 @@ } Requests.emplace(std::move(Task), Context::current().clone()); } // unlock Mutex. - RequestsCV.notify_one(); + RequestsCV.notify_all(); } void ASTWorker::run() { @@ -289,14 +290,25 @@ // before exiting the processing loop. Req = std::move(Requests.front()); - Requests.pop(); + // Leave it on the queue for now, so waiters don't see an empty queue. } // unlock Mutex std::lock_guard<Semaphore> BarrierLock(Barrier); WithContext Guard(std::move(Req.second)); Req.first(); + + { + std::lock_guard<std::mutex> Lock(Mutex); + Requests.pop(); + } + RequestsCV.notify_all(); } } + +bool ASTWorker::blockUntilIdle(Deadline Timeout) const { + return wait(Mutex, RequestsCV, Timeout, [&] { return Requests.empty(); }); +} + } // namespace unsigned getDefaultAsyncThreadsCount() { @@ -321,17 +333,31 @@ : StorePreamblesInMemory(StorePreamblesInMemory), PCHOps(std::make_shared<PCHContainerOperations>()), ASTCallback(std::move(ASTCallback)), Barrier(AsyncThreadsCount) { - if (0 < AsyncThreadsCount) - Tasks.emplace(); + if (0 < AsyncThreadsCount) { + PreambleTasks.emplace(); + WorkerThreads.emplace(); + } } TUScheduler::~TUScheduler() { // Notify all workers that they need to stop. Files.clear(); // Wait for all in-flight tasks to finish. - if (Tasks) - Tasks->waitForAll(); + if (PreambleTasks) + PreambleTasks->waitForAll(); + if (WorkerThreads) + WorkerThreads->waitForAll(); +} + +bool TUScheduler::blockUntilIdle(Deadline D) const { + for (auto &File : Files) + if (!File.getValue()->Worker->blockUntilIdle(D)) + return false; + if (PreambleTasks) + if (!PreambleTasks->waitForAll(D)) + return false; + return true; } void TUScheduler::update( @@ -342,7 +368,7 @@ if (!FD) { // Create a new worker to process the AST-related tasks. ASTWorkerHandle Worker = ASTWorker::Create( - Tasks ? Tasks.getPointer() : nullptr, Barrier, + WorkerThreads ? WorkerThreads.getPointer() : nullptr, Barrier, CppFile::Create(File, StorePreamblesInMemory, PCHOps, ASTCallback)); FD = std::unique_ptr<FileData>(new FileData{Inputs, std::move(Worker)}); } else { @@ -382,7 +408,7 @@ return; } - if (!Tasks) { + if (!PreambleTasks) { std::shared_ptr<const PreambleData> Preamble = It->second->Worker->getPossiblyStalePreamble(); Action(InputsAndPreamble{It->second->Inputs, Preamble.get()}); @@ -400,7 +426,7 @@ Action(InputsAndPreamble{InputsCopy, Preamble.get()}); }; - Tasks->runAsync( + PreambleTasks->runAsync( BindWithForward(Task, Context::current().clone(), std::move(Action))); } Index: clangd/ClangdServer.h =================================================================== --- clangd/ClangdServer.h +++ clangd/ClangdServer.h @@ -149,11 +149,7 @@ /// \p File is already tracked. Also schedules parsing of the AST for it on a /// separate thread. When the parsing is complete, DiagConsumer passed in /// constructor will receive onDiagnosticsReady callback. - /// \return A future that will become ready when the rebuild (including - /// diagnostics) is finished. - /// FIXME: don't return futures here, LSP does not require a response for this - /// request. - std::future<void> addDocument(PathRef File, StringRef Contents); + void addDocument(PathRef File, StringRef Contents); /// Remove \p File from list of tracked files, schedule a request to free /// resources associated with it. @@ -163,9 +159,7 @@ /// Will also check if CompileCommand, provided by GlobalCompilationDatabase /// for \p File has changed. If it has, will remove currently stored Preamble /// and AST and rebuild them from scratch. - /// FIXME: don't return futures here, LSP does not require a response for this - /// request. - std::future<void> forceReparse(PathRef File); + void forceReparse(PathRef File); /// Run code completion for \p File at \p Pos. /// Request is processed asynchronously. @@ -258,14 +252,19 @@ /// FIXME: those metrics might be useful too, we should add them. std::vector<std::pair<Path, std::size_t>> getUsedBytesPerFile() const; + // Blocks the main thread until the server is idle. Only for use in tests. + // Returns false if the timeout expires. + LLVM_NODISCARD bool + blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds = 10); + private: /// FIXME: This stats several files to find a .clang-format file. I/O can be /// slow. Think of a way to cache this. llvm::Expected<tooling::Replacements> formatCode(llvm::StringRef Code, PathRef File, ArrayRef<tooling::Range> Ranges); - std::future<void> + void scheduleReparseAndDiags(PathRef File, VersionedDraft Contents, Tagged<IntrusiveRefCntPtr<vfs::FileSystem>> TaggedFS); Index: clangd/ClangdServer.cpp =================================================================== --- clangd/ClangdServer.cpp +++ clangd/ClangdServer.cpp @@ -139,20 +139,20 @@ this->RootPath = NewRootPath; } -std::future<void> ClangdServer::addDocument(PathRef File, StringRef Contents) { +void ClangdServer::addDocument(PathRef File, StringRef Contents) { DocVersion Version = DraftMgr.updateDraft(File, Contents); auto TaggedFS = FSProvider.getTaggedFileSystem(File); - return scheduleReparseAndDiags(File, VersionedDraft{Version, Contents.str()}, - std::move(TaggedFS)); + scheduleReparseAndDiags(File, VersionedDraft{Version, Contents.str()}, + std::move(TaggedFS)); } void ClangdServer::removeDocument(PathRef File) { DraftMgr.removeDraft(File); CompileArgs.invalidate(File); WorkScheduler.remove(File); } -std::future<void> ClangdServer::forceReparse(PathRef File) { +void ClangdServer::forceReparse(PathRef File) { auto FileContents = DraftMgr.getDraft(File); assert(FileContents.Draft && "forceReparse() was called for non-added document"); @@ -162,8 +162,7 @@ CompileArgs.invalidate(File); auto TaggedFS = FSProvider.getTaggedFileSystem(File); - return scheduleReparseAndDiags(File, std::move(FileContents), - std::move(TaggedFS)); + scheduleReparseAndDiags(File, std::move(FileContents), std::move(TaggedFS)); } std::future<Tagged<CompletionList>> @@ -481,7 +480,7 @@ return blockingRunWithAST<RetType>(WorkScheduler, File, Action); } -std::future<void> ClangdServer::scheduleReparseAndDiags( +void ClangdServer::scheduleReparseAndDiags( PathRef File, VersionedDraft Contents, Tagged<IntrusiveRefCntPtr<vfs::FileSystem>> TaggedFS) { tooling::CompileCommand Command = CompileArgs.getCompileCommand(File); @@ -492,12 +491,7 @@ Path FileStr = File.str(); VFSTag Tag = std::move(TaggedFS.Tag); - std::promise<void> DonePromise; - std::future<void> DoneFuture = DonePromise.get_future(); - - auto Callback = [this, Version, FileStr, Tag](std::promise<void> DonePromise, - OptDiags Diags) { - auto Guard = llvm::make_scope_exit([&]() { DonePromise.set_value(); }); + auto Callback = [this, Version, FileStr, Tag](OptDiags Diags) { if (!Diags) return; // A new reparse was requested before this one completed. @@ -521,8 +515,7 @@ ParseInputs{std::move(Command), std::move(TaggedFS.Value), std::move(*Contents.Draft)}, - BindWithForward(Callback, std::move(DonePromise))); - return DoneFuture; + std::move(Callback)); } void ClangdServer::onFileEvent(const DidChangeWatchedFilesParams &Params) { @@ -534,3 +527,8 @@ ClangdServer::getUsedBytesPerFile() const { return WorkScheduler.getUsedBytesPerFile(); } + +LLVM_NODISCARD bool +ClangdServer::blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds) { + return WorkScheduler.blockUntilIdle(timeoutSeconds(TimeoutSeconds)); +}
_______________________________________________ cfe-commits mailing list cfe-commits@lists.llvm.org http://lists.llvm.org/cgi-bin/mailman/listinfo/cfe-commits