This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push: new e1038d4c27c Improvements to ClientContext for ensureOpen (#5258) e1038d4c27c is described below commit e1038d4c27c96f8e7747877af564e46c3e682f89 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Wed Jan 22 15:30:49 2025 -0500 Improvements to ClientContext for ensureOpen (#5258) These changes are small quality fixes to ensure that ClientContext.ensureOpen is used when it is needed, and not used when it isn't. This fixes an issue seen where the client RPC timeout value is being retrieved from a supplier in a thread pool when returning RPC transports after a client is closed. In these cases, ensureOpen does not need to be checked. However, there were a few context API methods where it was not checked but should have been. Also, improved the close method to ensure close activities are only called at most once, and made private and renamed an internal method to get the client properties from the ClientInfo object, so it's more clear which properties the method is returning and isn't exposed for misuse. --- .../accumulo/core/clientImpl/ClientContext.java | 69 +++++++++++----------- .../accumulo/core/clientImpl/ClientInfo.java | 2 +- .../accumulo/core/clientImpl/ClientInfoImpl.java | 2 +- .../standalone/StandaloneAccumuloCluster.java | 2 +- .../org/apache/accumulo/server/ServerInfo.java | 2 +- .../apache/accumulo/server/MockServerContext.java | 3 - .../accumulo/server/rpc/TServerUtilsTest.java | 2 - .../test/server/security/SystemCredentialsIT.java | 2 +- 8 files changed, 40 insertions(+), 44 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 702936aee92..f8bcda14589 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -142,7 +142,7 @@ public class ClientContext implements AccumuloClient { private ThriftTransportPool thriftTransportPool; private ZookeeperLockChecker zkLockChecker; - private volatile boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(); private SecurityOperations secops = null; private final TableOperationsImpl tableops; @@ -157,22 +157,21 @@ public class ClientContext implements AccumuloClient { private final Supplier<ZooSession> zooSession; private void ensureOpen() { - if (closed) { + if (closed.get()) { throw new IllegalStateException("This client was closed."); } } private ScanServerSelector createScanServerSelector() { - String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties()); + String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(getClientProperties()); try { Class<? extends ScanServerSelector> impl = Class.forName(clazz).asSubclass(ScanServerSelector.class); ScanServerSelector scanServerSelector = impl.getDeclaredConstructor().newInstance(); Map<String,String> sserverProps = new HashMap<>(); - ClientProperty - .getPrefix(info.getProperties(), ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey()) - .forEach((k, v) -> { + ClientProperty.getPrefix(getClientProperties(), + ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey()).forEach((k, v) -> { sserverProps.put( k.toString() .substring(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey().length()), @@ -311,9 +310,8 @@ public class ClientContext implements AccumuloClient { return getCredentials().getToken(); } - public Properties getProperties() { - ensureOpen(); - return info.getProperties(); + private Properties getClientProperties() { + return info.getClientProperties(); } /** @@ -396,7 +394,7 @@ public class ClientContext implements AccumuloClient { public synchronized BatchWriterConfig getBatchWriterConfig() { ensureOpen(); if (batchWriterConfig == null) { - batchWriterConfig = getBatchWriterConfig(info.getProperties()); + batchWriterConfig = getBatchWriterConfig(getClientProperties()); } return batchWriterConfig; } @@ -405,6 +403,7 @@ public class ClientContext implements AccumuloClient { * @return map of live scan server addresses to lock uuids. */ public Map<String,Pair<UUID,String>> getScanServers() { + ensureOpen(); Map<String,Pair<UUID,String>> liveScanServers = new HashMap<>(); String root = this.getZooKeeperRoot() + Constants.ZSSERVERS; var addrs = this.getZooCache().getChildren(root); @@ -455,7 +454,7 @@ public class ClientContext implements AccumuloClient { public synchronized ConditionalWriterConfig getConditionalWriterConfig() { ensureOpen(); if (conditionalWriterConfig == null) { - conditionalWriterConfig = getConditionalWriterConfig(info.getProperties()); + conditionalWriterConfig = getConditionalWriterConfig(getClientProperties()); } return conditionalWriterConfig; } @@ -621,6 +620,7 @@ public class ClientContext implements AccumuloClient { } public Map<NamespaceId,String> getNamespaceIdToNameMap() { + ensureOpen(); return Namespaces.getIdToNameMap(this); } @@ -694,7 +694,7 @@ public class ClientContext implements AccumuloClient { throws TableNotFoundException { ensureOpen(); Integer numQueryThreads = - ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS.getInteger(getProperties()); + ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS.getInteger(getClientProperties()); Objects.requireNonNull(numQueryThreads); return createBatchScanner(tableName, authorizations, numQueryThreads); } @@ -702,6 +702,7 @@ public class ClientContext implements AccumuloClient { @Override public BatchScanner createBatchScanner(String tableName) throws TableNotFoundException, AccumuloSecurityException, AccumuloException { + ensureOpen(); Authorizations auths = securityOperations().getUserAuthorizations(getPrincipal()); return createBatchScanner(tableName, auths); } @@ -718,7 +719,6 @@ public class ClientContext implements AccumuloClient { @Override public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { - ensureOpen(); return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig()); } @@ -773,7 +773,7 @@ public class ClientContext implements AccumuloClient { checkArgument(authorizations != null, "authorizations is null"); Scanner scanner = new ScannerImpl(this, requireNotOffline(getTableId(tableName), tableName), authorizations); - Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties()); + Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getClientProperties()); if (batchSize != null) { scanner.setBatchSize(batchSize); } @@ -829,7 +829,7 @@ public class ClientContext implements AccumuloClient { public Properties properties() { ensureOpen(); Properties result = new Properties(); - getProperties().forEach((key, value) -> { + getClientProperties().forEach((key, value) -> { if (!key.equals(ClientProperty.AUTH_TOKEN.getKey())) { result.setProperty((String) key, (String) value); } @@ -844,23 +844,24 @@ public class ClientContext implements AccumuloClient { @Override public synchronized void close() { - closed = true; - if (zooKeeperOpened.get()) { - zooSession.get().close(); - } - if (thriftTransportPool != null) { - thriftTransportPool.shutdown(); - } - if (tableZooHelper != null) { - tableZooHelper.close(); - } - if (scannerReadaheadPool != null) { - scannerReadaheadPool.shutdownNow(); // abort all tasks, client is shutting down - } - if (cleanupThreadPool != null) { - cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute + if (closed.compareAndSet(false, true)) { + if (zooKeeperOpened.get()) { + zooSession.get().close(); + } + if (thriftTransportPool != null) { + thriftTransportPool.shutdown(); + } + if (tableZooHelper != null) { + tableZooHelper.close(); + } + if (scannerReadaheadPool != null) { + scannerReadaheadPool.shutdownNow(); // abort all tasks, client is shutting down + } + if (cleanupThreadPool != null) { + cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute + } + singletonReservation.close(); } - singletonReservation.close(); } public static class ClientBuilderImpl<T> @@ -896,7 +897,7 @@ public class ClientContext implements AccumuloClient { try { // ClientContext closes reservation unless a RuntimeException is thrown ClientInfo info = cbi.getClientInfo(); - AccumuloConfiguration config = ClientConfConverter.toAccumuloConf(info.getProperties()); + var config = ClientConfConverter.toAccumuloConf(info.getClientProperties()); return new ClientContext(reservation, info, config, cbi.getUncaughtExceptionHandler()); } catch (RuntimeException e) { reservation.close(); @@ -1080,8 +1081,7 @@ public class ClientContext implements AccumuloClient { } protected long getTransportPoolMaxAgeMillis() { - ensureOpen(); - return ClientProperty.RPC_TRANSPORT_IDLE_TIMEOUT.getTimeInMillis(getProperties()); + return ClientProperty.RPC_TRANSPORT_IDLE_TIMEOUT.getTimeInMillis(getClientProperties()); } public synchronized ThriftTransportPool getTransportPool() { @@ -1108,6 +1108,7 @@ public class ClientContext implements AccumuloClient { } public NamespaceMapping getNamespaces() { + ensureOpen(); return namespaces; } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfo.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfo.java index cebe40ec2ad..cffcb499274 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfo.java @@ -80,7 +80,7 @@ public interface ClientInfo { /** * @return All Accumulo client properties set for this connection */ - Properties getProperties(); + Properties getClientProperties(); /** * @return hadoop Configuration diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java index 04911060c3f..54c17c07cfd 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java @@ -101,7 +101,7 @@ public class ClientInfoImpl implements ClientInfo { } @Override - public Properties getProperties() { + public Properties getClientProperties() { Properties result = new Properties(); properties.forEach((key, value) -> result.setProperty((String) key, (String) value)); return result; diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java index 7de3600a920..0c0c48283b3 100644 --- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java +++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java @@ -144,7 +144,7 @@ public class StandaloneAccumuloCluster implements AccumuloCluster { @Override public Properties getClientProperties() { - return info.getProperties(); + return info.getClientProperties(); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java index 2f8241d3df1..cc6dfd0bfd7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java @@ -193,7 +193,7 @@ public class ServerInfo implements ClientInfo { } @Override - public Properties getProperties() { + public Properties getClientProperties() { Properties properties = ClientConfConverter.toProperties(getSiteConfiguration()); properties.setProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), getZooKeepers()); properties.setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), diff --git a/server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java b/server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java index 46d3a7966b6..d01755d5c9e 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java +++ b/server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java @@ -21,8 +21,6 @@ package org.apache.accumulo.server; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; -import java.util.Properties; - import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; @@ -43,7 +41,6 @@ public class MockServerContext { ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance()); conf.set(Property.INSTANCE_VOLUMES, "file:///"); expect(context.getConfiguration()).andReturn(conf).anyTimes(); - expect(context.getProperties()).andReturn(new Properties()).anyTimes(); return context; } diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index 25c7003cf8e..f9a630a3f3b 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@ -35,7 +35,6 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.Map; -import java.util.Properties; import org.apache.accumulo.core.clientImpl.thrift.ClientService.Iface; import org.apache.accumulo.core.clientImpl.thrift.ClientService.Processor; @@ -70,7 +69,6 @@ public class TServerUtilsTest { expect(context.getZooSession()).andReturn(zk).anyTimes(); expect(zk.asReader()).andReturn(null).anyTimes(); expect(zk.asReaderWriter()).andReturn(null).anyTimes(); - expect(context.getProperties()).andReturn(new Properties()).anyTimes(); expect(context.getZooKeepers()).andReturn("").anyTimes(); expect(context.getInstanceName()).andReturn("instance").anyTimes(); expect(context.getZooKeepersSessionTimeOut()).andReturn(1).anyTimes(); diff --git a/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java b/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java index e9164c714db..5f5d37cfb3d 100644 --- a/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java @@ -79,7 +79,7 @@ public class SystemCredentialsIT extends ConfigurableMacBase { default: throw new RuntimeException("Incorrect usage; expected to be run by test only"); } - try (AccumuloClient client = Accumulo.newClient().from(context.getProperties()) + try (AccumuloClient client = Accumulo.newClient().from(context.properties()) .as(creds.getPrincipal(), creds.getToken()).build()) { client.securityOperations().authenticateUser(creds.getPrincipal(), creds.getToken()); try (Scanner scan =