This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 0647dfa359c2eb5b8b70bc8e1dd2e1c13f63f6fe Merge: 40dc910caa dd2d40ff41 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Thu Nov 28 03:38:06 2024 -0500 Merge branch '3.1' .../core/clientImpl/RootClientTabletCacheTest.java | 23 +++---- .../core/clientImpl/ZookeeperLockCheckerTest.java | 28 ++++++--- .../MiniAccumuloClusterExistingZooKeepersTest.java | 7 ++- .../org/apache/accumulo/server/ServerContext.java | 3 +- .../org/apache/accumulo/server/ServerInfo.java | 3 +- .../accumulo/server/init/ZooKeeperInitializer.java | 4 +- .../server/security/handler/ZKPermHandler.java | 7 +-- .../server/security/handler/ZKSecurityTool.java | 5 -- .../accumulo/server/tables/TableManager.java | 11 ++-- .../server/tablets/UniqueNameAllocator.java | 2 +- .../apache/accumulo/server/util/ChangeSecret.java | 5 +- .../apache/accumulo/server/util/ListInstances.java | 3 +- .../apache/accumulo/server/util/ZooKeeperMain.java | 2 +- .../apache/accumulo/server/MockServerContext.java | 8 ++- .../conf/ServerConfigurationFactoryTest.java | 3 +- .../server/conf/store/PropStoreKeyTest.java | 46 +++++++------- .../server/conf/store/impl/PropStoreEventTest.java | 3 +- .../server/conf/store/impl/ZooPropStoreTest.java | 3 +- .../ZooAuthenticationKeyWatcherTest.java | 3 +- .../security/handler/ZKAuthenticatorTest.java | 13 ++-- .../org/apache/accumulo/server/util/AdminTest.java | 73 ++++++++++++---------- .../accumulo/server/util/ServiceStatusCmdTest.java | 3 +- .../java/org/apache/accumulo/manager/Manager.java | 25 +++----- .../manager/ManagerClientServiceHandler.java | 4 +- .../org/apache/accumulo/manager/ManagerTime.java | 2 +- .../accumulo/manager/recovery/RecoveryManager.java | 12 ++-- .../manager/tableOps/compact/CompactionDriver.java | 2 +- .../manager/tableOps/delete/PreDeleteTable.java | 6 +- .../create/PopulateZookeeperWithNamespace.java | 4 +- .../tableOps/namespace/rename/RenameNamespace.java | 4 +- .../manager/tableOps/rename/RenameTable.java | 4 +- .../accumulo/manager/upgrade/Upgrader11to12.java | 2 +- .../tableOps/compact/CompactionDriverTest.java | 39 ++++++------ .../manager/upgrade/Upgrader11to12Test.java | 47 +++++++------- .../org/apache/accumulo/tserver/tablet/Tablet.java | 4 +- .../org/apache/accumulo/test/ImportExportIT.java | 7 ++- .../org/apache/accumulo/test/VolumeManagerIT.java | 16 +++-- .../apache/accumulo/test/fate/meta/MetaFateIT.java | 5 +- .../test/fate/meta/MetaFateInterleavingIT.java | 5 +- .../test/fate/meta/MetaFateStoreFateIT.java | 5 +- .../accumulo/test/fate/meta/ZooMutatorIT.java | 4 +- .../accumulo/test/functional/CloneTestIT.java | 11 ++-- .../test/functional/GarbageCollectorIT.java | 5 +- .../apache/accumulo/test/functional/TableIT.java | 10 ++- .../apache/accumulo/test/lock/ServiceLockIT.java | 3 +- 45 files changed, 271 insertions(+), 213 deletions(-) diff --cc core/src/test/java/org/apache/accumulo/core/clientImpl/RootClientTabletCacheTest.java index 178b2e7ec4,0fa52ec2c0..37412f0e21 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/RootClientTabletCacheTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/RootClientTabletCacheTest.java @@@ -19,44 -19,55 +19,41 @@@ package org.apache.accumulo.core.clientImpl; import static org.easymock.EasyMock.createMock; - import static org.easymock.EasyMock.createStrictMock; --import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; -import java.util.UUID; +import java.util.List; -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.clientImpl.TabletLocatorImpl.TabletServerLockChecker; -import org.apache.accumulo.core.data.InstanceId; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.TabletServerLockChecker; - import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.metadata.RootTable; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class RootTabletLocatorTest { +public class RootClientTabletCacheTest { + private ClientContext context; private TabletServerLockChecker lockChecker; -- private ZooCache zc; - private RootClientTabletCache rtl; @BeforeEach public void setUp() { - var instanceId = InstanceId.of(UUID.randomUUID()); - zc = createMock(ZooCache.class); context = createMock(ClientContext.class); - expect(context.getZooKeeperRoot()).andReturn("/accumulo/iid").anyTimes(); - zc = createStrictMock(ZooCache.class); - expect(context.getZooKeeperRoot()).andReturn(ZooUtil.getRoot(instanceId)).anyTimes(); -- expect(context.getZooCache()).andReturn(zc).anyTimes(); - replay(context); lockChecker = createMock(TabletServerLockChecker.class); - rtl = new RootClientTabletCache(lockChecker); - replay(context, zc, lockChecker); ++ replay(context, lockChecker); + } + + @AfterEach + public void tearDown() { - verify(context, zc, lockChecker); ++ verify(context, lockChecker); } @Test - public void testInvalidateCache_Server() { - var rtl = new RootTabletLocator(lockChecker); - - verify(zc); - reset(zc); - zc.clear(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/server"); - expectLastCall().once(); - replay(zc); - + public void testInvalidateCache_Noop() { - replay(zc); - // its not expected that any of the validate functions will interact w/ zoocache ++ var rtl = new RootClientTabletCache(lockChecker); ++ // it's not expected that any of the validate functions will do anything with the mock objects rtl.invalidateCache(context, "server"); + rtl.invalidateCache(RootTable.EXTENT); + rtl.invalidateCache(); + rtl.invalidateCache(List.of(RootTable.EXTENT)); - verify(zc); } } diff --cc core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java index 9629c69ffc,991e4d2dba..217303b2d9 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java @@@ -23,11 -22,16 +23,16 @@@ import static org.easymock.EasyMock.cre import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; + import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; + import java.util.UUID; +import java.util.function.Predicate; -import org.apache.accumulo.core.Constants; + import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; + import org.apache.accumulo.core.fate.zookeeper.ZooUtil; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@@ -48,12 -57,14 +58,16 @@@ public class ZookeeperLockCheckerTest @Test public void testInvalidateCache() { + var zklc = new ZookeeperLockChecker(context); + + verify(zc); + reset(zc); - zc.clear(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/server"); + @SuppressWarnings("unchecked") + Predicate<String> anyObj = anyObject(Predicate.class); + zc.clear(anyObj); - expectLastCall(); + expectLastCall().once(); replay(zc); + zklc.invalidateCache("server"); - verify(zc); } } diff --cc server/base/src/main/java/org/apache/accumulo/server/ServerContext.java index b9735a47bd,a20b5c62bb..fc0dd5f373 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java @@@ -53,7 -52,7 +53,8 @@@ import org.apache.accumulo.core.data.Na import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; + import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.SslConnectionParams; diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java index fb9cc7ca2b,24e7b218b7..af98fd0d20 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java @@@ -166,8 -166,7 +167,8 @@@ public class ListInstances } try { - var zLockManagerPath = ServiceLock.path(ZooUtil.getRoot(iid) + Constants.ZMANAGER_LOCK); + var zLockManagerPath = ServiceLockPaths.parse(Optional.of(Constants.ZMANAGER_LOCK), - Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK); ++ ZooUtil.getRoot(iid) + Constants.ZMANAGER_LOCK); Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, zLockManagerPath, null); if (sld.isEmpty()) { return null; diff --cc server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java index 6c9053b0a6,ca67be50fb..6ffdfb5437 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java @@@ -72,10 -73,9 +73,10 @@@ public class PropStoreEventTest expect(context.getZooKeepersSessionTimeOut()).andReturn(500).anyTimes(); expect(context.getInstanceID()).andReturn(instanceId).anyTimes(); - expect(zrw.exists(eq("/accumulo/" + instanceId), anyObject())).andReturn(true).anyTimes(); + expect(zrw.exists(eq(ZooUtil.getRoot(instanceId)), anyObject())).andReturn(true).anyTimes(); readyMonitor = createMock(ReadyMonitor.class); + } @AfterEach diff --cc server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java index d861bf3781,18a9045402..694541105a --- a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java @@@ -18,46 -18,24 +18,54 @@@ */ package org.apache.accumulo.server.util; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; + import static org.easymock.EasyMock.anyObject; + import static org.easymock.EasyMock.createMock; + import static org.easymock.EasyMock.eq; + import static org.easymock.EasyMock.expect; + import static org.easymock.EasyMock.getCurrentArguments; + import static org.easymock.EasyMock.replay; + import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; + import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.hadoop.fs.Path; +import org.apache.zookeeper.KeeperException; +import org.easymock.EasyMock; import org.junit.jupiter.api.Test; public class AdminTest { @@@ -78,168 -56,50 +86,169 @@@ } @Test - public void testQualifySessionId() { + public void testQualifySessionId() throws KeeperException, InterruptedException { - ClientContext ctx = EasyMock.createMock(ClientContext.class); - ZooCache zc = EasyMock.createMock(ZooCache.class); ++ ClientContext ctx = createMock(ClientContext.class); + ZooCache zc = createMock(ZooCache.class); + InstanceId instanceId = InstanceId.of(UUID.randomUUID()); - String root = "/accumulo/id"; - String root = ZooUtil.getRoot(instanceId) + Constants.ZTSERVERS; ++ String root = ZooUtil.getRoot(instanceId); + String type = root + Constants.ZTSERVERS; + String group = type + "/" + Constants.DEFAULT_RESOURCE_GROUP_NAME; String server = "localhost:12345"; final long session = 123456789L; + ServiceLockData sld1 = new ServiceLockData(UUID.randomUUID(), server, ThriftService.TABLET_SCAN, + Constants.DEFAULT_RESOURCE_GROUP_NAME); - String serverPath = root + "/" + server; + String serverPath = group + "/" + server; String validZLockEphemeralNode = "zlock#" + UUID.randomUUID() + "#0000000000"; - EasyMock.expect(ctx.getZooKeeperRoot()).andReturn("/accumulo/id").anyTimes(); - EasyMock.expect(ctx.getZooCache()).andReturn(zc).anyTimes(); - EasyMock.expect(zc.getChildren(type)).andReturn(List.of(Constants.DEFAULT_RESOURCE_GROUP_NAME)) - expect(zc.getChildren(serverPath)) - .andReturn(Collections.singletonList(validZLockEphemeralNode)); ++ expect(ctx.getZooKeeperRoot()).andReturn(root).anyTimes(); ++ expect(ctx.getZooCache()).andReturn(zc).anyTimes(); ++ expect(zc.getChildren(type)).andReturn(List.of(Constants.DEFAULT_RESOURCE_GROUP_NAME)) ++ .anyTimes(); ++ expect(zc.getChildren(group)).andReturn(List.of(server)).anyTimes(); ++ expect(zc.getChildren(serverPath)).andReturn(Collections.singletonList(validZLockEphemeralNode)) + .anyTimes(); - EasyMock.expect(zc.getChildren(group)).andReturn(List.of(server)).anyTimes(); - EasyMock.expect(zc.getChildren(serverPath)) - .andReturn(Collections.singletonList(validZLockEphemeralNode)).anyTimes(); - EasyMock.expect( - zc.get(EasyMock.eq(serverPath + "/" + validZLockEphemeralNode), EasyMock.isA(ZcStat.class))) ++ expect(zc.get(eq(serverPath + "/" + validZLockEphemeralNode), EasyMock.isA(ZcStat.class))) + .andReturn(sld1.serialize()).once(); - EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/" + validZLockEphemeralNode), - EasyMock.anyObject(ZcStat.class))).andAnswer(() -> { - ZcStat stat = (ZcStat) EasyMock.getCurrentArguments()[1]; + expect(zc.get(eq(serverPath + "/" + validZLockEphemeralNode), anyObject(ZcStat.class))) + .andAnswer(() -> { + ZcStat stat = (ZcStat) getCurrentArguments()[1]; stat.setEphemeralOwner(session); return new byte[0]; }); - EasyMock.expect(ctx.getServerPaths()).andReturn(new ServiceLockPaths(ctx)).anyTimes(); - EasyMock.replay(ctx, zc); - - replay(zc); ++ expect(ctx.getServerPaths()).andReturn(new ServiceLockPaths(ctx)).anyTimes(); ++ replay(ctx, zc); assertEquals(server + "[" + Long.toHexString(session) + "]", - Admin.qualifyWithZooKeeperSessionId(root, zc, server)); + Admin.qualifyWithZooKeeperSessionId(ctx, zc, server)); - EasyMock.verify(ctx, zc); - verify(zc); ++ verify(ctx, zc); } @Test - public void testCannotQualifySessionId() { + public void testCannotQualifySessionId() throws KeeperException, InterruptedException { - ClientContext ctx = EasyMock.createMock(ClientContext.class); - ZooCache zc = EasyMock.createMock(ZooCache.class); ++ ClientContext ctx = createMock(ClientContext.class); + ZooCache zc = createMock(ZooCache.class); + InstanceId instanceId = InstanceId.of(UUID.randomUUID()); - String root = "/accumulo/id"; - String root = ZooUtil.getRoot(instanceId) + Constants.ZTSERVERS; ++ String root = ZooUtil.getRoot(instanceId); + String type = root + Constants.ZTSERVERS; + String group = type + "/" + Constants.DEFAULT_RESOURCE_GROUP_NAME; String server = "localhost:12345"; - String serverPath = root + "/" + server; + String serverPath = group + "/" + server; - EasyMock.expect(ctx.getZooKeeperRoot()).andReturn("/accumulo/id").anyTimes(); - EasyMock.expect(ctx.getZooCache()).andReturn(zc).anyTimes(); - EasyMock.expect(zc.getChildren(type)).andReturn(List.of(Constants.DEFAULT_RESOURCE_GROUP_NAME)); - EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.emptyList()); - EasyMock.expect(ctx.getServerPaths()).andReturn(new ServiceLockPaths(ctx)).anyTimes(); - EasyMock.replay(ctx, zc); ++ expect(ctx.getZooKeeperRoot()).andReturn(root).anyTimes(); ++ expect(ctx.getZooCache()).andReturn(zc).anyTimes(); ++ expect(zc.getChildren(type)).andReturn(List.of(Constants.DEFAULT_RESOURCE_GROUP_NAME)); + expect(zc.getChildren(serverPath)).andReturn(Collections.emptyList()); - - replay(zc); ++ expect(ctx.getServerPaths()).andReturn(new ServiceLockPaths(ctx)).anyTimes(); ++ replay(ctx, zc); // A server that isn't in ZooKeeper. Can't qualify it, should return the original - assertEquals(server, Admin.qualifyWithZooKeeperSessionId(root, zc, server)); + assertEquals(server, Admin.qualifyWithZooKeeperSessionId(ctx, zc, server)); - EasyMock.verify(ctx, zc); - verify(zc); ++ verify(ctx, zc); } + @Test + public void testDanglingFate() { + KeyExtent[] extents = new KeyExtent[10]; + for (int i = 0; i < extents.length; i++) { + extents[i] = new KeyExtent(TableId.of("" + i), null, null); + } + + FateId[] fateIds = new FateId[10]; + for (int i = 0; i < fateIds.length; i++) { + fateIds[i] = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + } + + var opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, fateIds[0]); + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateIds[1]); + var opid3 = TabletOperationId.from(TabletOperationType.MERGING, fateIds[5]); + + var files = Set.of(StoredTabletFile.of(new Path("file:///accumulo/tables/4/t-1/f4.rf"))); + var sf1 = + new SelectedFiles(files, true, fateIds[6], SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); + var sf2 = + new SelectedFiles(files, true, fateIds[7], SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); + + var tm1 = TabletMetadata.builder(extents[0]).putOperation(opid1).build(LOADED, SELECTED); + var tm2 = TabletMetadata.builder(extents[1]).putOperation(opid2).build(LOADED, SELECTED); + var tm3 = TabletMetadata.builder(extents[2]) + .putBulkFile(ReferencedTabletFile.of(new Path("file:///accumulo/tables/1/t-1/f1.rf")), + fateIds[2]) + .build(OPID, SELECTED); + var tm4 = TabletMetadata.builder(extents[3]) + .putBulkFile(ReferencedTabletFile.of(new Path("file:///accumulo/tables/2/t-1/f2.rf")), + fateIds[3]) + .build(OPID, SELECTED); + var tm5 = TabletMetadata.builder(extents[4]) + .putBulkFile(ReferencedTabletFile.of(new Path("file:///accumulo/tables/3/t-1/f3.rf")), + fateIds[4]) + .putOperation(opid3).build(SELECTED); + var tm6 = TabletMetadata.builder(extents[5]).putSelectedFiles(sf1).build(OPID, LOADED); + var tm7 = TabletMetadata.builder(extents[6]).putSelectedFiles(sf2).build(OPID, LOADED); + + var tablets1 = Map.of(tm1.getExtent(), tm1, tm2.getExtent(), tm2, tm3.getExtent(), tm3, + tm4.getExtent(), tm4, tm5.getExtent(), tm5, tm6.getExtent(), tm6, tm7.getExtent(), tm7); + var tablets2 = new HashMap<>(tablets1); + var found = new HashMap<KeyExtent,Set<FateId>>(); + Function<Collection<KeyExtent>,Map<KeyExtent,TabletMetadata>> tabletLookup = lookups -> { + var results = new HashMap<KeyExtent,TabletMetadata>(); + lookups.forEach(extent -> { + assertTrue(tablets1.containsKey(extent)); + if (tablets2.containsKey(extent)) { + results.put(extent, tablets2.get(extent)); + } + }); + return results; + }; + + // run test where every fate id is considered inactive + Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, fateId -> false, found::put, + 3); + assertEquals(Map.of(tm1.getExtent(), Set.of(fateIds[0]), tm2.getExtent(), Set.of(fateIds[1]), + tm3.getExtent(), Set.of(fateIds[2]), tm4.getExtent(), Set.of(fateIds[3]), tm5.getExtent(), + Set.of(fateIds[4], fateIds[5]), tm6.getExtent(), Set.of(fateIds[6]), tm7.getExtent(), + Set.of(fateIds[7])), found); + + // run test where some of the fate ids are active + Set<FateId> active = Set.of(fateIds[0], fateIds[2], fateIds[4], fateIds[6]); + found.clear(); + Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, active::contains, found::put, + 3); + assertEquals(Map.of(tm2.getExtent(), Set.of(fateIds[1]), tm4.getExtent(), Set.of(fateIds[3]), + tm5.getExtent(), Set.of(fateIds[5]), tm7.getExtent(), Set.of(fateIds[7])), found); + + // run test where tablets change on 2nd read simulating race condition + var tm2_1 = TabletMetadata.builder(tm2.getExtent()).build(OPID, LOADED, SELECTED); + var tm4_1 = TabletMetadata.builder(tm4.getExtent()) + .putBulkFile(ReferencedTabletFile.of(new Path("file:///accumulo/tables/2/t-1/f2.rf")), + fateIds[8]) + .build(OPID, SELECTED); + tablets2.put(tm2_1.getExtent(), tm2_1); + tablets2.put(tm4_1.getExtent(), tm4_1); + tablets2.remove(tm5.getExtent()); + found.clear(); + Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, active::contains, found::put, + 3); + assertEquals(Map.of(tm7.getExtent(), Set.of(fateIds[7])), found); + found.clear(); + + // run a test where all are active on second look + var tm7_1 = TabletMetadata.builder(tm7.getExtent()).putSelectedFiles(sf1).build(OPID, LOADED); + tablets2.put(tm7_1.getExtent(), tm7_1); + Admin.findDanglingFateOperations(tablets1.values(), tabletLookup, active::contains, found::put, + 3); + assertEquals(Map.of(), found); + + // run a test where all active on the first look + active = Arrays.stream(fateIds).collect(Collectors.toSet()); + found.clear(); + Admin.findDanglingFateOperations(tablets1.values(), le -> { + assertTrue(le.isEmpty()); + return Map.of(); + }, active::contains, found::put, 3); + assertEquals(Map.of(), found); + } } diff --cc server/base/src/test/java/org/apache/accumulo/server/util/ServiceStatusCmdTest.java index f375400ba1,5a8bb11b77..344e09a538 --- a/server/base/src/test/java/org/apache/accumulo/server/util/ServiceStatusCmdTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/ServiceStatusCmdTest.java @@@ -37,10 -37,8 +37,11 @@@ import java.util.UUID import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooReader; + import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLockPaths; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.serviceStatus.ServiceStatusReport; import org.apache.accumulo.server.util.serviceStatus.StatusSummary; diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index aee0bcb0d7,ea5bb097dc..bf787607f0 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -67,17 -63,11 +67,16 @@@ import org.apache.accumulo.core.clientI import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; - import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.AgeOffStore; import org.apache.accumulo.core.fate.Fate; -import org.apache.accumulo.core.fate.TStore; +import org.apache.accumulo.core.fate.FateCleaner; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; @@@ -1251,20 -1338,18 +1242,20 @@@ public class Manager extends AbstractSe throw new IllegalStateException("Upgrade coordinator is unexpectedly not complete"); } try { - final AgeOffStore<Manager> store = - new AgeOffStore<>( - new org.apache.accumulo.core.fate.ZooStore<>( - context.getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter()), - HOURS.toMillis(8), System::currentTimeMillis); - - Fate<Manager> f = initializeFateInstance(store, getConfiguration()); - fateRef.set(f); + Predicate<ZooUtil.LockID> isLockHeld = + lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); - var metaInstance = - initializeFateInstance(context, new MetaFateStore<>(getZooKeeperRoot() + Constants.ZFATE, ++ var metaInstance = initializeFateInstance(context, ++ new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, + context.getZooReaderWriter(), managerLock.getLockID(), isLockHeld)); + var userInstance = initializeFateInstance(context, new UserFateStore<>(context, + AccumuloTable.FATE.tableName(), managerLock.getLockID(), isLockHeld)); + + if (!fateRefs.compareAndSet(null, + Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userInstance))) { + throw new IllegalStateException( + "Unexpected previous fate reference map already initialized"); + } fateReadyLatch.countDown(); - - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() - .scheduleWithFixedDelay(store::ageOff, 63000, 63000, MILLISECONDS)); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception setting up FaTE cleanup thread", e); } diff --cc server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java index caf8be89d5,8cfd2d0c63..11ca402c88 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java @@@ -18,6 -18,11 +18,10 @@@ */ package org.apache.accumulo.manager.tableOps.compact; -import static java.nio.charset.StandardCharsets.UTF_8; + import static org.easymock.EasyMock.createMock; + import static org.easymock.EasyMock.expect; + import static org.easymock.EasyMock.replay; + import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@@ -36,8 -39,6 +40,7 @@@ import org.apache.accumulo.core.fate.zo import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.delete.PreDeleteTable; import org.apache.accumulo.server.ServerContext; +import org.apache.zookeeper.KeeperException; - import org.easymock.EasyMock; import org.junit.jupiter.api.Test; public class CompactionDriverTest { @@@ -86,20 -54,24 +89,20 @@@ final byte[] startRow = new byte[0]; final byte[] endRow = new byte[0]; - Manager manager = EasyMock.createNiceMock(Manager.class); - ServerContext ctx = EasyMock.createNiceMock(ServerContext.class); - ZooReaderWriter zrw = EasyMock.createNiceMock(ZooReaderWriter.class); - EasyMock.expect(manager.getInstanceID()).andReturn(instance).anyTimes(); - EasyMock.expect(manager.getContext()).andReturn(ctx); - EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw); + Manager manager = createMock(Manager.class); + ServerContext ctx = createMock(ServerContext.class); + ZooReaderWriter zrw = createMock(ZooReaderWriter.class); + expect(ctx.getInstanceID()).andReturn(instance).anyTimes(); + expect(ctx.getZooReaderWriter()).andReturn(zrw).anyTimes(); + expect(manager.getContext()).andReturn(ctx).anyTimes(); - EasyMock.replay(manager, ctx, zrw); - final String zCancelID = CompactionDriver.createCompactionCancellationPath(instance, tableId); - expect(zrw.getData(zCancelID)).andReturn(Long.toString(cancelId).getBytes(UTF_8)); - + replay(manager, ctx, zrw); - final CompactionDriver driver = - new CompactionDriver(compactId, namespaceId, tableId, startRow, endRow); - final long tableIdLong = Long.parseLong(tableId.toString()); + final CancelledCompactionDriver driver = + new CancelledCompactionDriver(namespaceId, tableId, startRow, endRow); var e = assertThrows(AcceptableThriftTableOperationException.class, - () -> driver.isReady(tableIdLong, manager)); + () -> driver.isReady(compactionFateId, manager)); assertEquals(e.getTableId(), tableId.toString()); assertEquals(e.getOp(), TableOperation.COMPACT); @@@ -119,23 -92,27 +122,23 @@@ final byte[] startRow = new byte[0]; final byte[] endRow = new byte[0]; - Manager manager = EasyMock.createNiceMock(Manager.class); - ServerContext ctx = EasyMock.createNiceMock(ServerContext.class); - ZooReaderWriter zrw = EasyMock.createNiceMock(ZooReaderWriter.class); - EasyMock.expect(manager.getInstanceID()).andReturn(instance).anyTimes(); - EasyMock.expect(manager.getContext()).andReturn(ctx); - EasyMock.expect(ctx.getZooReaderWriter()).andReturn(zrw); + Manager manager = createMock(Manager.class); + ServerContext ctx = createMock(ServerContext.class); + ZooReaderWriter zrw = createMock(ZooReaderWriter.class); + expect(ctx.getInstanceID()).andReturn(instance).anyTimes(); + expect(ctx.getZooReaderWriter()).andReturn(zrw).anyTimes(); + expect(manager.getContext()).andReturn(ctx).anyTimes(); - final String zCancelID = CompactionDriver.createCompactionCancellationPath(instance, tableId); - expect(zrw.getData(zCancelID)).andReturn(Long.toString(cancelId).getBytes(UTF_8)); - String deleteMarkerPath = PreDeleteTable.createDeleteMarkerPath(instance, tableId); - EasyMock.expect(zrw.exists(deleteMarkerPath)).andReturn(true); + expect(zrw.exists(deleteMarkerPath)).andReturn(true); - EasyMock.replay(manager, ctx, zrw); + replay(manager, ctx, zrw); - final CompactionDriver driver = - new CompactionDriver(compactId, namespaceId, tableId, startRow, endRow); - final long tableIdLong = Long.parseLong(tableId.toString()); + final NotCancelledCompactionDriver driver = + new NotCancelledCompactionDriver(namespaceId, tableId, startRow, endRow); var e = assertThrows(AcceptableThriftTableOperationException.class, - () -> driver.isReady(tableIdLong, manager)); + () -> driver.isReady(compactionFateId, manager)); assertEquals(e.getTableId(), tableId.toString()); assertEquals(e.getOp(), TableOperation.COMPACT); diff --cc test/src/main/java/org/apache/accumulo/test/ImportExportIT.java index 26088f071d,efb3bbbf31..f37a2080f4 --- a/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java @@@ -52,7 -47,7 +52,8 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.ImportConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; + import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; diff --cc test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java index c5f541b5e9,0000000000..a96dc11e71 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java @@@ -1,113 -1,0 +1,116 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.meta; + +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; ++import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; ++import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateIT; +import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.zookeeper.KeeperException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.io.TempDir; + +@Tag(ZOOKEEPER_TESTING_SERVER) +public class MetaFateIT extends FateIT { + + private static ZooKeeperTestingServer szk = null; + private static ZooReaderWriter zk = null; - private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID(); ++ private static final InstanceId IID = InstanceId.of(UUID.randomUUID()); ++ private static final String ZK_ROOT = ZooUtil.getRoot(IID); + + @TempDir + private static File tempDir; + + @BeforeAll + public static void setup() throws Exception { + szk = new ZooKeeperTestingServer(tempDir); + zk = szk.getZooReaderWriter(); + zk.mkdirs(ZK_ROOT + Constants.ZFATE); + zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS); + } + + @AfterAll + public static void teardown() throws Exception { + szk.close(); + } + + @Override + public void executeTest(FateTestExecutor<TestEnv> testMethod, int maxDeferred, + FateIdGenerator fateIdGenerator) throws Exception { + ServerContext sctx = createMock(ServerContext.class); + expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); + expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); + replay(sctx); + + testMethod.execute(new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, createDummyLockID(), null, + maxDeferred, fateIdGenerator), sctx); + } + + @Override + protected TStatus getTxStatus(ServerContext sctx, FateId fateId) { + try { + return getTxStatus(sctx.getZooReaderWriter(), fateId); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + + /* + * Get the status of the TX from ZK directly. Unable to call MetaFateStore.getStatus because this + * test thread does not have the reservation (the FaTE thread does) + */ + private static TStatus getTxStatus(ZooReaderWriter zrw, FateId fateId) + throws KeeperException, InterruptedException { + zrw.sync(ZK_ROOT); + String txdir = String.format("%s%s/tx_%s", ZK_ROOT, Constants.ZFATE, fateId.getTxUUIDStr()); + + try (DataInputBuffer buffer = new DataInputBuffer()) { + var serialized = zrw.getData(txdir); + buffer.reset(serialized, serialized.length); + return TStatus.valueOf(buffer.readUTF()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (KeeperException.NoNodeException e) { + return TStatus.UNKNOWN; + } + + } + +} diff --cc test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java index d306e0bfef,0000000000..af95d02b02 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java @@@ -1,46 -1,0 +1,49 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.meta; + +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; + +import java.util.UUID; + +import org.apache.accumulo.core.Constants; ++import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; ++import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateInterleavingIT; + +public class MetaFateInterleavingIT extends FateInterleavingIT { + + // put the fate data for the test in a different location than what accumulo is using - private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID(); ++ private static final InstanceId IID = InstanceId.of(UUID.randomUUID()); ++ private static final String ZK_ROOT = ZooUtil.getRoot(IID); + + @Override + public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int maxDeferred, + AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { + ServerContext sctx = getCluster().getServerContext(); + String path = ZK_ROOT + Constants.ZFATE; + ZooReaderWriter zk = sctx.getZooReaderWriter(); + zk.mkdirs(ZK_ROOT); + testMethod.execute(new MetaFateStore<>(path, zk, createDummyLockID(), null), sctx); + } +} diff --cc test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java index af8b98db0f,0000000000..fb9e9c7d75 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java @@@ -1,144 -1,0 +1,147 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.meta; + +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Optional; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; ++import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; ++import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateStoreIT; +import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.io.TempDir; + +@Tag(ZOOKEEPER_TESTING_SERVER) +public class MetaFateStoreFateIT extends FateStoreIT { + + private static ZooKeeperTestingServer szk = null; + private static ZooReaderWriter zk = null; - private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID(); ++ private static final InstanceId IID = InstanceId.of(UUID.randomUUID()); ++ private static final String ZK_ROOT = ZooUtil.getRoot(IID); + + @TempDir + private static File tempDir; + + @BeforeAll + public static void setup() throws Exception { + szk = new ZooKeeperTestingServer(tempDir); + zk = szk.getZooReaderWriter(); + zk.mkdirs(ZK_ROOT + Constants.ZFATE); + zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS); + } + + @AfterAll + public static void teardown() throws Exception { + szk.close(); + } + + @Override + public void executeTest(FateTestExecutor<TestEnv> testMethod, int maxDeferred, + FateIdGenerator fateIdGenerator) throws Exception { + ServerContext sctx = createMock(ServerContext.class); + expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); + expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); + replay(sctx); + MetaFateStore<TestEnv> store = new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, + createDummyLockID(), null, maxDeferred, fateIdGenerator); + + // Check that the store has no transactions before and after each test + assertEquals(0, store.list().count()); + testMethod.execute(store, sctx); + assertEquals(0, store.list().count()); + } + + @Override + protected void deleteKey(FateId fateId, ServerContext sctx) { + try { + // We have to use reflection since the NodeValue is internal to the store + + // Grab both the constructors that use the serialized bytes and status, reservation + Class<?> nodeClass = Class.forName(MetaFateStore.class.getName() + "$NodeValue"); + Constructor<?> statusReservationCons = + nodeClass.getDeclaredConstructor(TStatus.class, FateStore.FateReservation.class); + Constructor<?> serializedCons = nodeClass.getDeclaredConstructor(byte[].class); + statusReservationCons.setAccessible(true); + serializedCons.setAccessible(true); + + // Get the status and reservation fields so they can be read and get the serialize method + Field nodeStatus = nodeClass.getDeclaredField("status"); + Field nodeReservation = nodeClass.getDeclaredField("reservation"); + Method nodeSerialize = nodeClass.getDeclaredMethod("serialize"); + nodeStatus.setAccessible(true); + nodeReservation.setAccessible(true); + nodeSerialize.setAccessible(true); + + // Get the existing status and reservation for the node and build a new node with an empty key + // but uses the existing tid + String txPath = ZK_ROOT + Constants.ZFATE + "/tx_" + fateId.getTxUUIDStr(); + Object currentNode = serializedCons.newInstance(new Object[] {zk.getData(txPath)}); + TStatus currentStatus = (TStatus) nodeStatus.get(currentNode); + Optional<FateStore.FateReservation> currentReservation = + getCurrentReservation(nodeReservation, currentNode); + // replace the node with no key and just a tid and existing status and reservation + Object newNode = + statusReservationCons.newInstance(currentStatus, currentReservation.orElse(null)); + + // Replace the transaction with the same status and reservation but no key + zk.putPersistentData(txPath, (byte[]) nodeSerialize.invoke(newNode), + NodeExistsPolicy.OVERWRITE); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private Optional<FateStore.FateReservation> getCurrentReservation(Field nodeReservation, + Object currentNode) throws Exception { + Object currentResAsObject = nodeReservation.get(currentNode); + Optional<FateStore.FateReservation> currentReservation = Optional.empty(); + if (currentResAsObject instanceof Optional) { + Optional<?> currentResAsOptional = (Optional<?>) currentResAsObject; + if (currentResAsOptional.isPresent() + && currentResAsOptional.orElseThrow() instanceof FateStore.FateReservation) { + currentReservation = + Optional.of((FateStore.FateReservation) currentResAsOptional.orElseThrow()); + } + } + return currentReservation; + } +}