This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 262d96cc65 always uses conditional writer interceptor in test ample (#4792) 262d96cc65 is described below commit 262d96cc654e7b9270a9c59d1196ae9c1d502778 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Aug 9 12:26:33 2024 -0400 always uses conditional writer interceptor in test ample (#4792) Ample has two ways to write conditional mutations. TestAmple was only intercepting conditional mutations for one of these methods. This commit updates TestAmple to intercept both. Also made TestAmple set more aggressive retry times for failed conditional mutations. This speeds up all of the FlakyAmple ITs. This change is important because more conditional mutations are failing with these changes and those tests were running slower. --- .../AsyncConditionalTabletsMutatorImpl.java | 21 ++++++++------------- .../metadata/ConditionalTabletsMutatorImpl.java | 10 +++++++--- .../accumulo/server/metadata/ServerAmpleImpl.java | 3 ++- .../accumulo/test/ample/metadata/TestAmple.java | 13 +++++++++++-- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java index fb3a43dcd7..ac5d186356 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java @@ -25,38 +25,33 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.function.Consumer; -import java.util.function.Function; +import java.util.function.Supplier; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample; -import 
org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.util.threads.Threads; -import org.apache.accumulo.server.ServerContext; import com.google.common.annotations.VisibleForTesting; public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditionalTabletsMutator { private final Consumer<Ample.ConditionalResult> resultsConsumer; private final ExecutorService executor; + private final Supplier<Ample.ConditionalTabletsMutator> mutatorFactory; private Future<Map<KeyExtent,Ample.ConditionalResult>> backgroundProcessing = null; - private ConditionalTabletsMutatorImpl bufferingMutator; - private final ServerContext context; + private Ample.ConditionalTabletsMutator bufferingMutator; private long mutatedTablets = 0; public static final int BATCH_SIZE = 1000; - private final Function<DataLevel,String> tableMapper; @VisibleForTesting - public AsyncConditionalTabletsMutatorImpl(ServerContext context, - Function<DataLevel,String> tableMapper, Consumer<Ample.ConditionalResult> resultsConsumer) { + public AsyncConditionalTabletsMutatorImpl(Consumer<Ample.ConditionalResult> resultsConsumer, + Supplier<Ample.ConditionalTabletsMutator> mutatorFactory) { this.resultsConsumer = Objects.requireNonNull(resultsConsumer); - this.context = context; - this.bufferingMutator = new ConditionalTabletsMutatorImpl(context, tableMapper); + this.mutatorFactory = mutatorFactory; + this.bufferingMutator = mutatorFactory.get(); var creatorId = Thread.currentThread().getId(); this.executor = Executors.newSingleThreadExecutor(runnable -> Threads.createThread( "Async conditional tablets mutator background thread, created by : #" + creatorId, runnable)); - this.tableMapper = Objects.requireNonNull(tableMapper); - } @Override @@ -80,7 +75,7 @@ public class AsyncConditionalTabletsMutatorImpl implements Ample.AsyncConditiona return result; }); - bufferingMutator = new ConditionalTabletsMutatorImpl(context, tableMapper); + bufferingMutator = 
mutatorFactory.get(); mutatedTablets = 0; } mutatedTablets++; diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java index 11412e2e2a..3dbdfb5842 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java @@ -163,6 +163,12 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu } } + protected Retry createUnknownRetry() { + return Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); + } + private Iterator<ConditionalWriter.Result> writeMutations(ConditionalWriter conditionalWriter) { var results = conditionalWriter.write(mutations.iterator()); @@ -175,9 +181,7 @@ public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMu while (!unknownResults.isEmpty()) { try { if (retry == null) { - retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) - .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5) - .logInterval(Duration.ofMinutes(3)).createRetry(); + retry = createUnknownRetry(); } retry.waitForNextAttempt(log, "handle conditional mutations with unknown status"); } catch (InterruptedException e) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index c136e4503e..7cc3dd1de4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -99,7 +99,8 @@ 
public class ServerAmpleImpl extends AmpleImpl implements Ample { @Override public AsyncConditionalTabletsMutator conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) { - return new AsyncConditionalTabletsMutatorImpl(context, getTableMapper(), resultsConsumer); + return new AsyncConditionalTabletsMutatorImpl(resultsConsumer, + () -> new ConditionalTabletsMutatorImpl(context, getTableMapper())); } private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) { diff --git a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java index 99dc332d3a..0b1c2b9e84 100644 --- a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java +++ b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.ample.metadata; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -56,6 +57,7 @@ import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableOptions; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl; import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl; @@ -122,8 +124,8 @@ public class TestAmple { @Override public AsyncConditionalTabletsMutator conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) { - return new AsyncConditionalTabletsMutatorImpl(getContext(), getTableMapper(), - resultsConsumer); + return new AsyncConditionalTabletsMutatorImpl(resultsConsumer, + () -> conditionallyMutateTablets(cwInterceptor.get())); } @Override @@ -200,6 +202,13 @@ public class TestAmple { return new 
ConditionalTabletsMutatorImpl(getContext(), tables::get) { + @Override + protected Retry createUnknownRetry() { + return Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(3)) + .incrementBy(Duration.ofMillis(3)).maxWait(Duration.ofMillis(50)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); + } + @Override protected ConditionalWriter createConditionalWriter(Ample.DataLevel dataLevel) throws TableNotFoundException {