This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new c5c6d5b588 Improvements and cleanup to ExternalCompaction ITs and related code (#3661) c5c6d5b588 is described below commit c5c6d5b588b98e1930fd21c3a66bbf2b2fe92f68 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Jul 27 08:38:52 2023 -0400 Improvements and cleanup to ExternalCompaction ITs and related code (#3661) Modified ExternalCompactionITs to remove the startup/shutdown of Compactors that are defined in the compaction services created by the MAC configuration callback. MAC now starts these Compactors based on the compaction service configuration. Made some modifications to ExternalCompactionITs to reduce the load on ZK as some calls were being made to find the compaction coordinator address inside of a loop, and that wasn't necessary. Made a modification to the CompactionCoordinator so that it won't try to update tablet metadata for a table that has been deleted. Made a modification to the Manager so that it only advertises the FATE and CompactionCoordinator Thrift services after the Manager has started up. 
--- .../java/org/apache/accumulo/manager/Manager.java | 11 ++- .../coordinator/CompactionCoordinator.java | 13 +++- .../compaction/ExternalCompactionProgressIT.java | 15 +++- .../compaction/ExternalCompactionTestUtils.java | 50 +++++++------ .../test/compaction/ExternalCompaction_1_IT.java | 34 ++------- .../test/compaction/ExternalCompaction_2_IT.java | 82 ++++++++++++---------- .../test/compaction/ExternalCompaction_3_IT.java | 36 +++++----- 7 files changed, 118 insertions(+), 123 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 09c673aef0..aa6ef53d19 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1362,8 +1362,8 @@ public class Manager extends AbstractServer String address = sa.address.toString(); UUID uuid = sld.getServerUUID(ThriftService.MANAGER); ServiceDescriptors descriptors = new ServiceDescriptors(); - for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, - ThriftService.COORDINATOR}) { + for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR, + ThriftService.FATE}) { descriptors.addService(new ServiceDescriptor(uuid, svc, address, this.getResourceGroup())); } @@ -1593,11 +1593,8 @@ public class Manager extends AbstractServer UUID zooLockUUID = UUID.randomUUID(); ServiceDescriptors descriptors = new ServiceDescriptors(); - for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, - ThriftService.COORDINATOR}) { - descriptors.addService( - new ServiceDescriptor(zooLockUUID, svc, managerClientAddress, this.getResourceGroup())); - } + descriptors.addService(new ServiceDescriptor(zooLockUUID, ThriftService.MANAGER, + managerClientAddress, this.getResourceGroup())); ServiceLockData sld = new ServiceLockData(descriptors); while (true) { diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 191f990a17..49a27ef7db 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -56,6 +56,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; @@ -1059,9 +1060,15 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { compactions.forEach((ecid, extent) -> { - tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid) - .requirePrevEndRow(extent.prevEndRow()).deleteExternalCompaction(ecid) - .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); + try { + ctx.requireNotDeleted(extent.tableId()); + tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid) + .requirePrevEndRow(extent.prevEndRow()).deleteExternalCompaction(ecid) + .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); + } catch (TableDeletedException e) { + LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.", + extent.tableId()); + } }); tabletsMutator.process().forEach((extent, result) -> { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index b1b3883f96..b77ec14a8e 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@ -31,6 +31,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.Accumulo; @@ -38,17 +39,22 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.net.HostAndPort; + /** * Tests that external compactions report progress from start to finish. To prevent flaky test * failures, we only measure progress in quarter segments: STARTED, QUARTER, HALF, THREE_QUARTERS. @@ -124,7 +130,14 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { * Check running compaction progress. 
*/ private void checkRunning() throws TException { - var ecList = getRunningCompactions(getCluster().getServerContext()); + + ServerContext ctx = getCluster().getServerContext(); + Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx); + if (coordinatorHost.isEmpty()) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + + var ecList = getRunningCompactions(ctx, coordinatorHost); var ecMap = ecList.getCompactions(); if (ecMap != null) { ecMap.forEach((ecid, ec) -> { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index a3be15838b..c9b0108278 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -233,13 +233,8 @@ public class ExternalCompactionTestUtils { coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } - public static TExternalCompactionList getRunningCompactions(ClientContext context) - throws TException { - Optional<HostAndPort> coordinatorHost = - ExternalCompactionUtil.findCompactionCoordinator(context); - if (coordinatorHost.isEmpty()) { - throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); - } + public static TExternalCompactionList getRunningCompactions(ClientContext context, + Optional<HostAndPort> coordinatorHost) throws TException { CompactionCoordinatorService.Client client = ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, coordinatorHost.orElseThrow(), context); try { @@ -251,13 +246,8 @@ public class ExternalCompactionTestUtils { } } - private static TExternalCompactionList getCompletedCompactions(ClientContext context) - throws Exception { - Optional<HostAndPort> coordinatorHost = - 
ExternalCompactionUtil.findCompactionCoordinator(context); - if (coordinatorHost.isEmpty()) { - throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); - } + private static TExternalCompactionList getCompletedCompactions(ClientContext context, + Optional<HostAndPort> coordinatorHost) throws Exception { CompactionCoordinatorService.Client client = ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, coordinatorHost.orElseThrow(), context); try { @@ -309,8 +299,13 @@ public class ExternalCompactionTestUtils { public static int confirmCompactionRunning(ServerContext ctx, Set<ExternalCompactionId> ecids) throws Exception { int matches = 0; + Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx); + if (coordinatorHost.isEmpty()) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } while (matches == 0) { - TExternalCompactionList running = ExternalCompactionTestUtils.getRunningCompactions(ctx); + TExternalCompactionList running = + ExternalCompactionTestUtils.getRunningCompactions(ctx, coordinatorHost); if (running.getCompactions() != null) { for (ExternalCompactionId ecid : ecids) { TExternalCompaction tec = running.getCompactions().get(ecid.canonical()); @@ -329,21 +324,24 @@ public class ExternalCompactionTestUtils { public static void confirmCompactionCompleted(ServerContext ctx, Set<ExternalCompactionId> ecids, TCompactionState expectedState) throws Exception { + Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx); + if (coordinatorHost.isEmpty()) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + // The running compaction should be removed - TExternalCompactionList running = ExternalCompactionTestUtils.getRunningCompactions(ctx); - while (running.getCompactions() != null) { - running = ExternalCompactionTestUtils.getRunningCompactions(ctx); - if 
(running.getCompactions() == null) { - UtilWaitThread.sleep(250); - } + TExternalCompactionList running = + ExternalCompactionTestUtils.getRunningCompactions(ctx, coordinatorHost); + while (running.getCompactions() != null && running.getCompactions().keySet().stream() + .anyMatch((e) -> ecids.contains(ExternalCompactionId.of(e)))) { + running = ExternalCompactionTestUtils.getRunningCompactions(ctx, coordinatorHost); } // The compaction should be in the completed list with the expected state - TExternalCompactionList completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx); + TExternalCompactionList completed = + ExternalCompactionTestUtils.getCompletedCompactions(ctx, coordinatorHost); while (completed.getCompactions() == null) { - completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx); - if (completed.getCompactions() == null) { - UtilWaitThread.sleep(50); - } + UtilWaitThread.sleep(50); + completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx, coordinatorHost); } for (ExternalCompactionId e : ecids) { TExternalCompaction tec = completed.getCompactions().get(e.canonical()); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 695c87b897..5085c310b4 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -22,7 +22,6 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GR import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP4; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP5; import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP6; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP8; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; @@ -74,7 +73,6 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -95,13 +93,6 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { startMiniClusterWithConfig(new ExternalCompaction1Config()); } - @AfterEach - public void tearDown() throws Exception { - // The ExternalDoNothingCompactor needs to be restarted between tests - getCluster().getClusterControl().stop(ServerType.COMPACTOR); - getCluster().getConfig().getClusterServerConfiguration().clearCompactorResourceGroups(); - } - public static class TestFilter extends Filter { int modulus = 1; @@ -159,10 +150,6 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { writeData(client, table1); writeData(client, table2); - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP1, 1); - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP2, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR); - compact(client, table1, 2, GROUP1, true); verify(client, table1, 2); @@ -186,12 +173,7 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { writeData(client, table1); verify(client, table1, 1); - // ELASTICITY_TODO the compactors started by mini inspecting the config were interfering with - // starting the ExternalDoNothingCompactor, so killed all compactors. This is not the best way - // to handle this. 
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP3, 1); getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, ExternalDoNothingCompactor.class); @@ -224,6 +206,10 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { } finally { // We stopped the TServer and started our own, restart the original TabletServers getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + // Restart the regular compactors + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + getCluster().getClusterControl().start(ServerType.COMPACTOR); + } } @@ -238,10 +224,6 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { writeData(client, table1); - // ELASTICITY_TODO there is already one compactor started by mini based on config - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP4, 2); - getCluster().getClusterControl().start(ServerType.COMPACTOR); - compact(client, table1, 3, GROUP4, true); verify(client, table1, 3); @@ -252,9 +234,6 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { public void testConfigurer() throws Exception { String tableName = this.getUniqueNames(1)[0]; - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP5, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR); - try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { @@ -328,8 +307,6 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { Accumulo.newClient().from(getCluster().getClientProperties()).build()) { createTable(client, table1, "cs6"); writeData(client, table1); - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP6, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR); compact(client, table1, 2, 
GROUP6, true); verify(client, table1, 2); @@ -368,9 +345,6 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { try (final AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP8, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR); - createTable(client, tableName, "cs8"); writeData(client, tableName); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java index c692e7fc86..4d1cf1b730 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java @@ -21,7 +21,6 @@ package org.apache.accumulo.test.compaction; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP4; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP5; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted; @@ -31,16 +30,22 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.ro import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static 
org.junit.jupiter.api.Assertions.assertTrue; import java.util.Collections; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; @@ -51,11 +56,12 @@ import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; public class ExternalCompaction_2_IT extends SharedMiniClusterBase { @@ -69,22 +75,14 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { @BeforeAll public static void beforeTests() throws Exception { startMiniClusterWithConfig(new ExternalCompaction2Config()); - } - - @AfterEach - public void tearDown() throws Exception { - // The ExternalDoNothingCompactor needs to be restarted between tests getCluster().getClusterControl().stop(ServerType.COMPACTOR); - getCluster().getConfig().getClusterServerConfiguration().clearCompactorResourceGroups(); + getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, + ExternalDoNothingCompactor.class); } @Test public void 
testSplitCancelsExternalCompaction() throws Exception { - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP1, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, - ExternalDoNothingCompactor.class); - String table1 = this.getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { @@ -133,10 +131,6 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { @Test public void testUserCompactionCancellation() throws Exception { - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP3, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, - ExternalDoNothingCompactor.class); - String table1 = this.getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { @@ -170,10 +164,6 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { @Test public void testDeleteTableCancelsUserExternalCompaction() throws Exception { - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP4, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, - ExternalDoNothingCompactor.class); - String table1 = this.getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { @@ -202,45 +192,59 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { @Test public void testDeleteTableCancelsExternalCompaction() throws Exception { - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP5, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, - ExternalDoNothingCompactor.class); - String table1 = this.getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { createTable(client, 
table1, "cs5"); + + ServerContext ctx = getCluster().getServerContext(); + TableId tid = ctx.getTableId(table1); + // set compaction ratio to 1 so that majc occurs naturally, not user compaction // user compaction blocks delete client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0"); - // cause multiple rfiles to be created - writeData(client, table1); - writeData(client, table1); - writeData(client, table1); - writeData(client, table1); - - TableId tid = getCluster().getServerContext().getTableId(table1); + AtomicReference<Throwable> error = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + Runnable r = () -> { + try { + // cause multiple rfiles to be created + latch.countDown(); + writeData(client, table1); + writeData(client, table1); + writeData(client, table1); + writeData(client, table1); + } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { + error.set(e); + } + }; + Thread t = new Thread(r); + t.start(); + latch.await(); // Wait for the compaction to start by waiting for 1 external compaction column - Set<ExternalCompactionId> ecids = ExternalCompactionTestUtils - .waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid); + Set<ExternalCompactionId> ecids = + ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids(ctx, tid); // Confirm that this ECID shows up in RUNNING set - int matches = ExternalCompactionTestUtils - .confirmCompactionRunning(getCluster().getServerContext(), ecids); + int matches = ExternalCompactionTestUtils.confirmCompactionRunning(ctx, ecids); assertTrue(matches > 0); client.tableOperations().delete(table1); - confirmCompactionCompleted(getCluster().getServerContext(), ecids, - TCompactionState.CANCELLED); + LoggerFactory.getLogger(getClass()).debug("Table deleted."); + + confirmCompactionCompleted(ctx, ecids, TCompactionState.CANCELLED); + + LoggerFactory.getLogger(getClass()).debug("Confirmed compaction cancelled."); 
- TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets().forTable(tid) - .fetch(ColumnType.ECOMP).build(); + TabletsMetadata tm = + ctx.getAmple().readTablets().forTable(tid).fetch(ColumnType.PREV_ROW).build(); assertEquals(0, tm.stream().count()); tm.close(); + t.join(); + assertNull(error.get()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java index d3de651332..52da17e009 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.test.compaction; -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted; @@ -32,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Collections; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -47,17 +47,22 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import 
org.apache.accumulo.server.ServerContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; -import org.junit.jupiter.api.AfterEach; +import org.apache.thrift.transport.TTransportException; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import com.google.common.net.HostAndPort; + public class ExternalCompaction_3_IT extends SharedMiniClusterBase { public static class ExternalCompaction3Config implements MiniClusterConfigurationCallback { @@ -70,22 +75,15 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase { @BeforeAll public static void beforeTests() throws Exception { startMiniClusterWithConfig(new ExternalCompaction3Config()); - } - - @AfterEach - public void tearDown() throws Exception { - // The ExternalDoNothingCompactor needs to be restarted between tests getCluster().getClusterControl().stop(ServerType.COMPACTOR); - getCluster().getConfig().getClusterServerConfiguration().clearCompactorResourceGroups(); + getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, + ExternalDoNothingCompactor.class); } @Test + @Disabled // ELASTICITY_TODO: Merges are broken currently public void testMergeCancelsExternalCompaction() throws Exception { - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP1, 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, - ExternalDoNothingCompactor.class); - String table1 = this.getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { @@ -143,10 +141,6 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase { @Test public void testCoordinatorRestartsDuringCompaction() throws Exception { - getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP2, 1); - 
getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, - ExternalDoNothingCompactor.class); - String table1 = this.getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { @@ -167,13 +161,21 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase { // Restart the Manager while the compaction is running getCluster().getClusterControl().start(ServerType.MANAGER); + ServerContext ctx = getCluster().getServerContext(); + // Confirm compaction is still running int matches = 0; while (matches == 0) { TExternalCompactionList running = null; while (running == null) { try { - running = getRunningCompactions(getCluster().getServerContext()); + Optional<HostAndPort> coordinatorHost = + ExternalCompactionUtil.findCompactionCoordinator(ctx); + if (coordinatorHost.isEmpty()) { + throw new TTransportException( + "Unable to get CompactionCoordinator address from ZooKeeper"); + } + running = getRunningCompactions(ctx, coordinatorHost); } catch (TException t) { running = null; Thread.sleep(2000);