This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 839e501a46 Support configuring compactions by tablet and output path 
(#5720)
839e501a46 is described below

commit 839e501a467be02cef0492806721b05720c3dfdc
Author: Keith Turner <[email protected]>
AuthorDate: Fri Aug 1 17:11:34 2025 -0400

    Support configuring compactions by tablet and output path (#5720)
    
    Adds two methods to compaction configurer that provide the tablet and
    output path for the compaction.  This allows different configuration to
    be generated based on the tablet or volume.
---
 .../admin/compaction/CompactionConfigurer.java     | 25 +++++--
 .../accumulo/tserver/tablet/CompactableImpl.java   |  8 +--
 .../accumulo/tserver/tablet/CompactableUtils.java  | 35 ++++++---
 .../ConfigurableCompactionStrategyTest.java        | 11 +++
 .../accumulo/test/functional/CompactionIT.java     | 82 ++++++++++++++++++++++
 5 files changed, 143 insertions(+), 18 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java
 
b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java
index 6a58704e54..e694535086 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java
@@ -18,11 +18,13 @@
  */
 package org.apache.accumulo.core.client.admin.compaction;
 
+import java.net.URI;
 import java.util.Collection;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.PluginEnvironment;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.TabletId;
 
 /**
  * Enables dynamically overriding of per table properties used to create the 
output file for a
@@ -34,7 +36,7 @@ public interface CompactionConfigurer {
   /**
    * @since 2.1.0
    */
-  public interface InitParameters {
+  interface InitParameters {
     TableId getTableId();
 
     Map<String,String> getOptions();
@@ -47,10 +49,25 @@ public interface CompactionConfigurer {
   /**
    * @since 2.1.0
    */
-  public interface InputParameters {
+  interface InputParameters {
     TableId getTableId();
 
-    public Collection<CompactableFile> getInputFiles();
+    Collection<CompactableFile> getInputFiles();
+
+    /**
+     * Returns the tablet that is compacting.
+     *
+     * @since 2.1.4
+     */
+    TabletId getTabletId();
+
+    /**
+     * Returns the path that the compaction will write to. One use of this is 
to determine the output
+     * volume.
+     *
+     * @since 2.1.4
+     */
+    URI getOutputFile();
 
     PluginEnvironment getEnvironment();
   }
@@ -60,7 +77,7 @@ public interface CompactionConfigurer {
    *
    * @since 2.1.0
    */
-  public class Overrides {
+  class Overrides {
     private final Map<String,String> tablePropertyOverrides;
 
     public Overrides(Map<String,String> tablePropertyOverrides) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 258529fba2..72aac99c27 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -146,7 +146,7 @@ public class CompactableImpl implements Compactable {
 
     Set<StoredTabletFile> getFilesToDrop();
 
-    Map<String,String> getConfigOverrides(Set<CompactableFile> files);
+    Map<String,String> getConfigOverrides(Set<CompactableFile> files, 
TabletFile tmpFile);
 
   }
 
@@ -1373,11 +1373,11 @@ public class CompactableImpl implements Compactable {
     var cInfo = ocInfo.orElseThrow();
 
     try {
-      Map<String,String> overrides =
-          CompactableUtils.getOverrides(job.getKind(), tablet, 
cInfo.localHelper, job.getFiles());
-
       TabletFile compactTmpName = 
tablet.getNextMapFilenameForMajc(cInfo.propagateDeletes);
 
+      Map<String,String> overrides = 
CompactableUtils.getOverrides(job.getKind(), tablet,
+          cInfo.localHelper, job.getFiles(), compactTmpName);
+
       ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
 
       ecInfo.meta = new ExternalCompactionMetadata(cInfo.jobFiles,
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index be4672107a..d332050aab 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.tserver.tablet;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -54,8 +55,10 @@ import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.TabletId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -212,7 +215,8 @@ public class CompactableUtils {
     return result;
   }
 
-  static Map<String,String> computeOverrides(Tablet tablet, 
Set<CompactableFile> files) {
+  static Map<String,String> computeOverrides(Tablet tablet, 
Set<CompactableFile> files,
+      TabletFile tmpfile) {
     var tconf = tablet.getTableConfiguration();
 
     var configurorClass = tconf.get(Property.TABLE_COMPACTION_CONFIGURER);
@@ -222,11 +226,11 @@ public class CompactableUtils {
 
     var opts = 
tconf.getAllPropertiesWithPrefixStripped(Property.TABLE_COMPACTION_CONFIGURER_OPTS);
 
-    return computeOverrides(tablet, files, new PluginConfig(configurorClass, 
opts));
+    return computeOverrides(tablet, files, new PluginConfig(configurorClass, 
opts), tmpfile);
   }
 
   static Map<String,String> computeOverrides(Tablet tablet, 
Set<CompactableFile> files,
-      PluginConfig cfg) {
+      PluginConfig cfg, TabletFile compactTmpName) {
     CompactionConfigurer configurer = 
CompactableUtils.newInstance(tablet.getTableConfiguration(),
         cfg.getClassName(), CompactionConfigurer.class);
 
@@ -255,6 +259,16 @@ public class CompactableUtils {
         return files;
       }
 
+      @Override
+      public TabletId getTabletId() {
+        return new TabletIdImpl(tablet.getExtent());
+      }
+
+      @Override
+      public URI getOutputFile() {
+        return computeCompactionFileDest(compactTmpName).getPath().toUri();
+      }
+
       @Override
       public PluginEnvironment getEnvironment() {
         return senv;
@@ -400,7 +414,7 @@ public class CompactableUtils {
     }
 
     @Override
-    public Map<String,String> getConfigOverrides(Set<CompactableFile> files) {
+    public Map<String,String> getConfigOverrides(Set<CompactableFile> files, 
TabletFile tmpFile) {
       return computeOverrides(wp);
     }
 
@@ -456,9 +470,10 @@ public class CompactableUtils {
     }
 
     @Override
-    public Map<String,String> getConfigOverrides(Set<CompactableFile> files) {
+    public Map<String,String> getConfigOverrides(Set<CompactableFile> files,
+        TabletFile compactTmpName) {
       if (!UserCompactionUtils.isDefault(compactionConfig.getConfigurer())) {
-        return computeOverrides(tablet, files, 
compactionConfig.getConfigurer());
+        return computeOverrides(tablet, files, 
compactionConfig.getConfigurer(), compactTmpName);
       } else if 
(!CompactionStrategyConfigUtil.isDefault(compactionConfig.getCompactionStrategy())
           && wp != null) {
         return computeOverrides(wp);
@@ -523,16 +538,16 @@ public class CompactableUtils {
   }
 
   public static Map<String,String> getOverrides(CompactionKind kind, Tablet 
tablet,
-      CompactionHelper driver, Set<CompactableFile> files) {
+      CompactionHelper driver, Set<CompactableFile> files, TabletFile 
compactTmpName) {
 
     Map<String,String> overrides = null;
 
     if (kind == CompactionKind.USER || kind == CompactionKind.SELECTOR) {
-      overrides = driver.getConfigOverrides(files);
+      overrides = driver.getConfigOverrides(files, compactTmpName);
     }
 
     if (overrides == null) {
-      overrides = computeOverrides(tablet, files);
+      overrides = computeOverrides(tablet, files, compactTmpName);
     }
 
     if (overrides == null) {
@@ -564,7 +579,7 @@ public class CompactableUtils {
     TableConfiguration tableConf = tablet.getTableConfiguration();
 
     AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
-        getOverrides(job.getKind(), tablet, cInfo.localHelper, 
job.getFiles()));
+        getOverrides(job.getKind(), tablet, cInfo.localHelper, job.getFiles(), 
tmpFileName));
 
     final FileCompactor compactor = new FileCompactor(tablet.getContext(), 
tablet.getExtent(),
         compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters, 
compactionConfig,
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
index 7567fbc815..42f28744de 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
@@ -36,6 +36,7 @@ import 
org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer.Ove
 import org.apache.accumulo.core.compaction.CompactionSettings;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.TabletId;
 import org.junit.jupiter.api.Test;
 
 public class ConfigurableCompactionStrategyTest {
@@ -85,6 +86,16 @@ public class ConfigurableCompactionStrategyTest {
         return files;
       }
 
+      @Override
+      public TabletId getTabletId() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public URI getOutputFile() {
+        throw new UnsupportedOperationException();
+      }
+
       @Override
       public PluginEnvironment getEnvironment() {
         return null;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index f3104ec00f..069337173f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -38,6 +38,7 @@ import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -61,6 +62,7 @@ import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.PluginConfig;
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
 import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
 import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;
 import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -70,6 +72,8 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FilePrefix;
+import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.iterators.DevNull;
 import org.apache.accumulo.core.iterators.Filter;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
@@ -78,7 +82,9 @@ import 
org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.user.AgeOffFilter;
 import org.apache.accumulo.core.iterators.user.GrepIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
@@ -483,6 +489,45 @@ public class CompactionIT extends AccumuloClusterHarness {
     }
   }
 
+  public static class FourConfigurer implements CompactionConfigurer {
+
+    @Override
+    public void init(InitParameters iparams) {}
+
+    @Override
+    public Overrides override(InputParameters params) {
+      if (new Text("4").equals(params.getTabletId().getEndRow())) {
+        return new 
Overrides(Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "gz"));
+      } else {
+        return new 
Overrides(Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none"));
+      }
+    }
+  }
+
+  public static class UriConfigurer implements CompactionConfigurer {
+
+    @Override
+    public void init(InitParameters iparams) {}
+
+    @Override
+    public Overrides override(InputParameters params) {
+      // This validates that the path looks like a tablet file path and throws 
an exception if it
+      // does not
+      var parts = TabletFile.parsePath(new Path(params.getOutputFile()));
+      // For this test, the compaction should be producing A files
+      Preconditions.checkArgument(
+          
parts.getFileName().startsWith(FilePrefix.FULL_COMPACTION.getPrefix() + ""));
+      
Preconditions.checkArgument(parts.getFileName().endsWith(RFile.EXTENSION));
+
+      if (parts.getTabletDir()
+          
.equals(MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME))
 {
+        return new 
Overrides(Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "gz"));
+      } else {
+        return new 
Overrides(Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none"));
+      }
+    }
+  }
+
   @Test
   public void testConfigurer() throws Exception {
     String tableName = this.getUniqueNames(1)[0];
@@ -527,6 +572,43 @@ public class CompactionIT extends AccumuloClusterHarness {
       assertTrue(sizes > data.length * 10 && sizes < data.length * 11,
           "Unexpected files sizes : " + sizes);
 
+      // compact using a custom configurer that considers tablet end row
+      client.tableOperations().addSplits(tableName, new TreeSet<>(List.of(new 
Text("4"))));
+      client.tableOperations().compact(tableName, new 
CompactionConfig().setWait(true)
+          .setConfigurer(new PluginConfig(FourConfigurer.class.getName())));
+      var tabletSizes = getFilesSizesPerTablet(client, tableName);
+      assertTrue(tabletSizes.get("4") > 0 && tabletSizes.get("4") < 1000, 
tabletSizes.toString());
+      assertTrue(tabletSizes.get("null") > 500_000 && tabletSizes.get("null") < 
510_000,
+          tabletSizes.toString());
+
+      // compact using a custom configurer that considers the output path, 
should invert which file
+      // is compressed
+      client.tableOperations().addSplits(tableName, new TreeSet<>(List.of(new 
Text("4"))));
+      client.tableOperations().compact(tableName, new 
CompactionConfig().setWait(true)
+          .setConfigurer(new PluginConfig(UriConfigurer.class.getName())));
+      tabletSizes = getFilesSizesPerTablet(client, tableName);
+      assertTrue(tabletSizes.get("4") > 500_000 && tabletSizes.get("4") < 
510_000,
+          tabletSizes.toString());
+      assertTrue(tabletSizes.get("null") > 0 && tabletSizes.get("null") < 1000,
+          tabletSizes.toString());
+
+    }
+  }
+
+  private Map<String,Long> getFilesSizesPerTablet(AccumuloClient client, 
String table)
+      throws Exception {
+    var ctx = (ClientContext) client;
+    var ample = ctx.getAmple();
+    var id = ctx.getTableId(table);
+
+    try (var tablets = ample.readTablets().forTable(id).build()) {
+      Map<String,Long> sizes = new TreeMap<>();
+      for (var tablet : tablets) {
+        var tsize = 
tablet.getFilesMap().values().stream().mapToLong(DataFileValue::getSize).sum();
+        var endRow = tablet.getEndRow();
+        sizes.put(endRow == null ? "null" : endRow.toString(), tsize);
+      }
+      return sizes;
     }
   }
 

Reply via email to