shangxinli commented on code in PR #649:
URL: https://github.com/apache/iceberg-cpp/pull/649#discussion_r3252863296
##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -54,6 +58,64 @@ Result<std::shared_ptr<ManifestReader>> MakeManifestReader(
   return ManifestReader::Make(manifest, file_io, std::move(schema),
                               std::move(spec));
 }
+/// \brief Cap on per-CleanFiles worker concurrency.
+///
+/// Java's RemoveSnapshots takes ExecutorServices from the table operations layer.
+/// C++ has no shared executor today, so each strategy spins up an ad-hoc pool via
+/// std::async. Cap concurrency to avoid swamping FileIO with hundreds of in-flight
+/// requests on hosts with very high core counts.
+constexpr std::size_t kMaxParallelism = 8;
+
+std::size_t WorkerCount(std::size_t item_count) {
+  if (item_count <= 1) return 1;
+  std::size_t hw = std::thread::hardware_concurrency();
+  if (hw == 0) hw = 2;
+  return std::min({item_count, kMaxParallelism, hw});
+}
+
+/// \brief Run `work` over `items` using up to WorkerCount(items) std::async workers.
+///
+/// Each worker drains a contiguous slice of `items`. Exceptions thrown by `work` are
+/// swallowed -- callers that need to know whether a unit succeeded should record it
+/// inside `work` (e.g. via an atomic counter or a thread-safe collection).
+template <typename Item, typename Fn>
+void RunInParallel(std::span<const Item> items, Fn&& work) {
+  if (items.empty()) return;
+  std::size_t n = WorkerCount(items.size());
+  if (n <= 1) {
+    for (const auto& item : items) {
+      try {
+        work(item);
+      } catch (...) {
+        // best-effort
+      }
+    }
+    return;
+  }
+
+  std::vector<std::future<void>> futures;
+  futures.reserve(n);
+  std::size_t per = (items.size() + n - 1) / n;
+  for (std::size_t i = 0; i < n; ++i) {
+    std::size_t begin = i * per;
+    if (begin >= items.size()) break;
+    std::size_t end = std::min(begin + per, items.size());
+    auto slice = items.subspan(begin, end - begin);
+    futures.emplace_back(std::async(std::launch::async, [slice, &work]() {
Review Comment:
Good call -- replaced `std::async` with a dedicated `ThreadPool` in ff3a254.
The pool is owned by `FileCleanupStrategy`, sized once at construction
(`min(8, hardware_concurrency)`), and drained on destruction, so `CleanFiles` no longer
spins up fresh threads on each `DeleteFiles` call. Added
`util/thread_pool_internal.{h,cc}` plus `ThreadPoolTest` coverage (submit,
fan-out, exception isolation, drain). PTAL.
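For readers following the thread, here is a minimal sketch of a fixed-size pool with the properties the reply describes: sized once at construction, tasks pushed onto a shared queue, per-task exception isolation, and queued work drained on destruction. The names `ThreadPool`, `Submit`, and `DefaultPoolSize` are illustrative assumptions; this is not the interface added in ff3a254 under `util/thread_pool_internal.{h,cc}`, which is not shown here.

```cpp
#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical fixed-size pool for illustration only; not the API in
// util/thread_pool_internal.h.
class ThreadPool {
 public:
  explicit ThreadPool(std::size_t num_threads) {
    if (num_threads == 0) num_threads = 1;  // always run at least one worker
    workers_.reserve(num_threads);
    for (std::size_t i = 0; i < num_threads; ++i) {
      workers_.emplace_back([this] { WorkerLoop(); });
    }
  }

  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;

  // Drain on destruction: signal shutdown, let the workers finish everything
  // already queued, then join them.
  ~ThreadPool() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stopping_ = true;
    }
    cv_.notify_all();
    for (auto& worker : workers_) worker.join();
  }

  // Enqueue `fn` and return a future for its result. An exception thrown by
  // `fn` is stored in the future instead of escaping the worker thread.
  template <typename Fn>
  auto Submit(Fn&& fn) -> std::future<std::invoke_result_t<std::decay_t<Fn>>> {
    using R = std::invoke_result_t<std::decay_t<Fn>>;
    auto task = std::make_shared<std::packaged_task<R()>>(std::forward<Fn>(fn));
    std::future<R> result = task->get_future();
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.emplace([task] { (*task)(); });
    }
    cv_.notify_one();
    return result;
  }

 private:
  void WorkerLoop() {
    for (;;) {
      std::function<void()> job;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
        // Exit only once shutdown is requested AND the queue is empty.
        if (stopping_ && queue_.empty()) return;
        job = std::move(queue_.front());
        queue_.pop();
      }
      job();  // run outside the lock so other workers can dequeue
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> queue_;
  bool stopping_ = false;
  std::vector<std::thread> workers_;
};

// Sizing rule from the reply: min(8, hardware_concurrency). The fallback of 2
// mirrors WorkerCount in the diff above.
inline std::size_t DefaultPoolSize() {
  std::size_t hw = std::thread::hardware_concurrency();
  if (hw == 0) hw = 2;  // hardware_concurrency() may return 0 if unknown
  return std::min<std::size_t>(8, hw);
}
```

Routing each task through `std::packaged_task` stores any exception in the returned `std::future`, so a throwing task neither kills its worker nor affects sibling tasks; a caller can `get()` the futures it cares about rather than swallowing errors with `catch (...)`. The destructor sets a stop flag but lets workers keep popping until the queue is empty, which is one way to get the drain-on-destruction behavior.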