This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 509a86574eb299433923bbce1fabe1ca00ca4dc8
Merge: 51b439018f f05b78ee79
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Oct 17 17:06:32 2024 +0000

    Merge branch '3.1'

 .../org/apache/accumulo/core/client/admin/ActiveCompaction.java    | 1 +
 .../org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java  | 7 ++++---
 .../org/apache/accumulo/shell/commands/ActiveCompactionHelper.java | 2 ++
 .../java/org/apache/accumulo/test/functional/CompactionIT.java     | 1 +
 4 files changed, 8 insertions(+), 3 deletions(-)
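
This merge brings over the 3.1 change that records which resource group an
active compaction is running in. Below is a minimal sketch of reading that
from the client API, assuming the one-line addition to ActiveCompaction.java
is a getResourceGroup() accessor (the accessor name is an assumption; only
the field plumbing is visible in the diff that follows):

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.admin.ActiveCompaction;

    public class PrintCompactionGroups {
      public static void main(String[] args) throws Exception {
        try (AccumuloClient client = Accumulo.newClient().from("client.properties").build()) {
          // lists active compactions across tablet servers and compactors
          for (ActiveCompaction ac : client.instanceOperations().getActiveCompactions()) {
            // getResourceGroup() is assumed to be the accessor this merge adds
            System.out.println(ac.getTable() + " on " + ac.getHost().getAddress()
                + " in group " + ac.getResourceGroup());
          }
        }
      }
    }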

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
index c6c89e6f47,5dc7343ed2..af591d2b38
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
@@@ -25,8 -25,6 +25,7 @@@ import java.util.Map
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.TableNotFoundException;
  import org.apache.accumulo.core.client.admin.ActiveCompaction;
- import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost.Type;
 +import org.apache.accumulo.core.client.admin.servers.ServerId;
  import org.apache.accumulo.core.data.TabletId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.dataImpl.TabletIdImpl;
@@@ -42,16 -41,15 +42,17 @@@ public class ActiveCompactionImpl exten
    private final org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac;
    private final ClientContext context;
    private final HostAndPort hostport;
-   private final Type type;
+   private final CompactionHost.Type type;
 +  private final String resourceGroup;
  
    ActiveCompactionImpl(ClientContext context,
 -      org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac, HostAndPort hostport,
 -      CompactionHost.Type type) {
 +      org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac, ServerId server) {
      this.tac = tac;
      this.context = context;
 -    this.hostport = hostport;
 -    this.type = type;
 +    this.hostport = HostAndPort.fromParts(server.getHost(), server.getPort());
-     this.type = server.getType() == ServerId.Type.COMPACTOR ? Type.COMPACTOR : Type.TSERVER;
++    this.type = server.getType() == ServerId.Type.COMPACTOR ? CompactionHost.Type.COMPACTOR
++        : CompactionHost.Type.TSERVER;
 +    this.resourceGroup = server.getResourceGroup();
    }
  
    @Override
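
The conflicted hunk above resolves to deriving both the host/port and the
compaction host type from the new ServerId parameter instead of taking them
as separate constructor arguments. The type mapping, pulled out as a
standalone sketch (the class and method names here are illustrative, not
part of the commit):

    import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost;
    import org.apache.accumulo.core.client.admin.servers.ServerId;

    final class CompactionHostTypes {

      // mirrors the ternary in the merged constructor: COMPACTOR processes map
      // directly, and any other server type is treated as a tablet server
      static CompactionHost.Type fromServerId(ServerId server) {
        return server.getType() == ServerId.Type.COMPACTOR ? CompactionHost.Type.COMPACTOR
            : CompactionHost.Type.TSERVER;
      }

      private CompactionHostTypes() {}
    }
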
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 687c233345,df49fcd19a..9d034a933f
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@@ -898,231 -959,7 +898,232 @@@ public class CompactionIT extends Compa
      }
    }
  
 +  @Test
 +  public void testCancelUserCompactionTimeoutExceeded() throws Exception {
 +    testCancelUserCompactionTimeout(true);
 +  }
 +
 +  @Test
 +  public void testCancelUserCompactionTimeoutNotExceeded() throws Exception {
 +    testCancelUserCompactionTimeout(false);
 +  }
 +
 +  private void testCancelUserCompactionTimeout(boolean timeout) throws Exception {
 +
 +    var uniqueNames = getUniqueNames(2);
 +    String table1 = uniqueNames[0];
 +    String table2 = uniqueNames[1];
 +
 +    try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      // create a compaction service that uses a Planner that will schedule system jobs
 +      // at a higher priority than user jobs
 +      client.instanceOperations().setProperty(
 +          Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner",
 +          TestPlanner.class.getName());
 +      client.instanceOperations().setProperty(
 +          Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner.opts.groups",
 +          ("[{'group':'" + COMPACTOR_GROUP_1 + "'}]").replaceAll("'", "\""));
 +
 +      // create two tables that use the compaction service
 +      Map<String,String> props = new HashMap<>();
 +      props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
 +          SimpleCompactionDispatcher.class.getName());
 +      props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "testcancel");
 +      // Keep system compactions from starting for these tables
 +      props.put(Property.TABLE_MAJC_RATIO.getKey(), "20");
 +
 +      // configure a tablet compaction iterator that slows compaction down
 +      var ntc = new NewTableConfiguration();
 +      IteratorSetting iterSetting = new IteratorSetting(50, SlowIterator.class);
 +      SlowIterator.setSleepTime(iterSetting, 5);
 +      ntc.attachIterator(iterSetting, EnumSet.of(IteratorScope.majc));
 +      ntc.setProperties(props);
 +
 +      // Create two tables and write some data
 +      client.tableOperations().create(table1, ntc);
 +      client.tableOperations().create(table2, ntc);
 +      writeRows((ClientContext) client, table1, MAX_DATA, true);
 +      writeRows((ClientContext) client, table2, MAX_DATA, true);
 +
 +      var ctx = getCluster().getServerContext();
 +      Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx);
 +      if (coordinatorHost.isEmpty()) {
 +        throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
 +      }
 +
 +      // Start a compaction for table2; this is done so that the compactor will be busy
 +      // and new jobs will queue up and wait
 +      client.tableOperations().compact(table2, new CompactionConfig().setWait(false));
 +
 +      var tableId = TableId.of(client.tableOperations().tableIdMap().get(table1));
 +      var extent = new KeyExtent(tableId, null, null);
 +
 +      // If timeout is true then set a short timeout so the system job can cancel the user job
 +      // Otherwise the long timeout should prevent the system from clearing the selected files
 +      var expiration = timeout ? "100ms" : "100s";
 +      client.tableOperations().setProperty(table1,
 +          Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey(), expiration);
 +
 +      // Submit a user job for table1 that will be put on the queue to wait
 +      // for the current job to finish
 +      client.tableOperations().compact(table1, new CompactionConfig().setWait(false));
 +      // Wait for the fate operation to write selectedFiles
 +      Wait.waitFor(() -> {
 +        var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
 +        var selectedFiles = tabletMeta.getSelectedFiles();
 +        if (selectedFiles != null) {
 +          return !selectedFiles.getFiles().isEmpty();
 +        }
 +        return false;
 +      }, Wait.MAX_WAIT_MILLIS, 10);
 +
 +      // Change the ratio so a system compaction will attempt to be scheduled for table 1
 +      client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1");
 +
 +      if (timeout) {
 +        // Because of the custom planner, the system compaction should now take priority
 +        // System compactions were previously not eligible to run if selectedFiles existed
 +        // for a user compaction already (and they overlapped). But now system compaction jobs
 +        // are eligible to run if the user compaction has not started or completed any jobs
 +        // and the expiration period has been exceeded.
 +        // When this happens the system compaction will delete the selectedFiles column
 +        Wait.waitFor(() -> {
 +          var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
 +          return tabletMeta.getSelectedFiles() == null;
 +        }, Wait.MAX_WAIT_MILLIS, 100);
 +
 +        // Wait for the system compaction to be running
 +        Wait.waitFor(() -> {
 +          var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
 +          var externalCompactions = tabletMeta.getExternalCompactions();
 +          assertTrue(externalCompactions.values().stream()
 +              .allMatch(ec -> ec.getKind() == CompactionKind.SYSTEM));
 +          return externalCompactions.size() == 1;
 +        }, Wait.MAX_WAIT_MILLIS, 10);
 +
 +        // Wait for the user compaction to run after the system compaction finishes
 +        Wait.waitFor(() -> {
 +          var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
 +          var externalCompactions = tabletMeta.getExternalCompactions();
 +          var running = externalCompactions.values().stream()
 +              .filter(ec -> ec.getKind() == CompactionKind.USER).count();
 +          return running == 1;
 +        }, Wait.MAX_WAIT_MILLIS, 100);
 +      } else {
 +        // Wait for the user compaction to run, there should be no system compactions scheduled
 +        // even though system has the higher priority in the test because the timeout was
 +        // not exceeded
 +        Wait.waitFor(() -> {
 +          var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
 +          var externalCompactions = tabletMeta.getExternalCompactions();
 +          assertTrue(externalCompactions.values().stream()
 +              .allMatch(ec -> ec.getKind() == CompactionKind.USER));
 +          return externalCompactions.size() == 1;
 +        }, Wait.MAX_WAIT_MILLIS, 10);
 +      }
 +
 +      // Wait and verify all compactions finish
 +      Wait.waitFor(() -> {
 +        var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
 +        var externalCompactions = tabletMeta.getExternalCompactions().size();
 +        log.debug("Waiting for compactions to finish, count {}", externalCompactions);
 +        return externalCompactions == 0 && tabletMeta.getCompacted().isEmpty()
 +            && tabletMeta.getSelectedFiles() == null;
 +      }, Wait.MAX_WAIT_MILLIS, 100);
 +    }
 +
 +    ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), table1);
 +  }
 +
 +  @Test
 +  public void testOfflineAndCompactions() throws Exception {
 +    var uniqueNames = getUniqueNames(1);
 +    String table = uniqueNames[0];
 +
 +    // This test exercises concurrent compactions and taking the table offline.
 +
 +    try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      SortedSet<Text> splits = new TreeSet<>();
 +      for (int i = 1; i < 32; i++) {
 +        splits.add(new Text(String.format("r:%04d", i)));
 +      }
 +
 +      client.tableOperations().create(table, new NewTableConfiguration().withSplits(splits));
 +      writeRows(client, table, 33, true);
 +      // create two files per tablet
 +      writeRows(client, table, 33, true);
 +
 +      var ctx = getCluster().getServerContext();
 +      var tableId = ctx.getTableId(table);
 +
 +      // verify assumptions of the test, expect all tablets to have files
 +      var files0 = getFiles(ctx, tableId);
 +      assertEquals(32, files0.size());
 +      assertFalse(files0.values().stream().anyMatch(Set::isEmpty));
 +
 +      // lower the table's compaction ratio to cause system compactions
 +      client.tableOperations().setProperty(table, Property.TABLE_MAJC_RATIO.getKey(), "1");
 +
 +      // start a bunch of compactions in the background
 +      var executor = Executors.newCachedThreadPool();
 +      List<Future<?>> futures = new ArrayList<>();
 +      // start user compactions on a subset of the table's tablets; system compactions should attempt
 +      // to run on all tablets. With concurrency, we should get a mix.
 +      for (int i = 1; i < 20; i++) {
 +        var startRow = new Text(String.format("r:%04d", i - 1));
 +        var endRow = new Text(String.format("r:%04d", i));
 +        futures.add(executor.submit(() -> {
 +          CompactionConfig config = new CompactionConfig();
 +          config.setWait(true);
 +          config.setStartRow(startRow);
 +          config.setEndRow(endRow);
 +          client.tableOperations().compact(table, config);
 +          return null;
 +        }));
 +      }
 +
 +      log.debug("Waiting for offline");
 +      // take the table offline while there are concurrent compactions
 +      client.tableOperations().offline(table, true);
 +
 +      // grab a snapshot of all the tablets' files after waiting for offline, do not expect any
 +      // tablets' files to change at this point
 +      var files1 = getFiles(ctx, tableId);
 +
 +      // wait for the background compactions
 +      log.debug("Waiting for futures");
 +      for (var future : futures) {
 +        try {
 +          future.get();
 +        } catch (ExecutionException ee) {
 +          // it's ok if some of the compactions fail because the table was concurrently taken offline
 +          assertTrue(ee.getMessage().contains("is offline"));
 +        }
 +      }
 +
 +      // grab a second snapshot of the tablets' files after all the background operations completed
 +      var files2 = getFiles(ctx, tableId);
 +
 +      // do not expect the files to have changed after the offline operation returned.
 +      assertEquals(files1, files2);
 +
 +      executor.shutdown();
 +    }
 +  }
 +
 +  private Map<KeyExtent,Set<StoredTabletFile>> getFiles(ServerContext ctx, TableId tableId) {
 +    Map<KeyExtent,Set<StoredTabletFile>> files = new HashMap<>();
 +    try (var tablets = ctx.getAmple().readTablets().forTable(tableId).build()) {
 +      for (var tablet : tablets) {
 +        files.put(tablet.getExtent(), tablet.getFiles());
 +      }
 +    }
 +    return files;
 +  }
 +
+   @SuppressWarnings("deprecation")
    @Test
    public void testGetActiveCompactions() throws Exception {
      final String table1 = this.getUniqueNames(1)[0];

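For reference, the cancellation behavior exercised by
testCancelUserCompactionTimeout above is controlled by a single table
property. A minimal sketch of setting it, following the test's own usage
(the table name is hypothetical, and Property is the internal
org.apache.accumulo.core.conf.Property enum the test imports, not public
client API):

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.conf.Property;

    public class SelectionExpirationSetup {
      public static void main(String[] args) throws Exception {
        try (AccumuloClient client = Accumulo.newClient().from("client.properties").build()) {
          String table = "mytable"; // hypothetical table name
          // a short expiration lets a higher priority system job clear a queued
          // user compaction's selected files before the user job starts; a long
          // value (the test uses "100s") protects the selection instead
          client.tableOperations().setProperty(table,
              Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey(), "100ms");
        }
      }
    }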