This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit f6a37412c8579c534eda0a34265d450d9843e3d9 Merge: 69a62fa550 69584e2835 Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Thu Jun 6 18:13:21 2024 +0000 Merge branch 'main' into elasticity .../org/apache/accumulo/server/rpc/TServerUtils.java | 18 ++++++++++++------ .../org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +- .../apache/accumulo/test/functional/ZombieTServer.java | 2 +- .../apache/accumulo/test/performance/NullTserver.java | 2 +- 4 files changed, 15 insertions(+), 9 deletions(-) diff --cc test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index 16bfc441ca,c52c5a74f0..8e8d5b3677 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@@ -320,81 -339,31 +320,81 @@@ public class NullTserver TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, muxProcessor, "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, context.getConfiguration().getCount(Property.RPC_BACKLOG), - context.getMetricsInfo(), HostAndPort.fromParts("0.0.0.0", opts.port)); + context.getMetricsInfo(), false, HostAndPort.fromParts("0.0.0.0", opts.port)); - HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port); + AccumuloLockWatcher miniLockWatcher = new AccumuloLockWatcher() { - TableId tableId = context.getTableId(opts.tableName); + @Override + public void lostLock(LockLossReason reason) { + LOG.warn("Lost lock: " + reason.toString()); + } - // read the locations for the table - Range tableRange = new KeyExtent(tableId, null, null).toMetaRange(); - List<Assignment> assignments = new ArrayList<>(); - try ( - var s = new MetaDataTableScanner(context, tableRange, AccumuloTable.METADATA.tableName())) { - long randomSessionID = opts.port; - TServerInstance instance = new TServerInstance(addr, randomSessionID); + @Override + public void unableToMonitorLockNode(Exception e) { + LOG.warn("Unable to monitor lock: " + e.getMessage()); + } - while (s.hasNext()) { - TabletLocationState next = s.next(); - assignments.add(new Assignment(next.extent, instance, next.last)); + @Override + public void acquiredLock() { + LOG.debug("Acquired ZooKeeper lock for NullTserver"); } - } - // point them to this server - TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER, context); - store.setLocations(assignments); - while (true) { - Thread.sleep(SECONDS.toMillis(10)); + @Override + public void failedToAcquireLock(Exception e) { + LOG.warn("Failed to acquire ZK lock for NullTserver, msg: " + e.getMessage()); + } + }; + + ServiceLock miniLock = null; + try { + ZooKeeper zk = context.getZooReaderWriter().getZooKeeper(); + UUID nullTServerUUID = UUID.randomUUID(); + String miniZDirPath = context.getZooKeeperRoot() + "/mini"; + String miniZInstancePath = miniZDirPath + "/" + nullTServerUUID.toString(); + try { + context.getZooReaderWriter().putPersistentData(miniZDirPath, new byte[0], + ZooUtil.NodeExistsPolicy.SKIP); + context.getZooReaderWriter().putPersistentData(miniZInstancePath, new byte[0], + ZooUtil.NodeExistsPolicy.SKIP); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error creating path in ZooKeeper", e); + } + ServiceLockPath path = ServiceLock.path(miniZInstancePath); + ServiceLockData sld = new ServiceLockData(nullTServerUUID, "localhost", ThriftService.TSERV, + Constants.DEFAULT_RESOURCE_GROUP_NAME); + miniLock = new ServiceLock(zk, path, UUID.randomUUID()); + miniLock.lock(miniLockWatcher, sld); + context.setServiceLock(miniLock); + HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port); + + TableId tableId = context.getTableId(opts.tableName); + + // read the locations for the table + Range tableRange = new KeyExtent(tableId, null, null).toMetaRange(); + List<Assignment> assignments = new ArrayList<>(); + try (var tablets = context.getAmple().readTablets().forLevel(DataLevel.USER).build()) { + long randomSessionID = opts.port; + TServerInstance instance = new TServerInstance(addr, randomSessionID); + var s = tablets.iterator(); + + while (s.hasNext()) { + TabletMetadata next = s.next(); + assignments.add(new Assignment(next.getExtent(), instance, next.getLast())); + } + } + // point them to this server + final ServiceLock lock = miniLock; + TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER, context); + store.setLocations(assignments); + + while (true) { + Thread.sleep(SECONDS.toMillis(10)); + } + + } finally { + if (miniLock != null) { + miniLock.unlock(); + } } } }