This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 648529487f Fix MemoryStarved ITs (#4062)
648529487f is described below

commit 648529487f1af3bb6139e91bf5b76db7ebc08b78
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Dec 13 07:56:23 2023 -0500

    Fix MemoryStarved ITs (#4062)
    
    In #3951 the ScanServer and Compactor were modified to advertise
    the Thrift Client service. Since then, the freeServerMemory method
    would connect to any TabletServer, Compactor, or ScanServer during
    the test. However, we need it to only connect to the TabletServer. Modified
    freeServerMemory to only connect to the TabletServer and introduced
    a wait in MemoryStarvedMajCIT as it was trying to connect before
    the Compactor was fully up.
---
 .../test/functional/MemoryStarvedMajCIT.java       | 11 ++++++-
 .../test/functional/MemoryStarvedScanIT.java       | 35 ++++++++++++++++++++--
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index 9ec1094ea7..98222a5fa6 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -165,7 +166,15 @@ public class MemoryStarvedMajCIT extends 
SharedMiniClusterBase {
       // Calling getRunningCompaction on the MemoryConsumingCompactor
       // will consume the free memory
       LOG.info("Calling getRunningCompaction on {}", compactorAddr);
-      ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx);
+      boolean success = false;
+      while (!success) {
+        try {
+          ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx);
+          success = true;
+        } catch (Exception e) {
+          UtilWaitThread.sleep(3000);
+        }
+      }
 
       ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
       compactionThread.start();
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index 5d71a2fed5..8273f8e5b8 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -25,6 +25,7 @@ import static org.apache.accumulo.test.util.Wait.waitFor;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -33,18 +34,30 @@ import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.DoubleAdder;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.ThriftTransportKey;
+import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client;
+import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
+import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
 import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.MemoryUnit;
@@ -54,6 +67,7 @@ import 
org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
 import org.apache.accumulo.test.metrics.TestStatsDSink;
 import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.transport.TTransport;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -173,10 +187,25 @@ public class MemoryStarvedScanIT extends 
SharedMiniClusterBase {
   }
 
   static void freeServerMemory(AccumuloClient client) throws Exception {
-    // Instantiating this class on the TabletServer will free the memory as it
-    // frees the buffers created by the MemoryConsumingIterator in its 
constructor.
-    
client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(),
+
+    final ClientContext context = (ClientContext) client;
+    final long rpcTimeout = context.getClientTimeoutInMillis();
+    final ArrayList<ThriftTransportKey> servers = new ArrayList<>();
+    final String serverPath = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+    final ZooCache zc = context.getZooCache();
+
+    for (String server : zc.getChildren(serverPath)) {
+      ServiceLockPath zLocPath = ServiceLock.path(serverPath + "/" + server);
+      zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.CLIENT))
+          .map(address -> new ThriftTransportKey(address, rpcTimeout, context))
+          .ifPresent(servers::add);
+    }
+
+    Pair<String,TTransport> pair = 
context.getTransportPool().getAnyTransport(servers, false);
+    Client clientService = ThriftUtil.createClient(ThriftClientTypes.CLIENT, 
pair.getSecond());
+    clientService.checkClass(new TInfo(), context.rpcCreds(), 
MemoryFreeingIterator.class.getName(),
         WrappingIterator.class.getName());
+
   }
 
   @Test

Reply via email to