This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 17717d9c27 adds debugging context to shared writers in coodinator (#5560) 17717d9c27 is described below commit 17717d9c27a74111786406369a9f68d7a1d17f3c Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue May 20 13:50:44 2025 -0400 adds debugging context to shared writers in coodinator (#5560) --- .../accumulo/coordinator/CompactionFinalizer.java | 3 ++- .../apache/accumulo/coordinator/SharedBatchWriter.java | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index 2efb9e718b..6b12c775c0 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -107,7 +107,8 @@ public class CompactionFinalizer { private SharedBatchWriter getWriter(ExternalCompactionId ecid) { return writers.computeIfAbsent(ecid.getFirstUUIDChar(), - (i) -> new SharedBatchWriter(Ample.DataLevel.USER.metaTable(), context, queueSize)); + (prefix) -> new SharedBatchWriter(Ample.DataLevel.USER.metaTable(), prefix, context, + queueSize / 16)); } public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize, diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java index 012ee6c4ea..baf21be1d3 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java @@ -33,6 +33,8 @@ import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * This class supports the use case of many threads writing a single mutation to a table. It avoids * each thread creating its own batch writer which creates threads and makes 3 RPCs to write the @@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory; */ public class SharedBatchWriter { private static final Logger log = LoggerFactory.getLogger(SharedBatchWriter.class); + private final Character prefix; private static class Work { private final Mutation mutation; @@ -55,12 +58,14 @@ public class SharedBatchWriter { private final String table; private final ServerContext context; - public SharedBatchWriter(String table, ServerContext context, int queueSize) { + public SharedBatchWriter(String table, Character prefix, ServerContext context, int queueSize) { + Preconditions.checkArgument(queueSize > 0, "illegal queue size %s", queueSize); this.table = table; + this.prefix = prefix; this.context = context; this.mutations = new ArrayBlockingQueue<>(queueSize); - var thread = - Threads.createCriticalThread("shared batch writer for " + table, this::processMutations); + var thread = Threads.createCriticalThread( + "shared batch writer for " + table + " prefix:" + prefix, this::processMutations); thread.start(); } @@ -93,11 +98,12 @@ public class SharedBatchWriter { writer.addMutation(work.mutation); } writer.flush(); - log.trace("Wrote {} mutations in {}ms", batch.size(), timer.elapsed(TimeUnit.MILLISECONDS)); + log.trace("Wrote {} mutations in {}ms for prefix {}", batch.size(), + timer.elapsed(TimeUnit.MILLISECONDS), prefix); batch.forEach(work -> work.future.complete(null)); } catch (TableNotFoundException | MutationsRejectedException e) { - log.debug("Failed to process {} mutations in {}ms", batch.size(), - timer.elapsed(TimeUnit.MILLISECONDS), e); + log.debug("Failed to process {} mutations in {}ms for prefix {}", batch.size(), + timer.elapsed(TimeUnit.MILLISECONDS), prefix, e); batch.forEach(work -> work.future.completeExceptionally(e)); } }