This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 164c85cfd8 Implements missing compaction selection functionality (#3796) 164c85cfd8 is described below commit 164c85cfd88596784602908be1ab2e6d09030057 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Oct 6 15:11:23 2023 -0400 Implements missing compaction selection functionality (#3796) Compaction selection functionality that opened files was not implemented because it opens user files in the manager. This change implements the functionality in order to get integration tests passing. There is already an open issue about finding a better way to do this. See #3526 --- .../server/compaction/CompactionPluginUtils.java | 64 +++++++++++++++++++++- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java index 6f7529948f..7978d3c2e5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.compaction; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -34,6 +36,7 @@ import org.apache.accumulo.core.client.admin.PluginConfig; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; +import org.apache.accumulo.core.client.rfile.RFileSource; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.client.summary.Summary; @@ -47,14 
+50,23 @@ import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; +import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.compaction.CompactionDispatcher; +import org.apache.accumulo.core.summary.Gatherer; +import org.apache.accumulo.core.summary.SummarizerFactory; +import org.apache.accumulo.core.summary.SummaryCollection; +import org.apache.accumulo.core.summary.SummaryReader; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,7 +139,32 @@ public class CompactionPluginUtils { public Collection<Summary> getSummaries(Collection<CompactableFile> files, Predicate<SummarizerConfiguration> summarySelector) { - throw new UnsupportedOperationException(); + // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid + // this. 
See #3526 + + try { + var tableConf = context.getTableConfiguration(extent.tableId()); + + SummaryCollection sc = new SummaryCollection(); + SummarizerFactory factory = new SummarizerFactory(tableConf); + for (CompactableFile cf : files) { + var file = CompactableFileImpl.toStoredTabletFile(cf); + FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath()); + Configuration conf = context.getHadoopConf(); + RFileSource source = new RFileSource(new FSDataInputStream(fs.open(file.getPath())), + fs.getFileStatus(file.getPath()).getLen(), file.getRange()); + + SummaryCollection fsc = SummaryReader + .load(conf, source, file.getFileName(), summarySelector, factory, + tableConf.getCryptoService()) + .getSummaries(Collections.singletonList(new Gatherer.RowRange(extent))); + + sc.merge(fsc, factory); + } + return sc.getSummaries(); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } } @Override @@ -141,9 +178,30 @@ public class CompactionPluginUtils { } @Override - public Optional<SortedKeyValueIterator<Key,Value>> getSample(CompactableFile file, + public Optional<SortedKeyValueIterator<Key,Value>> getSample(CompactableFile cf, SamplerConfiguration sc) { - throw new UnsupportedOperationException(); + + // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid + // this. 
See #3526 + + try { + var file = CompactableFileImpl.toStoredTabletFile(cf); + FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath()); + Configuration conf = context.getHadoopConf(); + var tableConf = context.getTableConfiguration(extent.tableId()); + var iter = FileOperations.getInstance().newReaderBuilder() + .forFile(file, fs, conf, tableConf.getCryptoService()) + .withTableConfiguration(tableConf).seekToBeginning().build(); + var sampleIter = iter.getSample(new SamplerConfigurationImpl(sc)); + if (sampleIter == null) { + iter.close(); + return Optional.empty(); + } + + return Optional.of(sampleIter); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } } });