This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new e1038d4c27c Improvements to ClientContext for ensureOpen (#5258)
e1038d4c27c is described below
commit e1038d4c27c96f8e7747877af564e46c3e682f89
Author: Christopher Tubbs <[email protected]>
AuthorDate: Wed Jan 22 15:30:49 2025 -0500
Improvements to ClientContext for ensureOpen (#5258)
These changes are small quality fixes to ensure that
ClientContext.ensureOpen is used when it is needed, and not used when it
isn't. This fixes an issue seen where the client RPC timeout value is
being retrieved from a supplier in a thread pool when returning RPC
transports after a client is closed. In these cases, ensureOpen does not
need to be checked. However, there were a few context API methods where
it was not checked but should have been.
Also, improved the close method to ensure close activities are only
called at most once, and made private and renamed an internal method to
get the client properties from the ClientInfo object, so it's more clear
which properties the method is returning and isn't exposed for misuse.
---
.../accumulo/core/clientImpl/ClientContext.java | 69 +++++++++++-----------
.../accumulo/core/clientImpl/ClientInfo.java | 2 +-
.../accumulo/core/clientImpl/ClientInfoImpl.java | 2 +-
.../standalone/StandaloneAccumuloCluster.java | 2 +-
.../org/apache/accumulo/server/ServerInfo.java | 2 +-
.../apache/accumulo/server/MockServerContext.java | 3 -
.../accumulo/server/rpc/TServerUtilsTest.java | 2 -
.../test/server/security/SystemCredentialsIT.java | 2 +-
8 files changed, 40 insertions(+), 44 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 702936aee92..f8bcda14589 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -142,7 +142,7 @@ public class ClientContext implements AccumuloClient {
private ThriftTransportPool thriftTransportPool;
private ZookeeperLockChecker zkLockChecker;
- private volatile boolean closed = false;
+ private final AtomicBoolean closed = new AtomicBoolean();
private SecurityOperations secops = null;
private final TableOperationsImpl tableops;
@@ -157,22 +157,21 @@ public class ClientContext implements AccumuloClient {
private final Supplier<ZooSession> zooSession;
private void ensureOpen() {
- if (closed) {
+ if (closed.get()) {
throw new IllegalStateException("This client was closed.");
}
}
private ScanServerSelector createScanServerSelector() {
- String clazz =
ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties());
+ String clazz =
ClientProperty.SCAN_SERVER_SELECTOR.getValue(getClientProperties());
try {
Class<? extends ScanServerSelector> impl =
Class.forName(clazz).asSubclass(ScanServerSelector.class);
ScanServerSelector scanServerSelector =
impl.getDeclaredConstructor().newInstance();
Map<String,String> sserverProps = new HashMap<>();
- ClientProperty
- .getPrefix(info.getProperties(),
ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey())
- .forEach((k, v) -> {
+ ClientProperty.getPrefix(getClientProperties(),
+
ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey()).forEach((k, v) -> {
sserverProps.put(
k.toString()
.substring(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey().length()),
@@ -311,9 +310,8 @@ public class ClientContext implements AccumuloClient {
return getCredentials().getToken();
}
- public Properties getProperties() {
- ensureOpen();
- return info.getProperties();
+ private Properties getClientProperties() {
+ return info.getClientProperties();
}
/**
@@ -396,7 +394,7 @@ public class ClientContext implements AccumuloClient {
public synchronized BatchWriterConfig getBatchWriterConfig() {
ensureOpen();
if (batchWriterConfig == null) {
- batchWriterConfig = getBatchWriterConfig(info.getProperties());
+ batchWriterConfig = getBatchWriterConfig(getClientProperties());
}
return batchWriterConfig;
}
@@ -405,6 +403,7 @@ public class ClientContext implements AccumuloClient {
* @return map of live scan server addresses to lock uuids.
*/
public Map<String,Pair<UUID,String>> getScanServers() {
+ ensureOpen();
Map<String,Pair<UUID,String>> liveScanServers = new HashMap<>();
String root = this.getZooKeeperRoot() + Constants.ZSSERVERS;
var addrs = this.getZooCache().getChildren(root);
@@ -455,7 +454,7 @@ public class ClientContext implements AccumuloClient {
public synchronized ConditionalWriterConfig getConditionalWriterConfig() {
ensureOpen();
if (conditionalWriterConfig == null) {
- conditionalWriterConfig =
getConditionalWriterConfig(info.getProperties());
+ conditionalWriterConfig =
getConditionalWriterConfig(getClientProperties());
}
return conditionalWriterConfig;
}
@@ -621,6 +620,7 @@ public class ClientContext implements AccumuloClient {
}
public Map<NamespaceId,String> getNamespaceIdToNameMap() {
+ ensureOpen();
return Namespaces.getIdToNameMap(this);
}
@@ -694,7 +694,7 @@ public class ClientContext implements AccumuloClient {
throws TableNotFoundException {
ensureOpen();
Integer numQueryThreads =
-
ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS.getInteger(getProperties());
+
ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS.getInteger(getClientProperties());
Objects.requireNonNull(numQueryThreads);
return createBatchScanner(tableName, authorizations, numQueryThreads);
}
@@ -702,6 +702,7 @@ public class ClientContext implements AccumuloClient {
@Override
public BatchScanner createBatchScanner(String tableName)
throws TableNotFoundException, AccumuloSecurityException,
AccumuloException {
+ ensureOpen();
Authorizations auths =
securityOperations().getUserAuthorizations(getPrincipal());
return createBatchScanner(tableName, auths);
}
@@ -718,7 +719,6 @@ public class ClientContext implements AccumuloClient {
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations
authorizations,
int numQueryThreads) throws TableNotFoundException {
- ensureOpen();
return createBatchDeleter(tableName, authorizations, numQueryThreads, new
BatchWriterConfig());
}
@@ -773,7 +773,7 @@ public class ClientContext implements AccumuloClient {
checkArgument(authorizations != null, "authorizations is null");
Scanner scanner =
new ScannerImpl(this, requireNotOffline(getTableId(tableName),
tableName), authorizations);
- Integer batchSize =
ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties());
+ Integer batchSize =
ClientProperty.SCANNER_BATCH_SIZE.getInteger(getClientProperties());
if (batchSize != null) {
scanner.setBatchSize(batchSize);
}
@@ -829,7 +829,7 @@ public class ClientContext implements AccumuloClient {
public Properties properties() {
ensureOpen();
Properties result = new Properties();
- getProperties().forEach((key, value) -> {
+ getClientProperties().forEach((key, value) -> {
if (!key.equals(ClientProperty.AUTH_TOKEN.getKey())) {
result.setProperty((String) key, (String) value);
}
@@ -844,23 +844,24 @@ public class ClientContext implements AccumuloClient {
@Override
public synchronized void close() {
- closed = true;
- if (zooKeeperOpened.get()) {
- zooSession.get().close();
- }
- if (thriftTransportPool != null) {
- thriftTransportPool.shutdown();
- }
- if (tableZooHelper != null) {
- tableZooHelper.close();
- }
- if (scannerReadaheadPool != null) {
- scannerReadaheadPool.shutdownNow(); // abort all tasks, client is
shutting down
- }
- if (cleanupThreadPool != null) {
- cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute
+ if (closed.compareAndSet(false, true)) {
+ if (zooKeeperOpened.get()) {
+ zooSession.get().close();
+ }
+ if (thriftTransportPool != null) {
+ thriftTransportPool.shutdown();
+ }
+ if (tableZooHelper != null) {
+ tableZooHelper.close();
+ }
+ if (scannerReadaheadPool != null) {
+ scannerReadaheadPool.shutdownNow(); // abort all tasks, client is
shutting down
+ }
+ if (cleanupThreadPool != null) {
+ cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute
+ }
+ singletonReservation.close();
}
- singletonReservation.close();
}
public static class ClientBuilderImpl<T>
@@ -896,7 +897,7 @@ public class ClientContext implements AccumuloClient {
try {
// ClientContext closes reservation unless a RuntimeException is thrown
ClientInfo info = cbi.getClientInfo();
- AccumuloConfiguration config =
ClientConfConverter.toAccumuloConf(info.getProperties());
+ var config =
ClientConfConverter.toAccumuloConf(info.getClientProperties());
return new ClientContext(reservation, info, config,
cbi.getUncaughtExceptionHandler());
} catch (RuntimeException e) {
reservation.close();
@@ -1080,8 +1081,7 @@ public class ClientContext implements AccumuloClient {
}
protected long getTransportPoolMaxAgeMillis() {
- ensureOpen();
- return
ClientProperty.RPC_TRANSPORT_IDLE_TIMEOUT.getTimeInMillis(getProperties());
+ return
ClientProperty.RPC_TRANSPORT_IDLE_TIMEOUT.getTimeInMillis(getClientProperties());
}
public synchronized ThriftTransportPool getTransportPool() {
@@ -1108,6 +1108,7 @@ public class ClientContext implements AccumuloClient {
}
public NamespaceMapping getNamespaces() {
+ ensureOpen();
return namespaces;
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfo.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfo.java
index cebe40ec2ad..cffcb499274 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfo.java
@@ -80,7 +80,7 @@ public interface ClientInfo {
/**
* @return All Accumulo client properties set for this connection
*/
- Properties getProperties();
+ Properties getClientProperties();
/**
* @return hadoop Configuration
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java
index 04911060c3f..54c17c07cfd 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientInfoImpl.java
@@ -101,7 +101,7 @@ public class ClientInfoImpl implements ClientInfo {
}
@Override
- public Properties getProperties() {
+ public Properties getClientProperties() {
Properties result = new Properties();
properties.forEach((key, value) -> result.setProperty((String) key,
(String) value));
return result;
diff --git
a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
index 7de3600a920..0c0c48283b3 100644
---
a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
+++
b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
@@ -144,7 +144,7 @@ public class StandaloneAccumuloCluster implements
AccumuloCluster {
@Override
public Properties getClientProperties() {
- return info.getProperties();
+ return info.getClientProperties();
}
@Override
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java
b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java
index 2f8241d3df1..cc6dfd0bfd7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java
@@ -193,7 +193,7 @@ public class ServerInfo implements ClientInfo {
}
@Override
- public Properties getProperties() {
+ public Properties getClientProperties() {
Properties properties =
ClientConfConverter.toProperties(getSiteConfiguration());
properties.setProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(),
getZooKeepers());
properties.setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(),
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java
b/server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java
index 46d3a7966b6..d01755d5c9e 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java
@@ -21,8 +21,6 @@ package org.apache.accumulo.server;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
-import java.util.Properties;
-
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -43,7 +41,6 @@ public class MockServerContext {
ConfigurationCopy conf = new
ConfigurationCopy(DefaultConfiguration.getInstance());
conf.set(Property.INSTANCE_VOLUMES, "file:///");
expect(context.getConfiguration()).andReturn(conf).anyTimes();
- expect(context.getProperties()).andReturn(new Properties()).anyTimes();
return context;
}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
index 25c7003cf8e..f9a630a3f3b 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
@@ -35,7 +35,6 @@ import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Map;
-import java.util.Properties;
import org.apache.accumulo.core.clientImpl.thrift.ClientService.Iface;
import org.apache.accumulo.core.clientImpl.thrift.ClientService.Processor;
@@ -70,7 +69,6 @@ public class TServerUtilsTest {
expect(context.getZooSession()).andReturn(zk).anyTimes();
expect(zk.asReader()).andReturn(null).anyTimes();
expect(zk.asReaderWriter()).andReturn(null).anyTimes();
- expect(context.getProperties()).andReturn(new Properties()).anyTimes();
expect(context.getZooKeepers()).andReturn("").anyTimes();
expect(context.getInstanceName()).andReturn("instance").anyTimes();
expect(context.getZooKeepersSessionTimeOut()).andReturn(1).anyTimes();
diff --git
a/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java
b/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java
index e9164c714db..5f5d37cfb3d 100644
---
a/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/server/security/SystemCredentialsIT.java
@@ -79,7 +79,7 @@ public class SystemCredentialsIT extends ConfigurableMacBase {
default:
throw new RuntimeException("Incorrect usage; expected to be run by
test only");
}
- try (AccumuloClient client =
Accumulo.newClient().from(context.getProperties())
+ try (AccumuloClient client =
Accumulo.newClient().from(context.properties())
.as(creds.getPrincipal(), creds.getToken()).build()) {
client.securityOperations().authenticateUser(creds.getPrincipal(),
creds.getToken());
try (Scanner scan =