This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 509a86574eb299433923bbce1fabe1ca00ca4dc8 Merge: 51b439018f f05b78ee79 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Oct 17 17:06:32 2024 +0000 Merge branch '3.1' .../org/apache/accumulo/core/client/admin/ActiveCompaction.java | 1 + .../org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java | 7 ++++--- .../org/apache/accumulo/shell/commands/ActiveCompactionHelper.java | 2 ++ .../java/org/apache/accumulo/test/functional/CompactionIT.java | 1 + 4 files changed, 8 insertions(+), 3 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java index c6c89e6f47,5dc7343ed2..af591d2b38 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java @@@ -25,8 -25,6 +25,7 @@@ import java.util.Map import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.ActiveCompaction; - import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost.Type; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; @@@ -42,16 -41,15 +42,17 @@@ public class ActiveCompactionImpl exten private final org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac; private final ClientContext context; private final HostAndPort hostport; - private final Type type; + private final CompactionHost.Type type; + private final String resourceGroup; ActiveCompactionImpl(ClientContext context, - org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac, HostAndPort hostport, - CompactionHost.Type type) { + org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac, ServerId server) { this.tac = tac; this.context = context; - this.hostport = 
hostport; - this.type = type; + this.hostport = HostAndPort.fromParts(server.getHost(), server.getPort()); - this.type = server.getType() == ServerId.Type.COMPACTOR ? Type.COMPACTOR : Type.TSERVER; ++ this.type = server.getType() == ServerId.Type.COMPACTOR ? CompactionHost.Type.COMPACTOR ++ : CompactionHost.Type.TSERVER; + this.resourceGroup = server.getResourceGroup(); } @Override diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 687c233345,df49fcd19a..9d034a933f --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@@ -898,231 -959,7 +898,232 @@@ public class CompactionIT extends Compa } } + @Test + public void testCancelUserCompactionTimeoutExceeded() throws Exception { + testCancelUserCompactionTimeout(true); + } + + @Test + public void testCancelUserCompactionTimeoutNotExceeded() throws Exception { + testCancelUserCompactionTimeout(false); + } + + private void testCancelUserCompactionTimeout(boolean timeout) throws Exception { + + var uniqueNames = getUniqueNames(2); + String table1 = uniqueNames[0]; + String table2 = uniqueNames[1]; + + try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + // create a compaction service that uses a Planner that will schedule system jobs + // at a higher priority than user jobs + client.instanceOperations().setProperty( + Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner", + TestPlanner.class.getName()); + client.instanceOperations().setProperty( + Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner.opts.groups", + ("[{'group':'" + COMPACTOR_GROUP_1 + "'}]").replaceAll("'", "\"")); + + // create two tables that uses the compaction service + Map<String,String> props = new HashMap<>(); + props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(), + SimpleCompactionDispatcher.class.getName()); + 
props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "testcancel"); + // Disable system compactions to start for these tables + props.put(Property.TABLE_MAJC_RATIO.getKey(), "20"); + + // configure tablet compaction iterator that slows compaction down + var ntc = new NewTableConfiguration(); + IteratorSetting iterSetting = new IteratorSetting(50, SlowIterator.class); + SlowIterator.setSleepTime(iterSetting, 5); + ntc.attachIterator(iterSetting, EnumSet.of(IteratorScope.majc)); + ntc.setProperties(props); + + // Create two tables and write some data + client.tableOperations().create(table1, ntc); + client.tableOperations().create(table2, ntc); + writeRows((ClientContext) client, table1, MAX_DATA, true); + writeRows((ClientContext) client, table2, MAX_DATA, true); + + var ctx = getCluster().getServerContext(); + Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx); + if (coordinatorHost.isEmpty()) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + + // Start a compaction for table2, this is done so that the compactor will be busy + // and new jobs will queue up and wait + client.tableOperations().compact(table2, new CompactionConfig().setWait(false)); + + var tableId = TableId.of(client.tableOperations().tableIdMap().get(table1)); + var extent = new KeyExtent(tableId, null, null); + + // If timeout is true then set a short timeout so the system job can cancel the user job + // Otherwise the long timeout should prevent the system from clearing the selected files + var expiration = timeout ? 
"100ms" : "100s"; + client.tableOperations().setProperty(table1, + Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey(), expiration); + + // Submit a user job for table1 that will be put on the queue and waiting + // for the current job to finish + client.tableOperations().compact(table1, new CompactionConfig().setWait(false)); + // Wait for the fate operation to write selectedFiles + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var selectedFiles = tabletMeta.getSelectedFiles(); + if (selectedFiles != null) { + return !selectedFiles.getFiles().isEmpty(); + } + return false; + }, Wait.MAX_WAIT_MILLIS, 10); + + // Change the ratio so a system compaction will attempt to be scheduled for table 1 + client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1"); + + if (timeout) { + // Because of the custom planner, the system compaction should now take priority + // System compactions were previously not eligible to run if selectedFiles existed + // for a user compaction already (and they overlapped). But now system compaction jobs + // are eligible to run if the user compaction has not started or completed any jobs + // and the expiration period has been exceeded. 
+ // When this happens the system compaction will delete the selectedFiles column + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + return tabletMeta.getSelectedFiles() == null; + }, Wait.MAX_WAIT_MILLIS, 100); + + // Wait for the system compaction to be running + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + assertTrue(externalCompactions.values().stream() + .allMatch(ec -> ec.getKind() == CompactionKind.SYSTEM)); + return externalCompactions.size() == 1; + }, Wait.MAX_WAIT_MILLIS, 10); + + // Wait for the user compaction to now run after the system finishes + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + var running = externalCompactions.values().stream() + .filter(ec -> ec.getKind() == CompactionKind.USER).count(); + return running == 1; + }, Wait.MAX_WAIT_MILLIS, 100); + } else { + // Wait for the user compaction to run, there should be no system compactions scheduled + // even though system has the higher priority in the test because the timeout was + // not exceeded + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + assertTrue(externalCompactions.values().stream() + .allMatch(ec -> ec.getKind() == CompactionKind.USER)); + return externalCompactions.size() == 1; + }, Wait.MAX_WAIT_MILLIS, 10); + } + + // Wait and verify all compactions finish + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions().size(); + log.debug("Waiting for compactions to finish, count {}", externalCompactions); + return externalCompactions == 0 && tabletMeta.getCompacted().isEmpty() + && 
tabletMeta.getSelectedFiles() == null; + }, Wait.MAX_WAIT_MILLIS, 100); + } + + ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), table1); + } + + @Test + public void testOfflineAndCompactions() throws Exception { + var uniqueNames = getUniqueNames(1); + String table = uniqueNames[0]; + + // This test exercises concurrent compactions and table offline. + + try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + SortedSet<Text> splits = new TreeSet<>(); + for (int i = 1; i < 32; i++) { + splits.add(new Text(String.format("r:%04d", i))); + } + + client.tableOperations().create(table, new NewTableConfiguration().withSplits(splits)); + writeRows(client, table, 33, true); + // create two files per tablet + writeRows(client, table, 33, true); + + var ctx = getCluster().getServerContext(); + var tableId = ctx.getTableId(table); + + // verify assumptions of test, expect all tablets to have files + var files0 = getFiles(ctx, tableId); + assertEquals(32, files0.size()); + assertFalse(files0.values().stream().anyMatch(Set::isEmpty)); + + // lower the tables compaction ratio to cause system compactions + client.tableOperations().setProperty(table, Property.TABLE_MAJC_RATIO.getKey(), "1"); + + // start a bunch of compactions in the background + var executor = Executors.newCachedThreadPool(); + List<Future<?>> futures = new ArrayList<>(); + // start user compactions on a subset of the tables tablets, system compactions should attempt + // to run on all tablets. With concurrency should get a mix. 
+ for (int i = 1; i < 20; i++) { + var startRow = new Text(String.format("r:%04d", i - 1)); + var endRow = new Text(String.format("r:%04d", i)); + futures.add(executor.submit(() -> { + CompactionConfig config = new CompactionConfig(); + config.setWait(true); + config.setStartRow(startRow); + config.setEndRow(endRow); + client.tableOperations().compact(table, config); + return null; + })); + } + + log.debug("Waiting for offline"); + // take tablet offline while there are concurrent compactions + client.tableOperations().offline(table, true); + + // grab a snapshot of all the tablets files after waiting for offline, do not expect any + // tablets files to change at this point + var files1 = getFiles(ctx, tableId); + + // wait for the background compactions + log.debug("Waiting for futures"); + for (var future : futures) { + try { + future.get(); + } catch (ExecutionException ee) { + // its ok if some of the compactions fail because the table was concurrently taken offline + assertTrue(ee.getMessage().contains("is offline")); + } + } + + // grab a second snapshot of the tablets files after all the background operations completed + var files2 = getFiles(ctx, tableId); + + // do not expect the files to have changed after the offline operation returned. + assertEquals(files1, files2); + + executor.shutdown(); + } + } + + private Map<KeyExtent,Set<StoredTabletFile>> getFiles(ServerContext ctx, TableId tableId) { + Map<KeyExtent,Set<StoredTabletFile>> files = new HashMap<>(); + try (var tablets = ctx.getAmple().readTablets().forTable(tableId).build()) { + for (var tablet : tablets) { + files.put(tablet.getExtent(), tablet.getFiles()); + } + } + return files; + } + + @SuppressWarnings("deprecation") @Test public void testGetActiveCompactions() throws Exception { final String table1 = this.getUniqueNames(1)[0];