This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 839e501a46 Support configuring compactions by tablet and output path
(#5720)
839e501a46 is described below
commit 839e501a467be02cef0492806721b05720c3dfdc
Author: Keith Turner <[email protected]>
AuthorDate: Fri Aug 1 17:11:34 2025 -0400
Support configuring compactions by tablet and output path (#5720)
Adds two methods to compaction configurer that provide the tablet and
output path for the compaction. This allows different configuration to
be generated based on the tablet or volume.
---
.../admin/compaction/CompactionConfigurer.java | 25 +++++--
.../accumulo/tserver/tablet/CompactableImpl.java | 8 +--
.../accumulo/tserver/tablet/CompactableUtils.java | 35 ++++++---
.../ConfigurableCompactionStrategyTest.java | 11 +++
.../accumulo/test/functional/CompactionIT.java | 82 ++++++++++++++++++++++
5 files changed, 143 insertions(+), 18 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java
index 6a58704e54..e694535086 100644
---
a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java
+++
b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionConfigurer.java
@@ -18,11 +18,13 @@
*/
package org.apache.accumulo.core.client.admin.compaction;
+import java.net.URI;
import java.util.Collection;
import java.util.Map;
import org.apache.accumulo.core.client.PluginEnvironment;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.TabletId;
/**
* Enables dynamically overriding of per table properties used to create the
output file for a
@@ -34,7 +36,7 @@ public interface CompactionConfigurer {
/**
* @since 2.1.0
*/
- public interface InitParameters {
+ interface InitParameters {
TableId getTableId();
Map<String,String> getOptions();
@@ -47,10 +49,25 @@ public interface CompactionConfigurer {
/**
* @since 2.1.0
*/
- public interface InputParameters {
+ interface InputParameters {
TableId getTableId();
- public Collection<CompactableFile> getInputFiles();
+ Collection<CompactableFile> getInputFiles();
+
+ /**
+ * Returns the tablet that is compacting.
+ *
+ * @since 2.1.4
+ */
+ TabletId getTabletId();
+
+ /**
+ * Returns the path that the compaction will write to. One use of this is to know the output
+ * volume.
+ *
+ * @since 2.1.4
+ */
+ URI getOutputFile();
PluginEnvironment getEnvironment();
}
@@ -60,7 +77,7 @@ public interface CompactionConfigurer {
*
* @since 2.1.0
*/
- public class Overrides {
+ class Overrides {
private final Map<String,String> tablePropertyOverrides;
public Overrides(Map<String,String> tablePropertyOverrides) {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 258529fba2..72aac99c27 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -146,7 +146,7 @@ public class CompactableImpl implements Compactable {
Set<StoredTabletFile> getFilesToDrop();
- Map<String,String> getConfigOverrides(Set<CompactableFile> files);
+ Map<String,String> getConfigOverrides(Set<CompactableFile> files,
TabletFile tmpFile);
}
@@ -1373,11 +1373,11 @@ public class CompactableImpl implements Compactable {
var cInfo = ocInfo.orElseThrow();
try {
- Map<String,String> overrides =
- CompactableUtils.getOverrides(job.getKind(), tablet,
cInfo.localHelper, job.getFiles());
-
TabletFile compactTmpName =
tablet.getNextMapFilenameForMajc(cInfo.propagateDeletes);
+ Map<String,String> overrides =
CompactableUtils.getOverrides(job.getKind(), tablet,
+ cInfo.localHelper, job.getFiles(), compactTmpName);
+
ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
ecInfo.meta = new ExternalCompactionMetadata(cInfo.jobFiles,
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index be4672107a..d332050aab 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.tserver.tablet;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -54,8 +55,10 @@ import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -212,7 +215,8 @@ public class CompactableUtils {
return result;
}
- static Map<String,String> computeOverrides(Tablet tablet,
Set<CompactableFile> files) {
+ static Map<String,String> computeOverrides(Tablet tablet,
Set<CompactableFile> files,
+ TabletFile tmpfile) {
var tconf = tablet.getTableConfiguration();
var configurorClass = tconf.get(Property.TABLE_COMPACTION_CONFIGURER);
@@ -222,11 +226,11 @@ public class CompactableUtils {
var opts =
tconf.getAllPropertiesWithPrefixStripped(Property.TABLE_COMPACTION_CONFIGURER_OPTS);
- return computeOverrides(tablet, files, new PluginConfig(configurorClass,
opts));
+ return computeOverrides(tablet, files, new PluginConfig(configurorClass,
opts), tmpfile);
}
static Map<String,String> computeOverrides(Tablet tablet,
Set<CompactableFile> files,
- PluginConfig cfg) {
+ PluginConfig cfg, TabletFile compactTmpName) {
CompactionConfigurer configurer =
CompactableUtils.newInstance(tablet.getTableConfiguration(),
cfg.getClassName(), CompactionConfigurer.class);
@@ -255,6 +259,16 @@ public class CompactableUtils {
return files;
}
+ // Wraps this tablet's extent in a TabletIdImpl.
+ @Override
+ public TabletId getTabletId() {
+ return new TabletIdImpl(tablet.getExtent());
+ }
+
+ // Passes the temporary compaction file name through computeCompactionFileDest and returns
+ // the resulting path as a URI.
+ @Override
+ public URI getOutputFile() {
+ return computeCompactionFileDest(compactTmpName).getPath().toUri();
+ }
+
@Override
public PluginEnvironment getEnvironment() {
return senv;
@@ -400,7 +414,7 @@ public class CompactableUtils {
}
@Override
- public Map<String,String> getConfigOverrides(Set<CompactableFile> files) {
+ public Map<String,String> getConfigOverrides(Set<CompactableFile> files,
TabletFile tmpFile) {
return computeOverrides(wp);
}
@@ -456,9 +470,10 @@ public class CompactableUtils {
}
@Override
- public Map<String,String> getConfigOverrides(Set<CompactableFile> files) {
+ public Map<String,String> getConfigOverrides(Set<CompactableFile> files,
+ TabletFile compactTmpName) {
if (!UserCompactionUtils.isDefault(compactionConfig.getConfigurer())) {
- return computeOverrides(tablet, files,
compactionConfig.getConfigurer());
+ return computeOverrides(tablet, files,
compactionConfig.getConfigurer(), compactTmpName);
} else if
(!CompactionStrategyConfigUtil.isDefault(compactionConfig.getCompactionStrategy())
&& wp != null) {
return computeOverrides(wp);
@@ -523,16 +538,16 @@ public class CompactableUtils {
}
public static Map<String,String> getOverrides(CompactionKind kind, Tablet
tablet,
- CompactionHelper driver, Set<CompactableFile> files) {
+ CompactionHelper driver, Set<CompactableFile> files, TabletFile
compactTmpName) {
Map<String,String> overrides = null;
if (kind == CompactionKind.USER || kind == CompactionKind.SELECTOR) {
- overrides = driver.getConfigOverrides(files);
+ overrides = driver.getConfigOverrides(files, compactTmpName);
}
if (overrides == null) {
- overrides = computeOverrides(tablet, files);
+ overrides = computeOverrides(tablet, files, compactTmpName);
}
if (overrides == null) {
@@ -564,7 +579,7 @@ public class CompactableUtils {
TableConfiguration tableConf = tablet.getTableConfiguration();
AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
- getOverrides(job.getKind(), tablet, cInfo.localHelper,
job.getFiles()));
+ getOverrides(job.getKind(), tablet, cInfo.localHelper, job.getFiles(),
tmpFileName));
final FileCompactor compactor = new FileCompactor(tablet.getContext(),
tablet.getExtent(),
compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters,
compactionConfig,
diff --git
a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
index 7567fbc815..42f28744de 100644
---
a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
+++
b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/strategies/ConfigurableCompactionStrategyTest.java
@@ -36,6 +36,7 @@ import
org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer.Ove
import org.apache.accumulo.core.compaction.CompactionSettings;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.TabletId;
import org.junit.jupiter.api.Test;
public class ConfigurableCompactionStrategyTest {
@@ -85,6 +86,16 @@ public class ConfigurableCompactionStrategyTest {
return files;
}
+ // No tablet id is available from this implementation; any call fails with
+ // UnsupportedOperationException.
+ @Override
+ public TabletId getTabletId() {
+ throw new UnsupportedOperationException();
+ }
+
+ // No output file is available from this implementation; any call fails with
+ // UnsupportedOperationException.
+ @Override
+ public URI getOutputFile() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public PluginEnvironment getEnvironment() {
return null;
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index f3104ec00f..069337173f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -38,6 +38,7 @@ import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -61,6 +62,7 @@ import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.PluginConfig;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -70,6 +72,8 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FilePrefix;
+import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.iterators.DevNull;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
@@ -78,7 +82,9 @@ import
org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
import org.apache.accumulo.core.iterators.user.GrepIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
@@ -483,6 +489,45 @@ public class CompactionIT extends AccumuloClusterHarness {
}
}
+ public static class FourConfigurer implements CompactionConfigurer {
+
+ @Override
+ public void init(InitParameters iparams) {}
+
+ @Override
+ public Overrides override(InputParameters params) {
+ if (new Text("4").equals(params.getTabletId().getEndRow())) {
+ return new
Overrides(Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "gz"));
+ } else {
+ return new
Overrides(Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none"));
+ }
+ }
+ }
+
+ /**
+  * Test configurer that chooses the output compression from the compaction output path: "gz"
+  * when the file is written under the default tablet directory, "none" otherwise.
+  */
+ public static class UriConfigurer implements CompactionConfigurer {
+
+   @Override
+   public void init(InitParameters iparams) {}
+
+   @Override
+   public Overrides override(InputParameters params) {
+     // Parsing validates that the output path looks like a tablet file path; an exception is
+     // thrown when it does not.
+     var parsed = TabletFile.parsePath(new Path(params.getOutputFile()));
+
+     // For this test the output file name must carry the FULL_COMPACTION prefix and the RFile
+     // extension.
+     Preconditions.checkArgument(
+         parsed.getFileName().startsWith(FilePrefix.FULL_COMPACTION.getPrefix() + ""));
+     Preconditions.checkArgument(parsed.getFileName().endsWith(RFile.EXTENSION));
+
+     var defaultDir = MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
+     String compression = parsed.getTabletDir().equals(defaultDir) ? "gz" : "none";
+     return new Overrides(Map.of(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), compression));
+   }
+ }
+
@Test
public void testConfigurer() throws Exception {
String tableName = this.getUniqueNames(1)[0];
@@ -527,6 +572,43 @@ public class CompactionIT extends AccumuloClusterHarness {
assertTrue(sizes > data.length * 10 && sizes < data.length * 11,
"Unexpected files sizes : " + sizes);
+ // compact using a custom configurer that considers tablet end row
+ client.tableOperations().addSplits(tableName, new TreeSet<>(List.of(new Text("4"))));
+ client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)
+     .setConfigurer(new PluginConfig(FourConfigurer.class.getName())));
+ var tabletSizes = getFilesSizesPerTablet(client, tableName);
+ // FourConfigurer uses "gz" only for the tablet whose end row is "4", so that tablet's files
+ // are expected to be under 1000 bytes
+ assertTrue(tabletSizes.get("4") > 0 && tabletSizes.get("4") < 1000, tabletSizes.toString());
+ // every other tablet gets "none"; both bounds must be checked on the same "null" (no end
+ // row) tablet, otherwise its size has no upper limit
+ assertTrue(tabletSizes.get("null") > 500_000 && tabletSizes.get("null") < 510_000,
+     tabletSizes.toString());
+
+ // compact using a custom configurer that considers the output path,
should invert which file
+ // is compressed
+ client.tableOperations().addSplits(tableName, new TreeSet<>(List.of(new
Text("4"))));
+ client.tableOperations().compact(tableName, new
CompactionConfig().setWait(true)
+ .setConfigurer(new PluginConfig(UriConfigurer.class.getName())));
+ tabletSizes = getFilesSizesPerTablet(client, tableName);
+ assertTrue(tabletSizes.get("4") > 500_000 && tabletSizes.get("4") <
510_000,
+ tabletSizes.toString());
+ assertTrue(tabletSizes.get("null") > 0 && tabletSizes.get("null") < 1000,
+ tabletSizes.toString());
+
+ }
+ }
+
+ /**
+  * Sums the sizes of the files recorded for each tablet of a table.
+  *
+  * @param client client used to read the table's tablet metadata
+  * @param table name of the table to inspect
+  * @return sorted map from tablet end row (the string "null" when a tablet has no end row) to
+  *         the total size of that tablet's files
+  */
+ private Map<String,Long> getFilesSizesPerTablet(AccumuloClient client, String table)
+     throws Exception {
+   var context = (ClientContext) client;
+   var metadataStore = context.getAmple();
+   var tableId = context.getTableId(table);
+
+   try (var tabletsMeta = metadataStore.readTablets().forTable(tableId).build()) {
+     Map<String,Long> sizesByEndRow = new TreeMap<>();
+     for (var tabletMeta : tabletsMeta) {
+       long totalBytes = 0;
+       for (DataFileValue fileValue : tabletMeta.getFilesMap().values()) {
+         totalBytes += fileValue.getSize();
+       }
+
+       var endRow = tabletMeta.getEndRow();
+       String key;
+       if (endRow == null) {
+         key = "null";
+       } else {
+         key = endRow.toString();
+       }
+       sizesByEndRow.put(key, totalBytes);
+     }
+     return sizesByEndRow;
 }
 }