This is an automated email from the ASF dual-hosted git repository. ibella pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new a0b3f9dd82 Created a CombinedSizeLimitSelector
a0b3f9dd82 is described below

commit a0b3f9dd82c7e474a22ed2b894dc2fc2bc5cc2d1
Author: Ivan Bella <i...@bella.name>
AuthorDate: Sat Oct 21 16:57:13 2023 +0000

    Created a CombinedSizeLimitSelector
---
 .../compaction/CombinedSizeLimitSelector.java      |  71 +++++++++
 .../compaction/CombinedSizeLimitSelectorTest.java  | 162 +++++++++++++++++++++
 2 files changed, 233 insertions(+)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CombinedSizeLimitSelector.java b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CombinedSizeLimitSelector.java
new file mode 100644
index 0000000000..6ad56da4f4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CombinedSizeLimitSelector.java
@@ -0,0 +1,71 @@
+package org.apache.accumulo.core.client.admin.compaction;
+
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * A file selector for compactions that will select a set of files that is below a
+ * configured combined size up to a number of files. The configured max size is
+ * specified using the max.size option (table.compaction.selector.opts.max.size)
+ * as a bytes value (e.g. 10M). The configured max files is specified using the
+ * max.files option. Files are considered smallest first, and nothing is selected
+ * unless at least two files qualify.
+ */
+public class CombinedSizeLimitSelector implements CompactionSelector {
+  /** Name of the option holding the maximum combined size of the selected files. */
+  public static final String MAX_SIZE_OPT = "max.size";
+  /** Name of the option holding the maximum number of files to select. */
+  public static final String MAX_FILES_OPT = "max.files";
+
+  // a max combined size or files of 0 essentially disables this selector
+  private long maxCombinedSize = 0;
+  private int maxFiles = Integer.MAX_VALUE;
+
+  /**
+   * Reads the max.size and max.files options. A missing max.size is treated as 0, which
+   * disables the selector; a missing max.files is treated as Integer.MAX_VALUE.
+   */
+  @Override
+  public void init(InitParameters iparams) {
+    maxCombinedSize = ConfigurationTypeHelper
+        .getFixedMemoryAsBytes(iparams.getOptions().getOrDefault(MAX_SIZE_OPT, "0"));
+    maxFiles = Integer.parseInt(
+        iparams.getOptions().getOrDefault(MAX_FILES_OPT, String.valueOf(Integer.MAX_VALUE)));
+  }
+
+  /**
+   * Selects the smallest available files whose running combined estimated size stays within
+   * max.size, taking at most max.files of them.
+   *
+   * @return the selected files, or an empty selection when the selector is disabled or
+   *         fewer than two files qualify
+   */
+  @Override
+  public Selection select(SelectionParameters sparams) {
+    // short circuit if disabled
+    if (maxCombinedSize == 0 || maxFiles == 0) {
+      // typed factory rather than the raw Collections.EMPTY_LIST constant
+      return new Selection(Collections.emptyList());
+    }
+
+    AtomicLong totalSize = new AtomicLong();
+    AtomicInteger totalCount = new AtomicInteger();
+    // return a selection of files whose combined size is under our limit
+    // first filter to those under the max size
+    // then sort by size,
+    // then filter to those whose combined size is under the max size up to the max number of files
+    // return empty selection if only 1 file selected
+    return new Selection(sparams.getAvailableFiles().stream()
+        .filter(f -> f.getEstimatedSize() <= maxCombinedSize)
+        // Long.compare cannot overflow, unlike taking the sign of a subtraction
+        .sorted((a, b) -> Long.compare(a.getEstimatedSize(), b.getEstimatedSize()))
+        .filter(f -> totalSize.addAndGet(f.getEstimatedSize()) <= maxCombinedSize
+            && totalCount.incrementAndGet() <= maxFiles)
+        .collect(Collectors.collectingAndThen(Collectors.toList(),
+            list -> list.size() <= 1 ? Collections.emptyList() : list)));
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/client/admin/compaction/CombinedSizeLimitSelectorTest.java b/core/src/test/java/org/apache/accumulo/core/client/admin/compaction/CombinedSizeLimitSelectorTest.java
new file mode 100644
index 0000000000..82c9efc04f
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/admin/compaction/CombinedSizeLimitSelectorTest.java
@@ -0,0 +1,162 @@
+package org.apache.accumulo.core.client.admin.compaction;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.apache.accumulo.core.client.PluginEnvironment;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit tests for {@link CombinedSizeLimitSelector}. */
+public class CombinedSizeLimitSelectorTest {
+
+  @Test
+  public void testSizeLimit() {
+    CombinedSizeLimitSelector selector = new CombinedSizeLimitSelector();
+    selector.init(createInitParameters("1G", null));
+    CompactionSelector.Selection selection = selector.select(createSelectionParameters("1M", "2M", "500M", "999M", "1G"));
+    assertSelected(selection, "1M", "2M", "500M");
+
+    // 1M + 511M + 512M is exactly 1G, so the size limit is inclusive
+    selection = selector.select(createSelectionParameters("1M", "511M", "512M", "999M", "1G"));
+    assertSelected(selection, "1M", "511M", "512M");
+  }
+
+  @Test
+  public void testFilesLimit() {
+    CombinedSizeLimitSelector selector = new CombinedSizeLimitSelector();
+    selector.init(createInitParameters("1G", "2"));
+    CompactionSelector.Selection selection = selector.select(createSelectionParameters("1M", "2M", "500M", "999M", "1G"));
+    assertSelected(selection, "1M", "2M");
+  }
+
+  @Test
+  public void testDoNotCompactOnlyOne() {
+    CombinedSizeLimitSelector selector = new CombinedSizeLimitSelector();
+    selector.init(createInitParameters("1M", null));
+    CompactionSelector.Selection selection = selector.select(createSelectionParameters("1M", "2M", "500M", "999M", "1G"));
+    assertSelected(selection);
+
+    selector.init(createInitParameters("1G", "1"));
+    selection = selector.select(createSelectionParameters("1M", "2M", "500M", "999M", "1G"));
+    assertSelected(selection);
+  }
+
+  @Test
+  public void testDefaultDisabled() {
+    CombinedSizeLimitSelector selector = new CombinedSizeLimitSelector();
+    selector.init(createInitParameters(null, null));
+    CompactionSelector.Selection selection = selector.select(createSelectionParameters("1M", "2M", "500M", "999M", "1G"));
+    assertSelected(selection);
+  }
+
+  @Test
+  public void testZeroMaxDisabled() {
+    CombinedSizeLimitSelector selector = new CombinedSizeLimitSelector();
+    selector.init(createInitParameters("0B", null));
+    CompactionSelector.Selection selection = selector.select(createSelectionParameters("1M", "2M", "500M", "999M", "1G"));
+    assertSelected(selection);
+  }
+
+  @Test
+  public void testZeroFilesDisabled() {
+    CombinedSizeLimitSelector selector = new CombinedSizeLimitSelector();
+    selector.init(createInitParameters("1G", "0"));
+    CompactionSelector.Selection selection = selector.select(createSelectionParameters("1M", "2M", "500M", "999M", "1G"));
+    assertSelected(selection);
+  }
+
+  /** Builds init parameters with the given options; a null value leaves the option unset. */
+  private CompactionSelector.InitParameters createInitParameters(final String maxSize, final String maxFiles) {
+    return new CompactionSelector.InitParameters() {
+      @Override
+      public Map<String, String> getOptions() {
+        ImmutableMap.Builder<String,String> builder = ImmutableMap.builder();
+        if (maxSize != null) {
+          builder.put(CombinedSizeLimitSelector.MAX_SIZE_OPT, maxSize);
+        }
+        if (maxFiles != null) {
+          builder.put(CombinedSizeLimitSelector.MAX_FILES_OPT, maxFiles);
+        }
+        return builder.build();
+      }
+
+      @Override
+      public TableId getTableId() {
+        return null;
+      }
+
+      @Override
+      public PluginEnvironment getEnvironment() {
+        return null;
+      }
+    };
+  }
+
+  /** Builds selection parameters exposing one file per given size (e.g. "500M"). */
+  private CompactionSelector.SelectionParameters createSelectionParameters(final String ... values) {
+    return new CompactionSelector.SelectionParameters() {
+
+      @Override
+      public PluginEnvironment getEnvironment() {
+        return null;
+      }
+
+      @Override
+      public Collection<CompactableFile> getAvailableFiles() {
+        return Collections2.transform(Arrays.asList(values),
+            v -> new CompactableFileImpl(randomURI(), ConfigurationTypeHelper.getFixedMemoryAsBytes(v), 10));
+      }
+
+      private URI randomURI() {
+        UUID uuid = UUID.randomUUID();
+        try {
+          return new URI("file:///accumulo/tables/1/default_tablet/" + uuid);
+        } catch (URISyntaxException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public Collection<Summary> getSummaries(Collection<CompactableFile> files, Predicate<SummarizerConfiguration> summarySelector) {
+        return null;
+      }
+
+      @Override
+      public TableId getTableId() {
+        return null;
+      }
+
+      @Override
+      public Optional<SortedKeyValueIterator<Key, Value>> getSample(CompactableFile cf, SamplerConfiguration sc) {
+        return Optional.empty();
+      }
+    };
+  }
+
+  /** Asserts the selected files have exactly the given sizes, ignoring order. */
+  private void assertSelected(CompactionSelector.Selection selection, String ... sizes) {
+    assertEquals(Collections2.transform(Arrays.asList(sizes), s -> ConfigurationTypeHelper.getFixedMemoryAsBytes(s)).stream().sorted().collect(Collectors.toList()),
+        Collections2.transform(selection.getFilesToCompact(), f -> f.getEstimatedSize()).stream().sorted().collect(Collectors.toList()));
+  }
+}