This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 0dc5f771c0 Fix MemoryStarvedMajCIT (#4041)
0dc5f771c0 is described below

commit 0dc5f771c0df1ad35f996679e19f900830e0edca
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Dec 8 15:23:48 2023 -0500

    Fix MemoryStarvedMajCIT (#4041)
    
    Modify IT to use methods on a modified Compactor class to
    consume and free memory instead of using a Scanner with an
    iterator. The scanner approach no longer works because majc's
    no longer occur in the tablet server.
    
    Fixes #4037
---
 .../org/apache/accumulo/compactor/Compactor.java   |  7 +-
 .../test/functional/MemoryConsumingCompactor.java  | 96 ++++++++++++++++++++++
 .../test/functional/MemoryConsumingIterator.java   | 12 ++-
 .../test/functional/MemoryStarvedMajCIT.java       | 72 ++++++++++------
 4 files changed, 158 insertions(+), 29 deletions(-)

diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 9e3c81504c..11a91e09bc 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -304,6 +304,10 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
     }
   }
 
+  protected CompactorService.Iface getCompactorThriftHandlerInterface() {
+    return this;
+  }
+
   /**
    * Start this Compactors thrift service to handle incoming client requests
    *
@@ -313,7 +317,8 @@ public class Compactor extends AbstractServer implements 
MetricsProducer, Compac
   protected ServerAddress startCompactorClientService() throws 
UnknownHostException {
 
     ClientServiceHandler clientHandler = new 
ClientServiceHandler(getContext());
-    var processor = ThriftProcessorTypes.getCompactorTProcessor(clientHandler, 
this, getContext());
+    var processor = ThriftProcessorTypes.getCompactorTProcessor(clientHandler,
+        getCompactorThriftHandlerInterface(), getContext());
     Property maxMessageSizeProperty =
         (getConfiguration().get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null
             ? Property.COMPACTOR_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
new file mode 100644
index 0000000000..d725730245
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingCompactor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.clientImpl.thrift.TInfo;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.compaction.thrift.CompactorService;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryConsumingCompactor extends Compactor implements 
CompactorService.Iface {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MemoryConsumingCompactor.class);
+
+  MemoryConsumingCompactor(ConfigOpts opts, String[] args) {
+    super(opts, args);
+  }
+
+  @Override
+  protected CompactorService.Iface getCompactorThriftHandlerInterface() {
+    return this;
+  }
+
+  @Override
+  public void cancel(TInfo tinfo, TCredentials credentials, String 
externalCompactionId)
+      throws TException {
+    // Use the cancel Thrift RPC to free the consumed memory
+    LOG.warn("cancel called, freeing memory");
+    MemoryConsumingIterator.freeBuffers();
+  }
+
+  @Override
+  public TExternalCompactionJob getRunningCompaction(TInfo tinfo, TCredentials 
credentials)
+      throws ThriftSecurityException, TException {
+    // Use the getRunningCompaction Thrift RPC to consume the memory
+    LOG.warn("getRunningCompaction called, consuming memory");
+    try {
+      MemoryConsumingIterator iter = new MemoryConsumingIterator();
+      iter.init((SortedKeyValueIterator<Key,Value>) null, Map.of(),
+          new SystemIteratorEnvironment() {
+
+            @Override
+            public ServerContext getServerContext() {
+              return getContext();
+            }
+
+            @Override
+            public SortedKeyValueIterator<Key,Value>
+                getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
+              return null;
+            }
+
+          });
+      iter.consume();
+    } catch (IOException e) {
+      throw new TException("Error consuming memory", e);
+    }
+    return new TExternalCompactionJob();
+  }
+
+  public static void main(String[] args) throws Exception {
+    try (var compactor = new MemoryConsumingCompactor(new ConfigOpts(), args)) 
{
+      compactor.runServer();
+    }
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java
index 2b23fb1cff..fd3f911250 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java
@@ -61,10 +61,7 @@ public class MemoryConsumingIterator extends 
WrappingIterator {
     return (int) amountToConsume;
   }
 
-  @Override
-  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
-      throws IOException {
-    LOG.info("seek called");
+  public void consume() throws IOException {
     while (!this.isRunningLowOnMemory()) {
       int amountToConsume = getAmountToConsume();
       if (amountToConsume > 0) {
@@ -83,6 +80,13 @@ public class MemoryConsumingIterator extends 
WrappingIterator {
       }
     }
     LOG.info("Running low on memory == true");
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+      throws IOException {
+    LOG.info("seek called");
+    consume();
     super.seek(range, columnFamilies, inclusive);
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index 5c9e2cd96f..9ec1094ea7 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -30,36 +30,41 @@ import java.util.concurrent.atomic.DoubleAdder;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
-import org.apache.accumulo.minicluster.MemoryUnit;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
 import org.apache.accumulo.test.metrics.TestStatsDSink;
 import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
 
-@Disabled // ELASTICITY_TODO
 public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MemoryStarvedMajCIT.class);
+
   public static class MemoryStarvedITConfiguration implements 
MiniClusterConfigurationCallback {
 
     @Override
     public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
       cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
-      cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
-      // Configure the LowMemoryDetector in the TabletServer
+      cfg.getClusterServerConfiguration().setNumDefaultScanServers(0);
+      cfg.getClusterServerConfiguration().setNumDefaultCompactors(1);
       cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s");
       cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD,
           Double.toString(MemoryStarvedScanIT.FREE_MEMORY_THRESHOLD));
@@ -120,16 +125,31 @@ public class MemoryStarvedMajCIT extends 
SharedMiniClusterBase {
 
   @Test
   public void testMajCPauses() throws Exception {
+
     String table = getUniqueNames(1)[0];
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 
+      ClientContext ctx = (ClientContext) client;
+
+      // Stop the normal compactors and start the version that will consume
+      // and free memory when we need it to
+      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+
+      getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
+          MemoryConsumingCompactor.class);
+      Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() 
== 4, 60_000);
+      Wait.waitFor(
+          () -> 
ExternalCompactionUtil.getCompactorAddrs(ctx).get("user_small").size() == 1,
+          60_000);
+
+      Map<String,List<HostAndPort>> groupedCompactors =
+          ExternalCompactionUtil.getCompactorAddrs(ctx);
+      List<HostAndPort> compactorAddresses = 
groupedCompactors.get("user_small");
+      HostAndPort compactorAddr = compactorAddresses.get(0);
+
       TableOperations to = client.tableOperations();
       to.create(table);
 
-      // Add a small amount of data so that the MemoryConsumingIterator
-      // returns true when trying to consume all of the memory.
-      ReadWriteIT.ingest(client, 1, 1, 1, 0, table);
-
       AtomicReference<Throwable> error = new AtomicReference<>();
       Thread compactionThread = new Thread(() -> {
         try {
@@ -139,25 +159,29 @@ public class MemoryStarvedMajCIT extends 
SharedMiniClusterBase {
         }
       });
 
-      try (Scanner scanner = client.createScanner(table)) {
+      int paused = MAJC_PAUSED.intValue();
+      assertEquals(0, paused);
 
-        MemoryStarvedScanIT.consumeServerMemory(scanner);
+      // Calling getRunningCompaction on the MemoryConsumingCompactor
+      // will consume the free memory
+      LOG.info("Calling getRunningCompaction on {}", compactorAddr);
+      ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx);
 
-        int paused = MAJC_PAUSED.intValue();
-        assertEquals(0, paused);
+      ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
+      compactionThread.start();
 
-        ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
-        compactionThread.start();
+      waitFor(() -> MAJC_PAUSED.intValue() > 0);
 
-        waitFor(() -> MAJC_PAUSED.intValue() > 0);
+      // Calling cancel on the MemoryConsumingCompactor will free
+      // the consumed memory
+      LOG.info("Calling cancel on {}", compactorAddr);
+      ExternalCompactionUtil.cancelCompaction(ctx, compactorAddr, "fakeECID");
 
-        MemoryStarvedScanIT.freeServerMemory(client);
-        compactionThread.interrupt();
-        compactionThread.join();
-        assertNull(error.get());
-        assertTrue(client.instanceOperations().getActiveCompactions().stream()
-            .anyMatch(ac -> ac.getPausedCount() > 0));
-      }
+      compactionThread.interrupt();
+      compactionThread.join();
+      assertNull(error.get());
+      assertTrue(client.instanceOperations().getActiveCompactions().stream()
+          .anyMatch(ac -> ac.getPausedCount() > 0));
     }
 
   }

Reply via email to