This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 17717d9c27 adds debugging context to shared writers in coodinator (#5560)
17717d9c27 is described below

commit 17717d9c27a74111786406369a9f68d7a1d17f3c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue May 20 13:50:44 2025 -0400

    adds debugging context to shared writers in coodinator (#5560)
---
 .../accumulo/coordinator/CompactionFinalizer.java      |  3 ++-
 .../apache/accumulo/coordinator/SharedBatchWriter.java | 18 ++++++++++++------
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 2efb9e718b..6b12c775c0 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -107,7 +107,8 @@ public class CompactionFinalizer {
 
   private SharedBatchWriter getWriter(ExternalCompactionId ecid) {
     return writers.computeIfAbsent(ecid.getFirstUUIDChar(),
-        (i) -> new SharedBatchWriter(Ample.DataLevel.USER.metaTable(), context, queueSize));
+        (prefix) -> new SharedBatchWriter(Ample.DataLevel.USER.metaTable(), prefix, context,
+            queueSize / 16));
   }
 
   public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize,
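
For context on the hunk above: getWriter() now shards its writers by the first character of the external compaction UUID, so up to 16 SharedBatchWriter instances can be created (presumably one per leading hex digit), and each is sized at queueSize / 16 so the combined queue capacity stays roughly equal to the original single queueSize. The JDK-only sketch below illustrates just that sharding and sizing; the Writer record, the QUEUE_SIZE value, and the class name are hypothetical stand-ins, not the coordinator's actual types.

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the writer cache in CompactionFinalizer;
// only the sharding by the first UUID character and the queueSize / 16 sizing are shown.
public class PrefixShardingSketch {

  // Placeholder for SharedBatchWriter.
  record Writer(char prefix, int queueSize) {}

  private static final int QUEUE_SIZE = 1024; // assumed overall budget, not Accumulo's actual value
  private final ConcurrentHashMap<Character,Writer> writers = new ConcurrentHashMap<>();

  // Mirrors getWriter(): at most 16 writers, one per leading hex digit, each with 1/16th of the budget.
  Writer getWriter(UUID ecid) {
    char prefix = ecid.toString().charAt(0);
    return writers.computeIfAbsent(prefix, p -> new Writer(p, QUEUE_SIZE / 16));
  }

  public static void main(String[] args) {
    var sketch = new PrefixShardingSketch();
    for (int i = 0; i < 8; i++) {
      Writer w = sketch.getWriter(UUID.randomUUID());
      System.out.println("prefix " + w.prefix() + " queueSize " + w.queueSize());
    }
  }
}
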
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
index 012ee6c4ea..baf21be1d3 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
@@ -33,6 +33,8 @@ import org.apache.accumulo.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class supports the use case of many threads writing a single mutation to a table. It avoids
  * each thread creating its own batch writer which creates threads and makes 3 RPCs to write the
@@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
  */
 public class SharedBatchWriter {
   private static final Logger log = LoggerFactory.getLogger(SharedBatchWriter.class);
+  private final Character prefix;
 
   private static class Work {
     private final Mutation mutation;
@@ -55,12 +58,14 @@ public class SharedBatchWriter {
   private final String table;
   private final ServerContext context;
 
-  public SharedBatchWriter(String table, ServerContext context, int queueSize) {
+  public SharedBatchWriter(String table, Character prefix, ServerContext context, int queueSize) {
+    Preconditions.checkArgument(queueSize > 0, "illegal queue size %s", queueSize);
     this.table = table;
+    this.prefix = prefix;
     this.context = context;
     this.mutations = new ArrayBlockingQueue<>(queueSize);
-    var thread =
-        Threads.createCriticalThread("shared batch writer for " + table, this::processMutations);
+    var thread = Threads.createCriticalThread(
+        "shared batch writer for " + table + " prefix:" + prefix, this::processMutations);
     thread.start();
   }
 
@@ -93,11 +98,12 @@ public class SharedBatchWriter {
           writer.addMutation(work.mutation);
         }
         writer.flush();
-        log.trace("Wrote {} mutations in {}ms", batch.size(), timer.elapsed(TimeUnit.MILLISECONDS));
+        log.trace("Wrote {} mutations in {}ms for prefix {}", batch.size(),
+            timer.elapsed(TimeUnit.MILLISECONDS), prefix);
         batch.forEach(work -> work.future.complete(null));
       } catch (TableNotFoundException | MutationsRejectedException e) {
-        log.debug("Failed to process {} mutations in {}ms", batch.size(),
-            timer.elapsed(TimeUnit.MILLISECONDS), e);
+        log.debug("Failed to process {} mutations in {}ms for prefix {}", batch.size(),
+            timer.elapsed(TimeUnit.MILLISECONDS), prefix, e);
         batch.forEach(work -> work.future.completeExceptionally(e));
       }
     }
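
The Javadoc shown in the SharedBatchWriter diff describes the pattern at work: many threads each enqueue a single mutation, one shared background thread batches and flushes them, and each caller gets a future that completes when its mutation has been written (or completes exceptionally if the flush fails). The simplified, JDK-only sketch below illustrates that queue-and-future flow; WriterSketch, its println stand-in for the BatchWriter flush, and the blocking write() method are hypothetical placeholders, not the coordinator's actual code.

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Simplified illustration of the shared-writer pattern from the Javadoc above:
// callers enqueue work and block on a future; one background thread drains the
// queue, "flushes" the batch, and completes every future in it.
public class WriterSketch {

  private static class Work {
    final String payload;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    Work(String payload) {
      this.payload = payload;
    }
  }

  private final BlockingQueue<Work> queue;
  private final char prefix;

  WriterSketch(char prefix, int queueSize) {
    if (queueSize <= 0) {
      throw new IllegalArgumentException("illegal queue size " + queueSize);
    }
    this.prefix = prefix;
    this.queue = new ArrayBlockingQueue<>(queueSize);
    var thread = new Thread(this::processQueue, "shared writer sketch prefix:" + prefix);
    thread.setDaemon(true);
    thread.start();
  }

  // Callers block until the batch containing their item has been flushed.
  void write(String payload) throws Exception {
    var work = new Work(payload);
    queue.put(work);
    work.future.get();
  }

  private void processQueue() {
    while (true) {
      try {
        var batch = new ArrayList<Work>();
        batch.add(queue.take()); // wait for at least one item
        queue.drainTo(batch); // then grab whatever else is already queued
        // Stand-in for BatchWriter.addMutation()/flush() in the real class.
        System.out.println("prefix " + prefix + " flushed " + batch.size() + " item(s)");
        batch.forEach(w -> w.future.complete(null));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  public static void main(String[] args) throws Exception {
    var writer = new WriterSketch('a', 16);
    var callers = new ArrayList<Thread>();
    for (int i = 0; i < 4; i++) {
      int id = i;
      var t = new Thread(() -> {
        try {
          writer.write("mutation-" + id);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
      callers.add(t);
      t.start();
    }
    for (Thread t : callers) {
      t.join();
    }
  }
}

Because each caller holds its own future, a failed flush propagates back to exactly the callers whose mutations were in that batch, which is what the completeExceptionally call in the last hunk does.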
