This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 3849fffeddeb29ec92b74926c15e756f182bb7c3 Merge: 422d48a432 f3d5fb01d7 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Mar 4 20:14:59 2024 +0000 Merge branch 'main' into elasticity .../core/clientImpl/ThriftTransportKey.java | 29 ++++-- .../core/clientImpl/ThriftTransportPool.java | 110 +++++---------------- .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++++++++++--------- .../core/rpc/clients/ThriftClientTypes.java | 6 +- .../core/clientImpl/ThriftTransportKeyTest.java | 25 ++--- .../apache/accumulo/test/TransportCachingIT.java | 42 ++++---- .../test/functional/MemoryStarvedScanIT.java | 41 +++++--- 8 files changed, 182 insertions(+), 183 deletions(-) diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java index 8273f8e5b8,59b9a535b8..0becd57120 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java @@@ -31,6 -30,6 +30,7 @@@ import java.util.Iterator import java.util.List; import java.util.Map; import java.util.Map.Entry; ++import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.DoubleAdder; @@@ -41,23 -39,12 +41,21 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.clientImpl.ClientContext; - import org.apache.accumulo.core.clientImpl.ThriftTransportKey; +import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.lock.ServiceLock; - import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; ++import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; - import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.MemoryUnit; @@@ -67,7 -54,6 +65,8 @@@ import org.apache.accumulo.test.metrics import org.apache.accumulo.test.metrics.TestStatsDSink; import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.transport.TTransport; ++import org.apache.thrift.transport.TTransportException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@@ -75,6 -61,6 +74,8 @@@ import org.junit.jupiter.api.Test import org.slf4j.Logger; import org.slf4j.LoggerFactory; ++import com.google.common.net.HostAndPort; ++ public class MemoryStarvedScanIT extends SharedMiniClusterBase { public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback { @@@ -187,25 -173,10 +188,35 @@@ } static void freeServerMemory(AccumuloClient client) throws Exception { - // Instantiating this class on the TabletServer will free the memory as it - // frees the buffers created by the MemoryConsumingIterator in its constructor. - client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(), - WrappingIterator.class.getName()); + ++ // This does not call ThriftClientTypes.CLIENT.execute because ++ // we only want to communicate with the TabletServer for this test + final ClientContext context = (ClientContext) client; + final long rpcTimeout = context.getClientTimeoutInMillis(); - final ArrayList<ThriftTransportKey> servers = new ArrayList<>(); + final String serverPath = context.getZooKeeperRoot() + Constants.ZTSERVERS; + final ZooCache zc = context.getZooCache(); + + for (String server : zc.getChildren(serverPath)) { - ServiceLockPath zLocPath = ServiceLock.path(serverPath + "/" + server); - zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.CLIENT)) - .map(address -> new ThriftTransportKey(address, rpcTimeout, context)) - .ifPresent(servers::add); ++ var zLocPath = ServiceLock.path(serverPath + "/" + server); ++ Optional<ServiceLockData> data = zc.getLockData(zLocPath); ++ if (data != null && data.isPresent()) { ++ HostAndPort tserverClientAddress = data.orElseThrow().getAddress(ThriftService.CLIENT); ++ if (tserverClientAddress != null) { ++ try { ++ TTransport transport = context.getTransportPool().getTransport(ThriftClientTypes.CLIENT, ++ tserverClientAddress, rpcTimeout, context, true); ++ Client c = ThriftUtil.createClient(ThriftClientTypes.CLIENT, transport); ++ if (c.checkClass(new TInfo(), context.rpcCreds(), MemoryFreeingIterator.class.getName(), ++ WrappingIterator.class.getName())) { ++ break; ++ } ++ } catch (TTransportException e) { ++ LOG.trace("Error creating transport to {}", tserverClientAddress); ++ continue; ++ } ++ } ++ } + } - - Pair<String,TTransport> pair = context.getTransportPool().getAnyTransport(servers, false); - Client clientService = ThriftUtil.createClient(ThriftClientTypes.CLIENT, pair.getSecond()); - clientService.checkClass(new TInfo(), context.rpcCreds(), MemoryFreeingIterator.class.getName(), - WrappingIterator.class.getName()); - } @Test