This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 3849fffeddeb29ec92b74926c15e756f182bb7c3
Merge: 422d48a432 f3d5fb01d7
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Mar 4 20:14:59 2024 +0000

    Merge branch 'main' into elasticity

 .../core/clientImpl/ThriftTransportKey.java        |  29 ++++--
 .../core/clientImpl/ThriftTransportPool.java       | 110 +++++----------------
 .../org/apache/accumulo/core/rpc/ThriftUtil.java   |   7 +-
 .../accumulo/core/rpc/clients/TServerClient.java   | 105 +++++++++++---------
 .../core/rpc/clients/ThriftClientTypes.java        |   6 +-
 .../core/clientImpl/ThriftTransportKeyTest.java    |  25 ++---
 .../apache/accumulo/test/TransportCachingIT.java   |  42 ++++----
 .../test/functional/MemoryStarvedScanIT.java       |  41 +++++---
 8 files changed, 182 insertions(+), 183 deletions(-)

diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index 8273f8e5b8,59b9a535b8..0becd57120
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@@ -31,6 -30,6 +30,7 @@@ import java.util.Iterator
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
++import java.util.Optional;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.DoubleAdder;
  
@@@ -41,23 -39,12 +41,21 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.Scanner;
  import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
- import org.apache.accumulo.core.clientImpl.ThriftTransportKey;
 +import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.fate.zookeeper.ZooCache;
  import org.apache.accumulo.core.iterators.WrappingIterator;
 +import org.apache.accumulo.core.lock.ServiceLock;
- import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
++import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
  import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
- import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
  import org.apache.accumulo.harness.SharedMiniClusterBase;
  import org.apache.accumulo.minicluster.MemoryUnit;
@@@ -67,7 -54,6 +65,8 @@@ import org.apache.accumulo.test.metrics
  import org.apache.accumulo.test.metrics.TestStatsDSink;
  import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
  import org.apache.hadoop.conf.Configuration;
 +import org.apache.thrift.transport.TTransport;
++import org.apache.thrift.transport.TTransportException;
  import org.junit.jupiter.api.AfterAll;
  import org.junit.jupiter.api.BeforeAll;
  import org.junit.jupiter.api.BeforeEach;
@@@ -75,6 -61,6 +74,8 @@@ import org.junit.jupiter.api.Test
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
++import com.google.common.net.HostAndPort;
++
  public class MemoryStarvedScanIT extends SharedMiniClusterBase {
  
    public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback {
@@@ -187,25 -173,10 +188,35 @@@
    }
  
    static void freeServerMemory(AccumuloClient client) throws Exception {
 -    // Instantiating this class on the TabletServer will free the memory as it
 -    // frees the buffers created by the MemoryConsumingIterator in its constructor.
 -    client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(),
 -        WrappingIterator.class.getName());
 +
++    // This does not call ThriftClientTypes.CLIENT.execute because
++    // we only want to communicate with the TabletServer for this test
 +    final ClientContext context = (ClientContext) client;
 +    final long rpcTimeout = context.getClientTimeoutInMillis();
-     final ArrayList<ThriftTransportKey> servers = new ArrayList<>();
 +    final String serverPath = context.getZooKeeperRoot() + Constants.ZTSERVERS;
 +    final ZooCache zc = context.getZooCache();
 +
 +    for (String server : zc.getChildren(serverPath)) {
-       ServiceLockPath zLocPath = ServiceLock.path(serverPath + "/" + server);
-       zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.CLIENT))
-           .map(address -> new ThriftTransportKey(address, rpcTimeout, context))
-           .ifPresent(servers::add);
++      var zLocPath = ServiceLock.path(serverPath + "/" + server);
++      Optional<ServiceLockData> data = zc.getLockData(zLocPath);
++      if (data != null && data.isPresent()) {
++        HostAndPort tserverClientAddress = data.orElseThrow().getAddress(ThriftService.CLIENT);
++        if (tserverClientAddress != null) {
++          try {
++            TTransport transport = context.getTransportPool().getTransport(ThriftClientTypes.CLIENT,
++                tserverClientAddress, rpcTimeout, context, true);
++            Client c = ThriftUtil.createClient(ThriftClientTypes.CLIENT, transport);
++            if (c.checkClass(new TInfo(), context.rpcCreds(), MemoryFreeingIterator.class.getName(),
++                WrappingIterator.class.getName())) {
++              break;
++            }
++          } catch (TTransportException e) {
++            LOG.trace("Error creating transport to {}", tserverClientAddress);
++            continue;
++          }
++        }
++      }
 +    }
- 
-     Pair<String,TTransport> pair = context.getTransportPool().getAnyTransport(servers, false);
-     Client clientService = ThriftUtil.createClient(ThriftClientTypes.CLIENT, pair.getSecond());
-     clientService.checkClass(new TInfo(), context.rpcCreds(), MemoryFreeingIterator.class.getName(),
-         WrappingIterator.class.getName());
- 
    }
  
    @Test

Reply via email to