This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new a0239d7a01 Avoids creating a batch writer per completed ext compaction (#5543)
a0239d7a01 is described below

commit a0239d7a01693332399032f62280daad8d52fbc2
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon May 12 10:55:01 2025 -0400

    Avoids creating a batch writer per completed ext compaction (#5543)

    The compaction coordinator was creating a batch writer per completed
    external compaction. The batch writer was used to write a single small
    mutation. This caused a lot of thread creation and RPCs in the
    coordinator. Changed the coordinator to use a single batch writer for
    these mutations.

    Also lowered some coordinator logging to trace level and added timing
    information.

    Co-authored-by: Daniel Roberts ddanielr <ddani...@gmail.com>
---
 .../org/apache/accumulo/core/conf/Property.java    |   7 +-
 .../accumulo/core/metadata/schema/Ample.java       |   5 -
 .../schema/ExternalCompactionFinalState.java       |   8 ++
 .../apache/accumulo/core/util/threads/Threads.java |  17 ++++
 .../accumulo/server/metadata/ServerAmpleImpl.java  |  15 ---
 .../accumulo/coordinator/CompactionFinalizer.java  |  85 ++++++++++++----
 .../accumulo/coordinator/SharedBatchWriter.java    | 107 +++++++++++++++++++++
 .../TestCompactionCoordinatorForOfflineTable.java  |  11 ++-
 8 files changed, 213 insertions(+), 42 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 97cc0319ec..ca73765984 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1584,6 +1584,10 @@ public enum Property {
       "The interval at which to check for external compaction final state markers in the metadata table.",
       "2.1.0"),
   @Experimental
+  COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE(
+      "compaction.coordinator.compaction.finalizer.queue.size", "16384", PropertyType.COUNT,
+      "The number of completed compactions to buffer in memory before blocking.", "2.1.4"),
+  @Experimental
   COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
       "compaction.coordinator.tserver.check.interval", "1m", PropertyType.TIMEDURATION,
       "The interval at which to check the tservers for external compactions.", "2.1.0"),
@@ -1928,7 +1932,8 @@ public enum Property {
       COMPACTOR_MINTHREADS_TIMEOUT,

       // others
-      TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME);
+      TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME,
+      COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE);

   /**
    * Checks if the given property may be changed via Zookeeper, but not recognized until the restart
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index f7232d7865..60dc6bbbf5 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -224,11 +224,6 @@ public interface Ample {
     throw new UnsupportedOperationException();
   }

-  default void
-      putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState> finalStates) {
-    throw new UnsupportedOperationException();
-  }
-
   default Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
     throw new UnsupportedOperationException();
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
index 9212ccde9f..14c1df57b0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.metadata.schema;

 import java.util.Base64;

+import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.util.TextUtil;
@@ -136,4 +137,11 @@ public class ExternalCompactionFinalState {
   public String toString() {
     return toJson();
   }
+
+  public Mutation toMutation() {
+    String prefix = MetadataSchema.ExternalCompactionSection.getRowPrefix();
+    Mutation m = new Mutation(prefix + getExternalCompactionId().canonical());
+    m.put("", "", toJson());
+    return m;
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
index 76a4029ad3..3ff86913b0 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
@@ -22,9 +22,13 @@ import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.OptionalInt;

 import org.apache.accumulo.core.trace.TraceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class Threads {

+  private static final Logger log = LoggerFactory.getLogger(Threads.class);
+
   public static final UncaughtExceptionHandler UEH = new AccumuloUncaughtExceptionHandler();

   public static class AccumuloDaemonThread extends Thread {
@@ -66,4 +70,17 @@ public class Threads {
     return thread;
   }

+  public static Thread createCriticalThread(String name, Runnable r) {
+    Runnable wrapped = () -> {
+      try {
+        r.run();
+      } catch (RuntimeException e) {
+        System.err.println("Critical thread " + name + " died");
+        e.printStackTrace();
+        Runtime.getRuntime().halt(-1);
+      }
+    };
+
+    return createThread(name, wrapped);
+  }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 472aa0a9d7..857853311c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -294,21 +294,6 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
     return delFlag;
   }

-  @Override
-  public void
-      putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState> finalStates) {
-    try (BatchWriter writer = context.createBatchWriter(DataLevel.USER.metaTable())) {
-      String prefix = ExternalCompactionSection.getRowPrefix();
-      for (ExternalCompactionFinalState finalState : finalStates) {
-        Mutation m = new Mutation(prefix + finalState.getExternalCompactionId().canonical());
-        m.put("", "", finalState.toJson());
-        writer.addMutation(m);
-      }
-    } catch (MutationsRejectedException | TableNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   @Override
   public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
     Scanner scanner;
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 3ca9791c70..d61f265c4c 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -36,10 +36,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;

+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -52,6 +55,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.thrift.TException;
@@ -67,10 +71,14 @@ public class CompactionFinalizer {
   private final ExecutorService backgroundExecutor;
   private final BlockingQueue<ExternalCompactionFinalState> pendingNotifications;
   private final long tserverCheckInterval;
+  private final SharedBatchWriter sharedBatchWriter;

   protected CompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor schedExecutor) {
     this.context = context;

-    this.pendingNotifications = new ArrayBlockingQueue<>(1000);
+    var queueSize =
+        context.getConfiguration().getCount(Property.COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE);
+
+    this.pendingNotifications = new ArrayBlockingQueue<>(queueSize);

     tserverCheckInterval = this.context.getConfiguration()
         .getTimeInMillis(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL);
@@ -91,6 +99,9 @@ public class CompactionFinalizer {

     ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
         this::notifyTservers, 0, tserverCheckInterval, TimeUnit.MILLISECONDS));
+
+    this.sharedBatchWriter =
+        new SharedBatchWriter(Ample.DataLevel.USER.metaTable(), context, queueSize);
   }

   public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize,
@@ -99,41 +110,61 @@ public class CompactionFinalizer {
     var ecfs =
         new ExternalCompactionFinalState(ecid, extent, FinalState.FINISHED, fileSize, fileEntries);

-    LOG.debug("Initiating commit for external compaction: {}", ecfs);
+    LOG.trace("Initiating commit for external compaction: {} {}", ecid, ecfs);

     // write metadata entry
-    context.getAmple().putExternalCompactionFinalStates(List.of(ecfs));
+    Timer timer = Timer.startNew();
+    sharedBatchWriter.write(ecfs.toMutation());
+    LOG.trace("{} metadata compaction state write completed in {}ms", ecid,
+        timer.elapsed(TimeUnit.MILLISECONDS));

     if (!pendingNotifications.offer(ecfs)) {
-      LOG.debug("Queue full, notification to tablet server will happen later {}.", ecfs);
+      LOG.trace("Queue full, notification to tablet server will happen later {}.", ecid);
     } else {
-      LOG.debug("Queued tserver notification for completed external compaction: {}", ecfs);
+      LOG.trace("Queued tserver notification for completed external compaction: {}", ecid);
     }
   }

   public void failCompactions(Map<ExternalCompactionId,KeyExtent> compactionsToFail) {
+    if (compactionsToFail.size() == 1) {
+      var e = compactionsToFail.entrySet().iterator().next();
+      var ecfs =
+          new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L);
+      sharedBatchWriter.write(ecfs.toMutation());
+    } else {
+      try (BatchWriter writer = context.createBatchWriter(Ample.DataLevel.USER.metaTable())) {
+        for (var e : compactionsToFail.entrySet()) {
+          var ecfs =
+              new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L);
+          writer.addMutation(ecfs.toMutation());
+        }
+      } catch (MutationsRejectedException | TableNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    }

-    var finalStates = compactionsToFail.entrySet().stream().map(
-        e -> new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L))
-        .collect(Collectors.toList());
-
-    context.getAmple().putExternalCompactionFinalStates(finalStates);
-
-    finalStates.forEach(pendingNotifications::offer);
+    for (var e : compactionsToFail.entrySet()) {
+      var ecfs =
+          new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L);
+      if (!pendingNotifications.offer(ecfs)) {
+        break;
+      }
+    }
   }

   private void notifyTserver(Location loc, ExternalCompactionFinalState ecfs) {
     TabletClientService.Client client = null;
+    Timer timer = Timer.startNew();
     try {
       client =
           ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, loc.getHostAndPort(), context);
       if (ecfs.getFinalState() == FinalState.FINISHED) {
-        LOG.debug("Notifying tserver {} that compaction {} has finished.", loc, ecfs);
+        LOG.trace("Notifying tserver {} that compaction {} has finished.", loc, ecfs);
         client.compactionJobFinished(TraceUtil.traceInfo(), context.rpcCreds(),
             ecfs.getExternalCompactionId().canonical(), ecfs.getExtent().toThrift(),
             ecfs.getFileSize(), ecfs.getEntries());
       } else if (ecfs.getFinalState() == FinalState.FAILED) {
-        LOG.debug("Notifying tserver {} that compaction {} with {} has failed.", loc,
+        LOG.trace("Notifying tserver {} that compaction {} with {} has failed.", loc,
             ecfs.getExternalCompactionId(), ecfs);
         client.compactionJobFailed(TraceUtil.traceInfo(), context.rpcCreds(),
             ecfs.getExternalCompactionId().canonical(), ecfs.getExtent().toThrift());
@@ -145,6 +176,8 @@ public class CompactionFinalizer {
     } finally {
       ThriftUtil.returnClient(client, context);
     }
+    LOG.trace("Tserver {} notification of {} {} took {}ms", loc, ecfs.getExternalCompactionId(),
+        ecfs, timer.elapsed(TimeUnit.MILLISECONDS));
   }

   private void processPending() {
@@ -155,16 +188,21 @@

       batch.add(pendingNotifications.take());
       pendingNotifications.drainTo(batch);

+      LOG.trace("Processing pending of batch size {}", batch.size());
+
       List<Future<?>> futures = new ArrayList<>();

       List<ExternalCompactionId> statusesToDelete = new ArrayList<>();

       Map<KeyExtent,TabletMetadata> tabletsMetadata;
       var extents = batch.stream().map(ExternalCompactionFinalState::getExtent).collect(toList());
+      Timer timer = Timer.startNew();
       try (TabletsMetadata tablets = context.getAmple().readTablets().forTablets(extents)
           .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW, ColumnType.ECOMP).build()) {
         tabletsMetadata =
             tablets.stream().collect(toMap(TabletMetadata::getExtent, identity()));
       }
+      LOG.trace("Metadata scan completed in {}ms for batch size {}, found {}",
+          timer.elapsed(TimeUnit.MILLISECONDS), batch.size(), tabletsMetadata.size());

       for (ExternalCompactionFinalState ecfs : batch) {
@@ -190,10 +228,14 @@
       }

       if (!statusesToDelete.isEmpty()) {
-        LOG.info(
-            "Deleting unresolvable completed external compactions from metadata table, ids: {}",
-            statusesToDelete);
+        timer.restart();
         context.getAmple().deleteExternalCompactionFinalStates(statusesToDelete);
+        LOG.info(
+            "Deleted unresolvable completed external compactions from metadata table, ids: {} in {}ms",
+            statusesToDelete.size(), timer.elapsed(TimeUnit.MILLISECONDS));
+        for (var ecid : statusesToDelete) {
+          LOG.debug("Deleted unresolvable completed external compaction {}", ecid);
+        }
       }

       for (Future<?> future : futures) {
@@ -214,14 +256,19 @@
   }

   private void notifyTservers() {
+    Timer timer = Timer.startNew();
     try (var finalStatesStream = context.getAmple().getExternalCompactionFinalStates()) {
+      int count = 0;
       Iterator<ExternalCompactionFinalState> finalStates = finalStatesStream.iterator();
       while (finalStates.hasNext()) {
         ExternalCompactionFinalState state = finalStates.next();
-        LOG.debug("Found external compaction in final state: {}, queueing for tserver notification",
+        count++;
+        LOG.trace("Found external compaction in final state: {}, queueing for tserver notification",
             state);
         pendingNotifications.put(state);
       }
+      LOG.trace("Added {} final compaction states to notification queue in {}ms", count,
+          timer.elapsed(TimeUnit.MILLISECONDS));
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
new file mode 100644
index 0000000000..b25fd5f64e
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import java.util.ArrayList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.util.Timer;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.server.ServerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class supports the use case of many threads writing a single mutation to a table. It avoids
+ * each thread creating its own batch writer which creates threads and makes 3 RPCs to write the
+ * single mutation. Using this class results in much less thread creation and RPCs.
+ */
+public class SharedBatchWriter {
+
+  private static final Logger log = LoggerFactory.getLogger(SharedBatchWriter.class);
+
+  private static class Work {
+    private final Mutation mutation;
+    private final CompletableFuture<Void> future;
+
+    private Work(Mutation mutation) {
+      this.mutation = mutation;
+      this.future = new CompletableFuture<>();
+    }
+  }
+
+  private final BlockingQueue<Work> mutations;
+  private final String table;
+  private final ServerContext context;
+
+  public SharedBatchWriter(String table, ServerContext context, int queueSize) {
+    this.table = table;
+    this.context = context;
+    this.mutations = new ArrayBlockingQueue<>(queueSize);
+    var thread =
+        Threads.createCriticalThread("shared batch writer for " + table, this::processMutations);
+    thread.start();
+  }
+
+  public void write(Mutation m) {
+    try {
+      var work = new Work(m);
+      mutations.put(work);
+      work.future.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void processMutations() {
+    Timer timer = Timer.startNew();
+    while (true) {
+      ArrayList<Work> batch = new ArrayList<>();
+      try {
+        batch.add(mutations.take());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      }
+
+      var config = new BatchWriterConfig().setMaxWriteThreads(16);
+      try (var writer = context.createBatchWriter(table, config)) {
+        mutations.drainTo(batch);
+        timer.restart();
+        for (var work : batch) {
+          writer.addMutation(work.mutation);
+        }
+        writer.flush();
+        log.trace("Wrote {} mutations in {}ms", batch.size(), timer.elapsed(TimeUnit.MILLISECONDS));
+        batch.forEach(work -> work.future.complete(null));
+      } catch (TableNotFoundException | MutationsRejectedException e) {
+        log.debug("Failed to process {} mutations in {}ms", batch.size(),
+            timer.elapsed(TimeUnit.MILLISECONDS), e);
+        batch.forEach(work -> work.future.completeExceptionally(e));
+      }
+    }
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
index 48e86a5b67..a3cc78905e 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
@@ -18,13 +18,16 @@
  */
 package org.apache.accumulo.test.compaction;

-import java.util.List;
 import java.util.concurrent.ScheduledThreadPoolExecutor;

 import org.apache.accumulo.coordinator.CompactionCoordinator;
 import org.apache.accumulo.coordinator.CompactionFinalizer;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -55,7 +58,11 @@ public class TestCompactionCoordinatorForOfflineTable extends CompactionCoordina

     // write metadata entry
     LOG.info("Writing completed external compaction to metadata table: {}", ecfs);
-    context.getAmple().putExternalCompactionFinalStates(List.of(ecfs));
+    try (BatchWriter writer = context.createBatchWriter(Ample.DataLevel.USER.metaTable())) {
+      writer.addMutation(ecfs.toMutation());
+    } catch (MutationsRejectedException | TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }

     // queue RPC if queue is not full
     LOG.info("Skipping tserver notification for completed external compaction: {}", ecfs);
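
For context, the new SharedBatchWriter is an instance of the single-consumer batching pattern: many threads each enqueue one small unit of work and block on a future, while one background thread drains the queue and performs a single bulk write per batch. The standalone sketch below is a simplified illustration of that pattern only; it is not code from this commit, and the hypothetical SharedBatchWriterSketch class and its Consumer<List<T>> sink stand in for the Accumulo BatchWriter and ServerContext wiring used above.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Simplified sketch of the shared-batch-writer pattern; the names here are illustrative
// stand-ins, not Accumulo APIs.
public class SharedBatchWriterSketch<T> {

  private static class Work<T> {
    final T item;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    Work(T item) {
      this.item = item;
    }
  }

  private final BlockingQueue<Work<T>> queue;
  private final Consumer<List<T>> batchSink; // stand-in for the real batch writer

  public SharedBatchWriterSketch(int queueSize, Consumer<List<T>> batchSink) {
    this.queue = new ArrayBlockingQueue<>(queueSize);
    this.batchSink = batchSink;
    Thread writer = new Thread(this::processLoop, "shared-batch-writer-sketch");
    writer.setDaemon(true);
    writer.start();
  }

  // Called by many threads; blocks until the background thread has written this item.
  public void write(T item) throws InterruptedException {
    Work<T> work = new Work<>(item);
    queue.put(work);
    work.future.join();
  }

  private void processLoop() {
    while (true) {
      List<Work<T>> batch = new ArrayList<>();
      try {
        batch.add(queue.take()); // wait for at least one item
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      queue.drainTo(batch); // grab everything else that queued up in the meantime
      try {
        batchSink.accept(batch.stream().map(w -> w.item).collect(Collectors.toList()));
        batch.forEach(w -> w.future.complete(null)); // unblock all waiting writers at once
      } catch (RuntimeException e) {
        batch.forEach(w -> w.future.completeExceptionally(e));
      }
    }
  }

  public static void main(String[] args) throws Exception {
    var writer = new SharedBatchWriterSketch<String>(1024,
        batch -> System.out.println("wrote batch of " + batch.size()));
    writer.write("mutation-1"); // in the coordinator this would be an Accumulo Mutation
  }
}

As in the commit, the saving comes from drainTo: one flush covers every mutation that arrived while the previous batch was being written, so thread creation and RPC overhead is amortized across the whole batch instead of being paid per mutation.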