This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new a0239d7a01 Avoids creating a batch writer per completed ext compaction (#5543)
a0239d7a01 is described below

commit a0239d7a01693332399032f62280daad8d52fbc2
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon May 12 10:55:01 2025 -0400

    Avoids creating a batch writer per completed ext compaction (#5543)
    
    The compaction coordinator was creating a batch writer for each completed
    external compaction, and each batch writer was used to write only a single
    small mutation.  This caused a lot of thread creation and RPCs in the
    coordinator.  Changed the coordinator to use a single shared batch writer
    for these mutations.  Also lowered some coordinator logging to trace level
    and added timing information.
    
    
    Co-authored-by: Daniel Roberts ddanielr <ddani...@gmail.com>
---
 .../org/apache/accumulo/core/conf/Property.java    |   7 +-
 .../accumulo/core/metadata/schema/Ample.java       |   5 -
 .../schema/ExternalCompactionFinalState.java       |   8 ++
 .../apache/accumulo/core/util/threads/Threads.java |  17 ++++
 .../accumulo/server/metadata/ServerAmpleImpl.java  |  15 ---
 .../accumulo/coordinator/CompactionFinalizer.java  |  85 ++++++++++++----
 .../accumulo/coordinator/SharedBatchWriter.java    | 107 +++++++++++++++++++++
 .../TestCompactionCoordinatorForOfflineTable.java  |  11 ++-
 8 files changed, 213 insertions(+), 42 deletions(-)
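
The change amounts to a producer/consumer batching pattern: many caller threads each hand off one small mutation and block on a future, while a single background thread drains the queue and writes the whole batch through one BatchWriter. Below is a minimal sketch of that idea, assuming a generic AccumuloClient; the class and field names are illustrative only, and the actual implementation is the SharedBatchWriter added in the diff that follows.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.data.Mutation;

    // Illustrative sketch only; not the committed SharedBatchWriter.
    class SharedWriterSketch {

      private static class Work {
        final Mutation mutation;
        final CompletableFuture<Void> done = new CompletableFuture<>();

        Work(Mutation mutation) {
          this.mutation = mutation;
        }
      }

      private final LinkedBlockingQueue<Work> queue = new LinkedBlockingQueue<>();
      private final AccumuloClient client;
      private final String table;

      SharedWriterSketch(AccumuloClient client, String table) {
        this.client = client;
        this.table = table;
        Thread t = new Thread(this::drainLoop, "shared-writer-" + table);
        t.setDaemon(true);
        t.start();
      }

      // Many threads call this; each blocks until its mutation has been flushed.
      void write(Mutation m) {
        Work work = new Work(m);
        queue.add(work);
        work.done.join();
      }

      private void drainLoop() {
        while (true) {
          List<Work> batch = new ArrayList<>();
          try {
            batch.add(queue.take());      // wait for at least one mutation
            queue.drainTo(batch);         // grab anything else already queued
            try (var writer = client.createBatchWriter(table)) {
              for (Work w : batch) {
                writer.addMutation(w.mutation);
              }
            }                             // close() flushes the whole batch
            batch.forEach(w -> w.done.complete(null));
          } catch (Exception e) {
            batch.forEach(w -> w.done.completeExceptionally(e));
          }
        }
      }
    }

Compared to opening a batch writer per mutation, this amortizes the writer's thread startup and session RPCs over every mutation that happens to be queued at flush time.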

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 97cc0319ec..ca73765984 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1584,6 +1584,10 @@ public enum Property {
       "The interval at which to check for external compaction final state 
markers in the metadata table.",
       "2.1.0"),
   @Experimental
+  COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE(
+      "compaction.coordinator.compaction.finalizer.queue.size", "16384", 
PropertyType.COUNT,
+      "The number of completed compactions to buffer in memory before 
blocking.", "2.1.4"),
+  @Experimental
   COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
       "compaction.coordinator.tserver.check.interval", "1m", 
PropertyType.TIMEDURATION,
       "The interval at which to check the tservers for external compactions.", 
"2.1.0"),
@@ -1928,7 +1932,8 @@ public enum Property {
       COMPACTOR_MINTHREADS_TIMEOUT,
 
       // others
-      TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME);
+      TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME,
+      COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE);
 
   /**
    * Checks if the given property may be changed via Zookeeper, but not recognized until the restart
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index f7232d7865..60dc6bbbf5 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -224,11 +224,6 @@ public interface Ample {
     throw new UnsupportedOperationException();
   }
 
-  default void
-      putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState> finalStates) {
-    throw new UnsupportedOperationException();
-  }
-
   default Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
     throw new UnsupportedOperationException();
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
index 9212ccde9f..14c1df57b0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.metadata.schema;
 
 import java.util.Base64;
 
+import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.util.TextUtil;
@@ -136,4 +137,11 @@ public class ExternalCompactionFinalState {
   public String toString() {
     return toJson();
   }
+
+  public Mutation toMutation() {
+    String prefix = MetadataSchema.ExternalCompactionSection.getRowPrefix();
+    Mutation m = new Mutation(prefix + getExternalCompactionId().canonical());
+    m.put("", "", toJson());
+    return m;
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
index 76a4029ad3..3ff86913b0 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
@@ -22,9 +22,13 @@ import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.OptionalInt;
 
 import org.apache.accumulo.core.trace.TraceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Threads {
 
+  private static final Logger log = LoggerFactory.getLogger(Threads.class);
+
   public static final UncaughtExceptionHandler UEH = new AccumuloUncaughtExceptionHandler();
 
   public static class AccumuloDaemonThread extends Thread {
@@ -66,4 +70,17 @@ public class Threads {
     return thread;
   }
 
+  public static Thread createCriticalThread(String name, Runnable r) {
+    Runnable wrapped = () -> {
+      try {
+        r.run();
+      } catch (RuntimeException e) {
+        System.err.println("Critical thread " + name + " died");
+        e.printStackTrace();
+        Runtime.getRuntime().halt(-1);
+      }
+    };
+
+    return createThread(name, wrapped);
+  }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 472aa0a9d7..857853311c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -294,21 +294,6 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
     return delFlag;
   }
 
-  @Override
-  public void
-      putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState> finalStates) {
-    try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) {
-      String prefix = ExternalCompactionSection.getRowPrefix();
-      for (ExternalCompactionFinalState finalState : finalStates) {
-        Mutation m = new Mutation(prefix + finalState.getExternalCompactionId().canonical());
-        m.put("", "", finalState.toJson());
-        writer.addMutation(m);
-      }
-    } catch (MutationsRejectedException | TableNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   @Override
   public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
     Scanner scanner;
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 3ca9791c70..d61f265c4c 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -36,10 +36,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -52,6 +55,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.thrift.TException;
@@ -67,10 +71,14 @@ public class CompactionFinalizer {
   private final ExecutorService backgroundExecutor;
   private final BlockingQueue<ExternalCompactionFinalState> pendingNotifications;
   private final long tserverCheckInterval;
+  private final SharedBatchWriter sharedBatchWriter;
 
   protected CompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor schedExecutor) {
     this.context = context;
-    this.pendingNotifications = new ArrayBlockingQueue<>(1000);
+    var queueSize =
+        context.getConfiguration().getCount(Property.COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE);
+
+    this.pendingNotifications = new ArrayBlockingQueue<>(queueSize);
 
     tserverCheckInterval = this.context.getConfiguration()
         .getTimeInMillis(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL);
@@ -91,6 +99,9 @@ public class CompactionFinalizer {
 
     ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
         this::notifyTservers, 0, tserverCheckInterval, TimeUnit.MILLISECONDS));
+
+    this.sharedBatchWriter =
+        new SharedBatchWriter(Ample.DataLevel.USER.metaTable(), context, queueSize);
   }
 
   public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize,
@@ -99,41 +110,61 @@ public class CompactionFinalizer {
     var ecfs =
         new ExternalCompactionFinalState(ecid, extent, FinalState.FINISHED, fileSize, fileEntries);
 
-    LOG.debug("Initiating commit for external compaction: {}", ecfs);
+    LOG.trace("Initiating commit for external compaction: {} {}", ecid, ecfs);
 
     // write metadata entry
-    context.getAmple().putExternalCompactionFinalStates(List.of(ecfs));
+    Timer timer = Timer.startNew();
+    sharedBatchWriter.write(ecfs.toMutation());
+    LOG.trace("{} metadata compation state write completed in {}ms", ecid,
+        timer.elapsed(TimeUnit.MILLISECONDS));
 
     if (!pendingNotifications.offer(ecfs)) {
-      LOG.debug("Queue full, notification to tablet server will happen later 
{}.", ecfs);
+      LOG.trace("Queue full, notification to tablet server will happen later 
{}.", ecid);
     } else {
-      LOG.debug("Queued tserver notification for completed external 
compaction: {}", ecfs);
+      LOG.trace("Queued tserver notification for completed external 
compaction: {}", ecid);
     }
   }
 
   public void failCompactions(Map<ExternalCompactionId,KeyExtent> compactionsToFail) {
+    if (compactionsToFail.size() == 1) {
+      var e = compactionsToFail.entrySet().iterator().next();
+      var ecfs =
+          new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L);
+      sharedBatchWriter.write(ecfs.toMutation());
+    } else {
+      try (BatchWriter writer = context.createBatchWriter(Ample.DataLevel.USER.metaTable())) {
+        for (var e : compactionsToFail.entrySet()) {
+          var ecfs =
+              new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L);
+          writer.addMutation(ecfs.toMutation());
+        }
+      } catch (MutationsRejectedException | TableNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    }
 
-    var finalStates = compactionsToFail.entrySet().stream().map(
-        e -> new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L))
-        .collect(Collectors.toList());
-
-    context.getAmple().putExternalCompactionFinalStates(finalStates);
-
-    finalStates.forEach(pendingNotifications::offer);
+    for (var e : compactionsToFail.entrySet()) {
+      var ecfs =
+          new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L);
+      if (!pendingNotifications.offer(ecfs)) {
+        break;
+      }
+    }
   }
 
   private void notifyTserver(Location loc, ExternalCompactionFinalState ecfs) {
 
     TabletClientService.Client client = null;
+    Timer timer = Timer.startNew();
     try {
       client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, loc.getHostAndPort(), context);
       if (ecfs.getFinalState() == FinalState.FINISHED) {
-        LOG.debug("Notifying tserver {} that compaction {} has finished.", loc, ecfs);
+        LOG.trace("Notifying tserver {} that compaction {} has finished.", loc, ecfs);
         client.compactionJobFinished(TraceUtil.traceInfo(), context.rpcCreds(),
             ecfs.getExternalCompactionId().canonical(), ecfs.getExtent().toThrift(),
             ecfs.getFileSize(), ecfs.getEntries());
       } else if (ecfs.getFinalState() == FinalState.FAILED) {
-        LOG.debug("Notifying tserver {} that compaction {} with {} has failed.", loc,
+        LOG.trace("Notifying tserver {} that compaction {} with {} has failed.", loc,
             ecfs.getExternalCompactionId(), ecfs);
         client.compactionJobFailed(TraceUtil.traceInfo(), context.rpcCreds(),
             ecfs.getExternalCompactionId().canonical(), ecfs.getExtent().toThrift());
@@ -145,6 +176,8 @@ public class CompactionFinalizer {
     } finally {
       ThriftUtil.returnClient(client, context);
     }
+    LOG.trace("Tserver {} notification of {} {} took {}ms", loc, 
ecfs.getExternalCompactionId(),
+        ecfs, timer.elapsed(TimeUnit.MILLISECONDS));
   }
 
   private void processPending() {
@@ -155,16 +188,21 @@ public class CompactionFinalizer {
         batch.add(pendingNotifications.take());
         pendingNotifications.drainTo(batch);
 
+        LOG.trace("Processing pending of batch size {}", batch.size());
+
         List<Future<?>> futures = new ArrayList<>();
 
         List<ExternalCompactionId> statusesToDelete = new ArrayList<>();
 
         Map<KeyExtent,TabletMetadata> tabletsMetadata;
         var extents = batch.stream().map(ExternalCompactionFinalState::getExtent).collect(toList());
+        Timer timer = Timer.startNew();
         try (TabletsMetadata tablets = context.getAmple().readTablets().forTablets(extents)
             .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW, ColumnType.ECOMP).build()) {
           tabletsMetadata = tablets.stream().collect(toMap(TabletMetadata::getExtent, identity()));
         }
+        LOG.trace("Metadata scan completed in {}ms for batch size {}, found {}",
+            timer.elapsed(TimeUnit.MILLISECONDS), batch.size(), tabletsMetadata.size());
 
         for (ExternalCompactionFinalState ecfs : batch) {
 
@@ -190,10 +228,14 @@ public class CompactionFinalizer {
         }
 
         if (!statusesToDelete.isEmpty()) {
-          LOG.info(
-              "Deleting unresolvable completed external compactions from metadata table, ids: {}",
-              statusesToDelete);
+          timer.restart();
           context.getAmple().deleteExternalCompactionFinalStates(statusesToDelete);
+          LOG.info(
+              "Deleted unresolvable completed external compactions from metadata table, ids: {} in {}ms",
+              statusesToDelete.size(), timer.elapsed(TimeUnit.MILLISECONDS));
+          for (var ecid : statusesToDelete) {
+            LOG.debug("Deleted unresolvable completed external compaction {}", ecid);
+          }
         }
 
         for (Future<?> future : futures) {
@@ -214,14 +256,19 @@ public class CompactionFinalizer {
   }
 
   private void notifyTservers() {
+    Timer timer = Timer.startNew();
     try (var finalStatesStream = context.getAmple().getExternalCompactionFinalStates()) {
+      int count = 0;
       Iterator<ExternalCompactionFinalState> finalStates = finalStatesStream.iterator();
       while (finalStates.hasNext()) {
         ExternalCompactionFinalState state = finalStates.next();
-        LOG.debug("Found external compaction in final state: {}, queueing for tserver notification",
+        count++;
+        LOG.trace("Found external compaction in final state: {}, queueing for tserver notification",
             state);
         pendingNotifications.put(state);
       }
+      LOG.trace("Added {} final compaction states to notification queue in {}ms", count,
+          timer.elapsed(TimeUnit.MILLISECONDS));
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
new file mode 100644
index 0000000000..b25fd5f64e
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import java.util.ArrayList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.util.Timer;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.server.ServerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class supports the use case of many threads writing a single mutation to a table. It avoids
+ * each thread creating its own batch writer which creates threads and makes 3 RPCs to write the
+ * single mutation. Using this class results in much less thread creation and RPCs.
+ */
+public class SharedBatchWriter {
+  private static final Logger log = LoggerFactory.getLogger(SharedBatchWriter.class);
+
+  private static class Work {
+    private final Mutation mutation;
+    private final CompletableFuture<Void> future;
+
+    private Work(Mutation mutation) {
+      this.mutation = mutation;
+      this.future = new CompletableFuture<>();
+    }
+  }
+
+  private final BlockingQueue<Work> mutations;
+  private final String table;
+  private final ServerContext context;
+
+  public SharedBatchWriter(String table, ServerContext context, int queueSize) {
+    this.table = table;
+    this.context = context;
+    this.mutations = new ArrayBlockingQueue<>(queueSize);
+    var thread =
+        Threads.createCriticalThread("shared batch writer for " + table, this::processMutations);
+    thread.start();
+  }
+
+  public void write(Mutation m) {
+    try {
+      var work = new Work(m);
+      mutations.put(work);
+      work.future.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void processMutations() {
+    Timer timer = Timer.startNew();
+    while (true) {
+      ArrayList<Work> batch = new ArrayList<>();
+      try {
+        batch.add(mutations.take());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      }
+
+      var config = new BatchWriterConfig().setMaxWriteThreads(16);
+      try (var writer = context.createBatchWriter(table, config)) {
+        mutations.drainTo(batch);
+        timer.restart();
+        for (var work : batch) {
+          writer.addMutation(work.mutation);
+        }
+        writer.flush();
+        log.trace("Wrote {} mutations in {}ms", batch.size(), 
timer.elapsed(TimeUnit.MILLISECONDS));
+        batch.forEach(work -> work.future.complete(null));
+      } catch (TableNotFoundException | MutationsRejectedException e) {
+        log.debug("Failed to process {} mutations in {}ms", batch.size(),
+            timer.elapsed(TimeUnit.MILLISECONDS), e);
+        batch.forEach(work -> work.future.completeExceptionally(e));
+      }
+    }
+  }
+}
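
As the CompactionFinalizer hunk above shows, a single instance is constructed once for the metadata table and then shared by every caller. A rough usage outline, with the variable name sbw chosen purely for illustration:

    // constructed once in the CompactionFinalizer constructor (queueSize comes from the new property)
    SharedBatchWriter sbw = new SharedBatchWriter(Ample.DataLevel.USER.metaTable(), context, queueSize);

    // called from any thread for each completed external compaction
    sbw.write(ecfs.toMutation());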
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
index 48e86a5b67..a3cc78905e 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
@@ -18,13 +18,16 @@
  */
 package org.apache.accumulo.test.compaction;
 
-import java.util.List;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.accumulo.coordinator.CompactionCoordinator;
 import org.apache.accumulo.coordinator.CompactionFinalizer;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -55,7 +58,11 @@ public class TestCompactionCoordinatorForOfflineTable extends CompactionCoordina
 
       // write metadata entry
       LOG.info("Writing completed external compaction to metadata table: {}", 
ecfs);
-      context.getAmple().putExternalCompactionFinalStates(List.of(ecfs));
+      try (BatchWriter writer = 
context.createBatchWriter(Ample.DataLevel.USER.metaTable())) {
+        writer.addMutation(ecfs.toMutation());
+      } catch (MutationsRejectedException | TableNotFoundException e) {
+        throw new RuntimeException(e);
+      }
 
       // queue RPC if queue is not full
       LOG.info("Skipping tserver notification for completed external 
compaction: {}", ecfs);
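
For tuning, the in-memory buffer is governed by the new compaction.coordinator.compaction.finalizer.queue.size property added in Property.java above (default 16384, marked @Experimental, and listed with the properties that may be changed via Zookeeper but are not recognized until a restart). An assumed accumulo.properties entry, shown only as an example, might be:

    # example only: double the completed-compaction buffer from its 16384 default
    compaction.coordinator.compaction.finalizer.queue.size=32768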
