This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 8f9636b268 Modified Compactor and ScanServer to advertise Client service (#3928) (#3951) 8f9636b268 is described below commit 8f9636b268acf40a36bf462f3f1a3578426fb840 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Nov 14 10:39:55 2023 -0500 Modified Compactor and ScanServer to advertise Client service (#3928) (#3951) Modified Compactor and ScanServer to advertise Client service and modified client code to use TabletServer, ScanServer, and Compactor addresses when trying to execute an operation on the Client service API. Fixes #3624 (cherry picked from commit 65f0204884f04e339699d25426e499c046b127d9) --- .../core/clientImpl/TableOperationsImpl.java | 2 +- .../rpc/clients/ClientServiceThriftClient.java | 7 +++-- .../accumulo/core/rpc/clients/TServerClient.java | 35 ++++++++++++++-------- .../TabletManagementClientServiceThriftClient.java | 1 - .../core/rpc/clients/TabletServerThriftClient.java | 7 +++-- .../accumulo/server/rpc/ThriftProcessorTypes.java | 12 +++++--- .../org/apache/accumulo/compactor/Compactor.java | 18 +++++++++-- .../org/apache/accumulo/tserver/ScanServer.java | 19 ++++++++++-- 8 files changed, 71 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 6fa2470401..e9e784b23e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -1503,7 +1503,7 @@ public class TableOperationsImpl extends TableOperationsHelper { // this operation may us a lot of memory... its likely that connections to tabletservers // hosting metadata tablets will be cached, so do not use cached // connections - pair = ThriftClientTypes.CLIENT.getTabletServerConnection(context, false); + pair = ThriftClientTypes.CLIENT.getThriftServerConnection(context, false); diskUsages = pair.getSecond().getDiskUsage(tableNames, context.rpcCreds()); } catch (ThriftTableOperationException e) { switch (e.getType()) { diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ClientServiceThriftClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ClientServiceThriftClient.java index 5a6a912647..c1dc9954da 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ClientServiceThriftClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ClientServiceThriftClient.java @@ -24,6 +24,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.util.Pair; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; @@ -40,10 +41,10 @@ public class ClientServiceThriftClient extends ThriftClientTypes<Client> } @Override - public Pair<String,Client> getTabletServerConnection(ClientContext context, + public Pair<String,Client> getThriftServerConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException { - return getTabletServerConnection(LOG, this, context, preferCachedConnections, - warnedAboutTServersBeingDown); + return getThriftServerConnection(LOG, this, context, preferCachedConnections, + warnedAboutTServersBeingDown, ThriftService.CLIENT); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java index c60c91c5d6..121bde735b 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java @@ -21,8 +21,11 @@ package org.apache.accumulo.core.rpc.clients; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.Constants; @@ -48,25 +51,33 @@ import org.slf4j.Logger; public interface TServerClient<C extends TServiceClient> { - Pair<String,C> getTabletServerConnection(ClientContext context, boolean preferCachedConnections) + Pair<String,C> getThriftServerConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException; - default Pair<String,C> getTabletServerConnection(Logger LOG, ThriftClientTypes<C> type, - ClientContext context, boolean preferCachedConnections, AtomicBoolean warned) - throws TTransportException { + default Pair<String,C> getThriftServerConnection(Logger LOG, ThriftClientTypes<C> type, + ClientContext context, boolean preferCachedConnections, AtomicBoolean warned, + ThriftService service) throws TTransportException { checkArgument(context != null, "context is null"); long rpcTimeout = context.getClientTimeoutInMillis(); // create list of servers ArrayList<ThriftTransportKey> servers = new ArrayList<>(); // add tservers + List<String> serverPaths = new ArrayList<>(); + serverPaths.add(context.getZooKeeperRoot() + Constants.ZTSERVERS); + if (type == ThriftClientTypes.CLIENT) { + serverPaths.add(context.getZooKeeperRoot() + Constants.ZCOMPACTORS); + serverPaths.add(context.getZooKeeperRoot() + Constants.ZSSERVERS); + Collections.shuffle(serverPaths, RANDOM.get()); + } ZooCache zc = context.getZooCache(); - for (String tserver : zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) { - var zLocPath = - ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + tserver); - zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.TSERV)) - .map(address -> new ThriftTransportKey(address, rpcTimeout, context)) - .ifPresent(servers::add); + for (String serverPath : serverPaths) { + for (String server : zc.getChildren(serverPath)) { + var zLocPath = ServiceLock.path(serverPath + "/" + server); + zc.getLockData(zLocPath).map(sld -> sld.getAddress(service)) + .map(address -> new ThriftTransportKey(address, rpcTimeout, context)) + .ifPresent(servers::add); + } } boolean opened = false; @@ -96,7 +107,7 @@ public interface TServerClient<C extends TServiceClient> { String server = null; C client = null; try { - Pair<String,C> pair = getTabletServerConnection(context, true); + Pair<String,C> pair = getThriftServerConnection(context, true); server = pair.getFirst(); client = pair.getSecond(); return exec.execute(client); @@ -123,7 +134,7 @@ public interface TServerClient<C extends TServiceClient> { String server = null; C client = null; try { - Pair<String,C> pair = getTabletServerConnection(context, true); + Pair<String,C> pair = getThriftServerConnection(context, true); server = pair.getFirst(); client = pair.getSecond(); exec.execute(client); diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletManagementClientServiceThriftClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletManagementClientServiceThriftClient.java index a5464ea14a..d19a2a45e5 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletManagementClientServiceThriftClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletManagementClientServiceThriftClient.java @@ -29,5 +29,4 @@ public class TabletManagementClientServiceThriftClient extends ThriftClientTypes public TabletManagementClientServiceThriftClient(String serviceName) { super(serviceName, new Client.Factory()); } - } diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletServerThriftClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletServerThriftClient.java index f672e14610..3f2f71bc7a 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletServerThriftClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TabletServerThriftClient.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client; import org.apache.accumulo.core.util.Pair; import org.apache.thrift.transport.TTransportException; @@ -44,10 +45,10 @@ public class TabletServerThriftClient extends ThriftClientTypes<Client> } @Override - public Pair<String,Client> getTabletServerConnection(ClientContext context, + public Pair<String,Client> getThriftServerConnection(ClientContext context, boolean preferCachedConnections) throws TTransportException { - return getTabletServerConnection(LOG, this, context, preferCachedConnections, - warnedAboutTServersBeingDown); + return getThriftServerConnection(LOG, this, context, preferCachedConnections, + warnedAboutTServersBeingDown, ThriftService.TSERV); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java index 3913fce3f2..7e497c3145 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java @@ -95,9 +95,11 @@ public class ThriftProcessorTypes<C extends TServiceClient> extends ThriftClient public static final ThriftProcessorTypes<TabletManagementClientService.Client> TABLET_MGMT = new ThriftProcessorTypes<>(ThriftClientTypes.TABLET_MGMT); - public static TMultiplexedProcessor getCompactorTProcessor(CompactorService.Iface serviceHandler, - ServerContext context) { + public static TMultiplexedProcessor getCompactorTProcessor(ClientServiceHandler clientHandler, + CompactorService.Iface serviceHandler, ServerContext context) { TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); + muxProcessor.registerProcessor(CLIENT.getServiceName(), CLIENT.getTProcessor( + ClientService.Processor.class, ClientService.Iface.class, clientHandler, context)); muxProcessor.registerProcessor(COMPACTOR.getServiceName(), COMPACTOR.getTProcessor( CompactorService.Processor.class, CompactorService.Iface.class, serviceHandler, context)); return muxProcessor; @@ -131,9 +133,11 @@ public class ThriftProcessorTypes<C extends TServiceClient> extends ThriftClient return muxProcessor; } - public static TMultiplexedProcessor - getScanServerTProcessor(TabletScanClientService.Iface tserverHandler, ServerContext context) { + public static TMultiplexedProcessor getScanServerTProcessor(ClientServiceHandler clientHandler, + TabletScanClientService.Iface tserverHandler, ServerContext context) { TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); + muxProcessor.registerProcessor(CLIENT.getServiceName(), CLIENT.getTProcessor( + ClientService.Processor.class, ClientService.Iface.class, clientHandler, context)); muxProcessor.registerProcessor(TABLET_SCAN.getServiceName(), TABLET_SCAN.getTProcessor(TabletScanClientService.Processor.class, TabletScanClientService.Iface.class, tserverHandler, context)); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index df295cb912..fc7d6a0234 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -69,6 +69,8 @@ import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -92,6 +94,7 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.compaction.CompactionInfo; import org.apache.accumulo.server.compaction.CompactionWatcher; import org.apache.accumulo.server.compaction.FileCompactor; @@ -104,6 +107,7 @@ import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; @@ -278,8 +282,13 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac for (int i = 0; i < 25; i++) { zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP); - if (compactorLock.tryLock(lw, - new ServiceLockData(compactorId, hostPort, ThriftService.COMPACTOR, this.queueName))) { + ServiceDescriptors descriptors = new ServiceDescriptors(); + for (ThriftService svc : new ThriftService[] {ThriftService.CLIENT, + ThriftService.COMPACTOR}) { + descriptors.addService(new ServiceDescriptor(compactorId, svc, hostPort, this.queueName)); + } + + if (compactorLock.tryLock(lw, new ServiceLockData(descriptors))) { LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath()); return; } @@ -302,7 +311,10 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac * @throws UnknownHostException host unknown */ protected ServerAddress startCompactorClientService() throws UnknownHostException { - var processor = ThriftProcessorTypes.getCompactorTProcessor(this, getContext()); + + ClientServiceHandler clientHandler = + new ClientServiceHandler(getContext(), new TransactionWatcher(getContext())); + var processor = ThriftProcessorTypes.getCompactorTProcessor(clientHandler, this, getContext()); Property maxMessageSizeProperty = (getConfiguration().get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null ? Property.COMPACTOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 102267aaf7..8868065aa4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -70,6 +70,8 @@ import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; +import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -89,6 +91,7 @@ import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.compaction.PausedCompactionMetrics; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; @@ -96,6 +99,7 @@ import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; import org.apache.accumulo.server.security.SecurityUtil; +import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; import org.apache.accumulo.tserver.session.MultiScanSession; @@ -260,7 +264,10 @@ public class ScanServer extends AbstractServer // This class implements TabletClientService.Iface and then delegates calls. Be sure // to set up the ThriftProcessor using this class, not the delegate. - TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, getContext()); + ClientServiceHandler clientHandler = + new ClientServiceHandler(context, new TransactionWatcher(context)); + TProcessor processor = + ThriftProcessorTypes.getScanServerTProcessor(clientHandler, this, getContext()); Property maxMessageSizeProperty = (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null @@ -326,8 +333,14 @@ public class ScanServer extends AbstractServer for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); - if (scanServerLock.tryLock(lw, new ServiceLockData(serverLockUUID, getClientAddressString(), - ThriftService.TABLET_SCAN, this.groupName))) { + ServiceDescriptors descriptors = new ServiceDescriptors(); + for (ThriftService svc : new ThriftService[] {ThriftService.CLIENT, + ThriftService.TABLET_SCAN}) { + descriptors.addService( + new ServiceDescriptor(serverLockUUID, svc, getClientAddressString(), this.groupName)); + } + + if (scanServerLock.tryLock(lw, new ServiceLockData(descriptors))) { LOG.debug("Obtained scan server lock {}", scanServerLock.getLockPath()); return scanServerLock; }