This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 648529487f Fix MemoryStarved ITs (#4062) 648529487f is described below commit 648529487f1af3bb6139e91bf5b76db7ebc08b78 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Dec 13 07:56:23 2023 -0500 Fix MemoryStarved ITs (#4062) In #3951 the ScanServer and Compactor were modified to advertise the Thrift Client service. Since then, the freeServerMemory method would connect to any TabletServer, Compactor, or ScanServer during the test. However, we need it to only connect to the TabletServer. Modified freeServerMemory to only connect to the TabletServer and introduced a wait in MemoryStarvedMajCIT as it was trying to connect before the Compactor was fully up. --- .../test/functional/MemoryStarvedMajCIT.java | 11 ++++++- .../test/functional/MemoryStarvedScanIT.java | 35 ++++++++++++++++++++-- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java index 9ec1094ea7..98222a5fa6 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -165,7 +166,15 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase { // Calling getRunningCompaction on the MemoryConsumingCompactor // will consume the free memory 
LOG.info("Calling getRunningCompaction on {}", compactorAddr); - ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx); + boolean success = false; + while (!success) { + try { + ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx); + success = true; + } catch (Exception e) { + UtilWaitThread.sleep(3000); + } + } ReadWriteIT.ingest(client, 100, 100, 100, 0, table); compactionThread.start(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java index 5d71a2fed5..8273f8e5b8 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java @@ -25,6 +25,7 @@ import static org.apache.accumulo.test.util.Wait.waitFor; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -33,18 +34,30 @@ import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.DoubleAdder; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.ThriftTransportKey; +import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import 
org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.MemoryUnit; @@ -54,6 +67,7 @@ import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; import org.apache.accumulo.test.metrics.TestStatsDSink; import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.transport.TTransport; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -173,10 +187,25 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { } static void freeServerMemory(AccumuloClient client) throws Exception { - // Instantiating this class on the TabletServer will free the memory as it - // frees the buffers created by the MemoryConsumingIterator in its constructor. 
- client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(), + + final ClientContext context = (ClientContext) client; + final long rpcTimeout = context.getClientTimeoutInMillis(); + final ArrayList<ThriftTransportKey> servers = new ArrayList<>(); + final String serverPath = context.getZooKeeperRoot() + Constants.ZTSERVERS; + final ZooCache zc = context.getZooCache(); + + for (String server : zc.getChildren(serverPath)) { + ServiceLockPath zLocPath = ServiceLock.path(serverPath + "/" + server); + zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.CLIENT)) + .map(address -> new ThriftTransportKey(address, rpcTimeout, context)) + .ifPresent(servers::add); + } + + Pair<String,TTransport> pair = context.getTransportPool().getAnyTransport(servers, false); + Client clientService = ThriftUtil.createClient(ThriftClientTypes.CLIENT, pair.getSecond()); + clientService.checkClass(new TInfo(), context.rpcCreds(), MemoryFreeingIterator.class.getName(), WrappingIterator.class.getName()); + } @Test