This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 262d96cc65 always uses conditional writer interceptor in test ample 
(#4792)
262d96cc65 is described below

commit 262d96cc654e7b9270a9c59d1196ae9c1d502778
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Aug 9 12:26:33 2024 -0400

    always uses conditional writer interceptor in test ample (#4792)
    
    Ample has two ways to write conditional mutations.  TestAmple was
    only intercepting conditional mutations for one of these methods.
    This commit updates TestAmple to incercept both.
    
    Also made TestAmple set more aggressive retry times for failed
    conditional muations.  This speeds up all of the FlakyAmple ITs.
    This change is important because more conditional mutations are
    failing with these changes and the those tests were running
    slower.
---
 .../AsyncConditionalTabletsMutatorImpl.java         | 21 ++++++++-------------
 .../metadata/ConditionalTabletsMutatorImpl.java     | 10 +++++++---
 .../accumulo/server/metadata/ServerAmpleImpl.java   |  3 ++-
 .../accumulo/test/ample/metadata/TestAmple.java     | 13 +++++++++++--
 4 files changed, 28 insertions(+), 19 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
index fb3a43dcd7..ac5d186356 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
@@ -25,38 +25,33 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.util.threads.Threads;
-import org.apache.accumulo.server.ServerContext;
 
 import com.google.common.annotations.VisibleForTesting;
 
 public class AsyncConditionalTabletsMutatorImpl implements 
Ample.AsyncConditionalTabletsMutator {
   private final Consumer<Ample.ConditionalResult> resultsConsumer;
   private final ExecutorService executor;
+  private final Supplier<Ample.ConditionalTabletsMutator> mutatorFactory;
   private Future<Map<KeyExtent,Ample.ConditionalResult>> backgroundProcessing 
= null;
-  private ConditionalTabletsMutatorImpl bufferingMutator;
-  private final ServerContext context;
+  private Ample.ConditionalTabletsMutator bufferingMutator;
   private long mutatedTablets = 0;
   public static final int BATCH_SIZE = 1000;
-  private final Function<DataLevel,String> tableMapper;
 
   @VisibleForTesting
-  public AsyncConditionalTabletsMutatorImpl(ServerContext context,
-      Function<DataLevel,String> tableMapper, 
Consumer<Ample.ConditionalResult> resultsConsumer) {
+  public AsyncConditionalTabletsMutatorImpl(Consumer<Ample.ConditionalResult> 
resultsConsumer,
+      Supplier<Ample.ConditionalTabletsMutator> mutatorFactory) {
     this.resultsConsumer = Objects.requireNonNull(resultsConsumer);
-    this.context = context;
-    this.bufferingMutator = new ConditionalTabletsMutatorImpl(context, 
tableMapper);
+    this.mutatorFactory = mutatorFactory;
+    this.bufferingMutator = mutatorFactory.get();
     var creatorId = Thread.currentThread().getId();
     this.executor = Executors.newSingleThreadExecutor(runnable -> 
Threads.createThread(
         "Async conditional tablets mutator background thread, created by : #" 
+ creatorId,
         runnable));
-    this.tableMapper = Objects.requireNonNull(tableMapper);
-
   }
 
   @Override
@@ -80,7 +75,7 @@ public class AsyncConditionalTabletsMutatorImpl implements 
Ample.AsyncConditiona
         return result;
       });
 
-      bufferingMutator = new ConditionalTabletsMutatorImpl(context, 
tableMapper);
+      bufferingMutator = mutatorFactory.get();
       mutatedTablets = 0;
     }
     mutatedTablets++;
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 11412e2e2a..3dbdfb5842 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -163,6 +163,12 @@ public class ConditionalTabletsMutatorImpl implements 
Ample.ConditionalTabletsMu
     }
   }
 
+  protected Retry createUnknownRetry() {
+    return Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
+        
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
+        .logInterval(Duration.ofMinutes(3)).createRetry();
+  }
+
   private Iterator<ConditionalWriter.Result> writeMutations(ConditionalWriter 
conditionalWriter) {
     var results = conditionalWriter.write(mutations.iterator());
 
@@ -175,9 +181,7 @@ public class ConditionalTabletsMutatorImpl implements 
Ample.ConditionalTabletsMu
     while (!unknownResults.isEmpty()) {
       try {
         if (retry == null) {
-          retry = 
Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
-              
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
-              .logInterval(Duration.ofMinutes(3)).createRetry();
+          retry = createUnknownRetry();
         }
         retry.waitForNextAttempt(log, "handle conditional mutations with 
unknown status");
       } catch (InterruptedException e) {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index c136e4503e..7cc3dd1de4 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -99,7 +99,8 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
   @Override
   public AsyncConditionalTabletsMutator
       conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) {
-    return new AsyncConditionalTabletsMutatorImpl(context, getTableMapper(), 
resultsConsumer);
+    return new AsyncConditionalTabletsMutatorImpl(resultsConsumer,
+        () -> new ConditionalTabletsMutatorImpl(context, getTableMapper()));
   }
 
   private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java 
b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
index 99dc332d3a..0b1c2b9e84 100644
--- a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
+++ b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.test.ample.metadata;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -56,6 +57,7 @@ import 
org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableOptions;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
 import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
@@ -122,8 +124,8 @@ public class TestAmple {
     @Override
     public AsyncConditionalTabletsMutator
         conditionallyMutateTablets(Consumer<ConditionalResult> 
resultsConsumer) {
-      return new AsyncConditionalTabletsMutatorImpl(getContext(), 
getTableMapper(),
-          resultsConsumer);
+      return new AsyncConditionalTabletsMutatorImpl(resultsConsumer,
+          () -> conditionallyMutateTablets(cwInterceptor.get()));
     }
 
     @Override
@@ -200,6 +202,13 @@ public class TestAmple {
 
       return new ConditionalTabletsMutatorImpl(getContext(), tables::get) {
 
+        @Override
+        protected Retry createUnknownRetry() {
+          return 
Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(3))
+              
.incrementBy(Duration.ofMillis(3)).maxWait(Duration.ofMillis(50)).backOffFactor(1.5)
+              .logInterval(Duration.ofMinutes(3)).createRetry();
+        }
+
         @Override
         protected ConditionalWriter createConditionalWriter(Ample.DataLevel 
dataLevel)
             throws TableNotFoundException {

Reply via email to