This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new a5947214ba MiniAccumuloCluster improvements (#2624) a5947214ba is described below commit a5947214bac05f2407a58d20ef730a94d1809e2a Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Tue Apr 12 17:47:55 2022 -0400 MiniAccumuloCluster improvements (#2624) * Use a memoized Supplier to lazily load client properties and server context and remove now unnecessary synchronization for these * Make a protective copy of the client properties when returning in the public API method, getClientProperties(), and update javadoc * Make fields final where possible * Relocate constructors to just under the fields, prior to other methods, for easier class navigation This fixes #2619 --- .../accumulo/minicluster/MiniAccumuloCluster.java | 2 +- .../miniclusterImpl/MiniAccumuloClusterImpl.java | 313 +++++++++++---------- 2 files changed, 158 insertions(+), 157 deletions(-) diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java index 7b968ab55c..aa5bd5b29f 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java @@ -162,7 +162,7 @@ public class MiniAccumuloCluster implements AutoCloseable { } /** - * @return Connection properties for cluster + * @return A copy of the connection properties for the cluster * @since 2.0.0 */ public Properties getClientProperties() { diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 82df42a47a..a54f980b31 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -51,7 +51,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.accumulo.cluster.AccumuloCluster; @@ -107,6 +109,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import com.google.common.base.Suppliers; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -123,23 +126,150 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public class MiniAccumuloClusterImpl implements AccumuloCluster { private static final Logger log = LoggerFactory.getLogger(MiniAccumuloClusterImpl.class); + private final Set<Pair<ServerType,Integer>> debugPorts = new HashSet<>(); + private final File zooCfgFile; + private final String dfsUri; + private final MiniAccumuloConfigImpl config; + private final Supplier<Properties> clientProperties; + private final SiteConfiguration siteConfig; + private final Supplier<ServerContext> context; + private final AtomicReference<MiniDFSCluster> miniDFS = new AtomicReference<>(); + private final List<Process> cleanup = new ArrayList<>(); + private final MiniAccumuloClusterControl clusterControl; + private boolean initialized = false; + private ExecutorService executor; - private Set<Pair<ServerType,Integer>> debugPorts = new HashSet<>(); + /** + * + * @param dir + * An empty or nonexistent temp directory that Accumulo and Zookeeper can store data in. + * Creating the directory is left to the user. Java 7, Guava, and Junit provide methods + * for creating temporary directories. + * @param rootPassword + * Initial root password for instance. + */ + public MiniAccumuloClusterImpl(File dir, String rootPassword) throws IOException { + this(new MiniAccumuloConfigImpl(dir, rootPassword)); + } - private File zooCfgFile; - private String dfsUri; - private SiteConfiguration siteConfig; - private ServerContext context; - private Properties clientProperties; + /** + * @param config + * initial configuration + */ + @SuppressWarnings("deprecation") + public MiniAccumuloClusterImpl(MiniAccumuloConfigImpl config) throws IOException { - private MiniAccumuloConfigImpl config; - private MiniDFSCluster miniDFS = null; - private List<Process> cleanup = new ArrayList<>(); + this.config = config.initialize(); + this.clientProperties = Suppliers.memoize( + () -> Accumulo.newClientProperties().from(config.getClientPropsFile().toPath()).build()); - private ExecutorService executor; + if (Boolean.valueOf(config.getSiteConfig().get(Property.TSERV_NATIVEMAP_ENABLED.getKey())) + && config.getNativeLibPaths().length == 0 + && !config.getSystemProperties().containsKey("accumulo.native.lib.path")) { + throw new RuntimeException( + "MAC configured to use native maps, but native library path was not provided."); + } + + mkdirs(config.getConfDir()); + mkdirs(config.getLogDir()); + mkdirs(config.getLibDir()); + mkdirs(config.getLibExtDir()); - private MiniAccumuloClusterControl clusterControl; + if (!config.useExistingInstance()) { + if (!config.useExistingZooKeepers()) { + mkdirs(config.getZooKeeperDir()); + } + mkdirs(config.getAccumuloDir()); + } + + if (config.useMiniDFS()) { + File nn = new File(config.getAccumuloDir(), "nn"); + mkdirs(nn); + File dn = new File(config.getAccumuloDir(), "dn"); + mkdirs(dn); + File dfs = new File(config.getAccumuloDir(), "dfs"); + mkdirs(dfs); + Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nn.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dn.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "1"); + conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, "1"); + conf.set("dfs.support.append", "true"); + conf.set("dfs.datanode.synconclose", "true"); + conf.set("dfs.datanode.data.dir.perm", MiniDFSUtil.computeDatanodeDirectoryPermission()); + String oldTestBuildData = System.setProperty("test.build.data", dfs.getAbsolutePath()); + miniDFS.set(new MiniDFSCluster.Builder(conf).build()); + if (oldTestBuildData == null) { + System.clearProperty("test.build.data"); + } else { + System.setProperty("test.build.data", oldTestBuildData); + } + miniDFS.get().waitClusterUp(); + InetSocketAddress dfsAddress = miniDFS.get().getNameNode().getNameNodeAddress(); + dfsUri = "hdfs://" + dfsAddress.getHostName() + ":" + dfsAddress.getPort(); + File coreFile = new File(config.getConfDir(), "core-site.xml"); + writeConfig(coreFile, Collections.singletonMap("fs.default.name", dfsUri).entrySet()); + File hdfsFile = new File(config.getConfDir(), "hdfs-site.xml"); + writeConfig(hdfsFile, conf); + + Map<String,String> siteConfig = config.getSiteConfig(); + siteConfig.put(Property.INSTANCE_VOLUMES.getKey(), dfsUri + "/accumulo"); + config.setSiteConfig(siteConfig); + } else if (config.useExistingInstance()) { + dfsUri = config.getHadoopConfiguration().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY); + } else { + dfsUri = "file:///"; + } + + File clientConfFile = config.getClientConfFile(); + // Write only the properties that correspond to ClientConfiguration properties + writeConfigProperties(clientConfFile, + Maps.filterEntries(config.getSiteConfig(), + v -> org.apache.accumulo.core.client.ClientConfiguration.ClientProperty + .getPropertyByKey(v.getKey()) != null)); + + Map<String,String> clientProps = config.getClientProps(); + clientProps.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), config.getZooKeepers()); + clientProps.put(ClientProperty.INSTANCE_NAME.getKey(), config.getInstanceName()); + if (!clientProps.containsKey(ClientProperty.AUTH_TYPE.getKey())) { + clientProps.put(ClientProperty.AUTH_TYPE.getKey(), "password"); + clientProps.put(ClientProperty.AUTH_PRINCIPAL.getKey(), config.getRootUserName()); + clientProps.put(ClientProperty.AUTH_TOKEN.getKey(), config.getRootPassword()); + } + + File clientPropsFile = config.getClientPropsFile(); + writeConfigProperties(clientPropsFile, clientProps); + + File siteFile = new File(config.getConfDir(), "accumulo.properties"); + writeConfigProperties(siteFile, config.getSiteConfig()); + this.siteConfig = SiteConfiguration.fromFile(siteFile).build(); + this.context = Suppliers.memoize(() -> new ServerContext(siteConfig)); + + if (!config.useExistingInstance() && !config.useExistingZooKeepers()) { + zooCfgFile = new File(config.getConfDir(), "zoo.cfg"); + FileWriter fileWriter = new FileWriter(zooCfgFile, UTF_8); + + // zookeeper uses Properties to read its config, so use that to write in order to properly + // escape things like Windows paths + Properties zooCfg = new Properties(); + zooCfg.setProperty("tickTime", "2000"); + zooCfg.setProperty("initLimit", "10"); + zooCfg.setProperty("syncLimit", "5"); + zooCfg.setProperty("clientPortAddress", "127.0.0.1"); + zooCfg.setProperty("clientPort", config.getZooKeeperPort() + ""); + zooCfg.setProperty("maxClientCnxns", "1000"); + zooCfg.setProperty("dataDir", config.getZooKeeperDir().getAbsolutePath()); + zooCfg.setProperty("4lw.commands.whitelist", "ruok,wchs"); + zooCfg.setProperty("admin.enableServer", "false"); + zooCfg.store(fileWriter, null); + + fileWriter.close(); + } else { + zooCfgFile = null; + } + clusterControl = new MiniAccumuloClusterControl(this); + } File getZooCfgFile() { return zooCfgFile; @@ -308,132 +438,6 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { return _exec(clazz, jvmOpts, args); } - /** - * - * @param dir - * An empty or nonexistent temp directory that Accumulo and Zookeeper can store data in. - * Creating the directory is left to the user. Java 7, Guava, and Junit provide methods - * for creating temporary directories. - * @param rootPassword - * Initial root password for instance. - */ - public MiniAccumuloClusterImpl(File dir, String rootPassword) throws IOException { - this(new MiniAccumuloConfigImpl(dir, rootPassword)); - } - - /** - * @param config - * initial configuration - */ - @SuppressWarnings("deprecation") - public MiniAccumuloClusterImpl(MiniAccumuloConfigImpl config) throws IOException { - - this.config = config.initialize(); - - if (Boolean.valueOf(config.getSiteConfig().get(Property.TSERV_NATIVEMAP_ENABLED.getKey())) - && config.getNativeLibPaths().length == 0 - && !config.getSystemProperties().containsKey("accumulo.native.lib.path")) { - throw new RuntimeException( - "MAC configured to use native maps, but native library path was not provided."); - } - - mkdirs(config.getConfDir()); - mkdirs(config.getLogDir()); - mkdirs(config.getLibDir()); - mkdirs(config.getLibExtDir()); - - if (!config.useExistingInstance()) { - if (!config.useExistingZooKeepers()) { - mkdirs(config.getZooKeeperDir()); - } - mkdirs(config.getAccumuloDir()); - } - - if (config.useMiniDFS()) { - File nn = new File(config.getAccumuloDir(), "nn"); - mkdirs(nn); - File dn = new File(config.getAccumuloDir(), "dn"); - mkdirs(dn); - File dfs = new File(config.getAccumuloDir(), "dfs"); - mkdirs(dfs); - Configuration conf = new Configuration(); - conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nn.getAbsolutePath()); - conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dn.getAbsolutePath()); - conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "1"); - conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, "1"); - conf.set("dfs.support.append", "true"); - conf.set("dfs.datanode.synconclose", "true"); - conf.set("dfs.datanode.data.dir.perm", MiniDFSUtil.computeDatanodeDirectoryPermission()); - String oldTestBuildData = System.setProperty("test.build.data", dfs.getAbsolutePath()); - miniDFS = new MiniDFSCluster.Builder(conf).build(); - if (oldTestBuildData == null) { - System.clearProperty("test.build.data"); - } else { - System.setProperty("test.build.data", oldTestBuildData); - } - miniDFS.waitClusterUp(); - InetSocketAddress dfsAddress = miniDFS.getNameNode().getNameNodeAddress(); - dfsUri = "hdfs://" + dfsAddress.getHostName() + ":" + dfsAddress.getPort(); - File coreFile = new File(config.getConfDir(), "core-site.xml"); - writeConfig(coreFile, Collections.singletonMap("fs.default.name", dfsUri).entrySet()); - File hdfsFile = new File(config.getConfDir(), "hdfs-site.xml"); - writeConfig(hdfsFile, conf); - - Map<String,String> siteConfig = config.getSiteConfig(); - siteConfig.put(Property.INSTANCE_VOLUMES.getKey(), dfsUri + "/accumulo"); - config.setSiteConfig(siteConfig); - } else if (config.useExistingInstance()) { - dfsUri = config.getHadoopConfiguration().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY); - } else { - dfsUri = "file:///"; - } - - File clientConfFile = config.getClientConfFile(); - // Write only the properties that correspond to ClientConfiguration properties - writeConfigProperties(clientConfFile, - Maps.filterEntries(config.getSiteConfig(), - v -> org.apache.accumulo.core.client.ClientConfiguration.ClientProperty - .getPropertyByKey(v.getKey()) != null)); - - Map<String,String> clientProps = config.getClientProps(); - clientProps.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), config.getZooKeepers()); - clientProps.put(ClientProperty.INSTANCE_NAME.getKey(), config.getInstanceName()); - if (!clientProps.containsKey(ClientProperty.AUTH_TYPE.getKey())) { - clientProps.put(ClientProperty.AUTH_TYPE.getKey(), "password"); - clientProps.put(ClientProperty.AUTH_PRINCIPAL.getKey(), config.getRootUserName()); - clientProps.put(ClientProperty.AUTH_TOKEN.getKey(), config.getRootPassword()); - } - - File clientPropsFile = config.getClientPropsFile(); - writeConfigProperties(clientPropsFile, clientProps); - - File siteFile = new File(config.getConfDir(), "accumulo.properties"); - writeConfigProperties(siteFile, config.getSiteConfig()); - siteConfig = SiteConfiguration.fromFile(siteFile).build(); - - if (!config.useExistingInstance() && !config.useExistingZooKeepers()) { - zooCfgFile = new File(config.getConfDir(), "zoo.cfg"); - FileWriter fileWriter = new FileWriter(zooCfgFile, UTF_8); - - // zookeeper uses Properties to read its config, so use that to write in order to properly - // escape things like Windows paths - Properties zooCfg = new Properties(); - zooCfg.setProperty("tickTime", "2000"); - zooCfg.setProperty("initLimit", "10"); - zooCfg.setProperty("syncLimit", "5"); - zooCfg.setProperty("clientPortAddress", "127.0.0.1"); - zooCfg.setProperty("clientPort", config.getZooKeeperPort() + ""); - zooCfg.setProperty("maxClientCnxns", "1000"); - zooCfg.setProperty("dataDir", config.getZooKeeperDir().getAbsolutePath()); - zooCfg.setProperty("4lw.commands.whitelist", "ruok,wchs"); - zooCfg.setProperty("admin.enableServer", "false"); - zooCfg.store(fileWriter, null); - - fileWriter.close(); - } - clusterControl = new MiniAccumuloClusterControl(this); - } - private static void mkdirs(File dir) { if (!dir.mkdirs()) { log.warn("Unable to create {}", dir); @@ -471,7 +475,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { justification = "insecure socket used for reservation") @Override public synchronized void start() throws IOException, InterruptedException { - if (config.useMiniDFS() && miniDFS == null) { + if (config.useMiniDFS() && miniDFS.get() == null) { throw new IllegalStateException("Cannot restart mini when using miniDFS"); } @@ -763,11 +767,8 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { } @Override - public synchronized ServerContext getServerContext() { - if (context == null) { - context = new ServerContext(siteConfig); - } - return context; + public ServerContext getServerContext() { + return context.get(); } /** @@ -803,14 +804,15 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { executor = null; } - if (config.useMiniDFS() && miniDFS != null) { - miniDFS.shutdown(); + var miniDFSActual = miniDFS.get(); + if (config.useMiniDFS() && miniDFSActual != null) { + miniDFSActual.shutdown(); } for (Process p : cleanup) { p.destroy(); p.waitFor(); } - miniDFS = null; + miniDFS.set(null); } /** @@ -822,7 +824,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { @Override public AccumuloClient createAccumuloClient(String user, AuthenticationToken token) { - return Accumulo.newClient().from(getClientProperties()).as(user, token).build(); + return Accumulo.newClient().from(clientProperties.get()).as(user, token).build(); } @SuppressWarnings("deprecation") @@ -833,12 +835,11 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { } @Override - public synchronized Properties getClientProperties() { - if (clientProperties == null) { - clientProperties = - Accumulo.newClientProperties().from(config.getClientPropsFile().toPath()).build(); - } - return clientProperties; + public Properties getClientProperties() { + // return a copy, without re-reading the file + var copy = new Properties(); + copy.putAll(clientProperties.get()); + return copy; } @Override @@ -881,7 +882,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { */ public ManagerMonitorInfo getManagerMonitorInfo() throws AccumuloException, AccumuloSecurityException { - try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + try (AccumuloClient c = Accumulo.newClient().from(clientProperties.get()).build()) { while (true) { ManagerClientService.Iface client = null; try { @@ -904,8 +905,8 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { } } - public synchronized MiniDFSCluster getMiniDfs() { - return this.miniDFS; + public MiniDFSCluster getMiniDfs() { + return this.miniDFS.get(); } @Override