shangxinli commented on code in PR #649:
URL: https://github.com/apache/iceberg-cpp/pull/649#discussion_r3252863296


##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -54,6 +58,64 @@ Result<std::shared_ptr<ManifestReader>> MakeManifestReader(
   return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec));
 }
 
+/// \brief Cap on per-CleanFiles worker concurrency.
+///
+/// Java's RemoveSnapshots takes ExecutorServices from the table operations layer.
+/// C++ has no shared executor today, so each strategy spins up an ad-hoc pool via
+/// std::async. Cap concurrency to avoid swamping FileIO with hundreds of in-flight
+/// requests on hosts with very high core counts.
+constexpr std::size_t kMaxParallelism = 8;
+
+std::size_t WorkerCount(std::size_t item_count) {
+  if (item_count <= 1) return 1;
+  std::size_t hw = std::thread::hardware_concurrency();
+  if (hw == 0) hw = 2;
+  return std::min({item_count, kMaxParallelism, hw});
+}
+
+/// \brief Run `work` over `items` using up to WorkerCount(items) std::async workers.
+///
+/// Each worker drains a contiguous slice of `items`. Exceptions thrown by `work` are
+/// swallowed -- callers that need to know whether a unit succeeded should record it
+/// inside `work` (e.g. via an atomic counter or a thread-safe collection).
+template <typename Item, typename Fn>
+void RunInParallel(std::span<const Item> items, Fn&& work) {
+  if (items.empty()) return;
+  std::size_t n = WorkerCount(items.size());
+  if (n <= 1) {
+    for (const auto& item : items) {
+      try {
+        work(item);
+      } catch (...) {
+        // best-effort
+      }
+    }
+    return;
+  }
+
+  std::vector<std::future<void>> futures;
+  futures.reserve(n);
+  std::size_t per = (items.size() + n - 1) / n;
+  for (std::size_t i = 0; i < n; ++i) {
+    std::size_t begin = i * per;
+    if (begin >= items.size()) break;
+    std::size_t end = std::min(begin + per, items.size());
+    auto slice = items.subspan(begin, end - begin);
+    futures.emplace_back(std::async(std::launch::async, [slice, &work]() {

Review Comment:
   Good call -- replaced `std::async` with a dedicated `ThreadPool` in ff3a254.
   The pool is owned by `FileCleanupStrategy`, sized once at construction
   (`min(8, hardware_concurrency)`), and drained on destruction, so `CleanFiles`
   no longer spins up fresh threads on each `DeleteFiles` call. Added
   `util/thread_pool_internal.{h,cc}` plus `ThreadPoolTest` coverage
   (submit, fan-out, exception isolation, drain). PTAL.
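
For readers following along without the commit open, here is a minimal sketch of the shape described above: a fixed-size pool that is sized once, isolates exceptions per task, and drains its queue on destruction. This is not the code in `util/thread_pool_internal.{h,cc}`. The names `SketchThreadPool`, `SketchPoolSize`, and `RunSketchDemo` are hypothetical, and the fallback of 2 when `hardware_concurrency()` reports 0 is an assumption borrowed from `WorkerCount` in the diff above.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical sizing helper: min(8, hardware_concurrency), with an assumed
// fallback of 2 because hardware_concurrency() may return 0.
std::size_t SketchPoolSize() {
  std::size_t hw = std::thread::hardware_concurrency();
  if (hw == 0) hw = 2;
  return std::min<std::size_t>(8, hw);
}

// Hypothetical fixed-size pool; a sketch, not the ThreadPool from the PR.
class SketchThreadPool {
 public:
  explicit SketchThreadPool(std::size_t workers) {
    assert(workers > 0);
    workers_.reserve(workers);
    for (std::size_t i = 0; i < workers; ++i) {
      workers_.emplace_back([this] { WorkerLoop(); });
    }
  }

  // Drain on destruction: workers finish everything already queued, then exit.
  ~SketchThreadPool() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stopping_ = true;
    }
    cv_.notify_all();
    for (auto& t : workers_) t.join();
  }

  SketchThreadPool(const SketchThreadPool&) = delete;
  SketchThreadPool& operator=(const SketchThreadPool&) = delete;

  // Exception isolation: std::packaged_task stores anything `fn` throws in
  // the returned future, so a failing task never unwinds a worker thread.
  std::future<void> Submit(std::function<void()> fn) {
    auto task = std::make_shared<std::packaged_task<void()>>(std::move(fn));
    std::future<void> result = task->get_future();
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push([task] { (*task)(); });
    }
    cv_.notify_one();
    return result;
  }

 private:
  void WorkerLoop() {
    for (;;) {
      std::function<void()> job;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
        if (queue_.empty()) return;  // stopping and nothing left to run
        job = std::move(queue_.front());
        queue_.pop();
      }
      job();
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> queue_;
  bool stopping_ = false;
  std::vector<std::thread> workers_;  // declared last: threads use the members above
};

// Usage: one throwing task plus 100 counting tasks. Returns the count observed
// after the pool has been destroyed (and therefore drained), or -1 if the
// exception did not surface through the future.
int RunSketchDemo() {
  std::atomic<int> done{0};
  bool isolated = false;
  {
    SketchThreadPool pool(SketchPoolSize());
    std::future<void> bad = pool.Submit([] { throw std::runtime_error("boom"); });
    for (int i = 0; i < 100; ++i) pool.Submit([&done] { ++done; });
    try {
      bad.get();
    } catch (const std::runtime_error&) {
      isolated = true;  // the failure reached the caller, not the worker
    }
  }
  return isolated ? done.load() : -1;
}
```

One design difference from the `RunInParallel` helper in the hunk above: instead of swallowing exceptions with `catch (...)`, this sketch hands them back through the future, and a caller that wants best-effort behavior can simply drop the future.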



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

