This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 908105b Refactor LogSorter to use config (#2191) 908105b is described below commit 908105bc8fddd64aa2d5907c22ccf2a232209306 Author: Mike Miller <mmil...@apache.org> AuthorDate: Wed Jul 7 07:29:44 2021 -0400 Refactor LogSorter to use config (#2191) * Refactor LogSorter writeBuffer to allow use of system configuration when writing out sorted rfiles. This will use configured settings on the sorted files instead of only the defaults. --- .../java/org/apache/accumulo/tserver/log/LogSorter.java | 13 ++++++------- .../apache/accumulo/tserver/log/SortedLogRecoveryTest.java | 6 ++++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 293ff82..bd299af 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -32,7 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -146,7 +145,7 @@ public class LogSorter { // Creating a 'finished' marker will cause recovery to proceed normally and the // empty file will be correctly ignored downstream. fs.mkdirs(new Path(destPath)); - writeBuffer(context, destPath, Collections.emptyList(), part++); + writeBuffer(destPath, Collections.emptyList(), part++); fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close(); return; } @@ -164,10 +163,10 @@ public class LogSorter { value.readFields(decryptingInput); buffer.add(new Pair<>(key, value)); } - writeBuffer(context, destPath, buffer, part++); + writeBuffer(destPath, buffer, part++); buffer.clear(); } catch (EOFException ex) { - writeBuffer(context, destPath, buffer, part++); + writeBuffer(destPath, buffer, part++); break; } } @@ -215,8 +214,8 @@ public class LogSorter { } @VisibleForTesting - public static void writeBuffer(ServerContext context, String destPath, - List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException { + void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part) + throws IOException { String filename = String.format("part-r-%05d.rf", part); Path path = new Path(destPath, filename); FileSystem fs = context.getVolumeManager().getFileSystemByPath(path); @@ -238,7 +237,7 @@ public class LogSorter { try (var writer = FileOperations.getInstance().newWriterBuilder() .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService()) - .withTableConfiguration(DefaultConfiguration.getInstance()).build()) { + .withTableConfiguration(conf).build()) { writer.startDefaultLocalityGroup(); for (var entry : keyListMap.entrySet()) { LogFileValue val = new LogFileValue(); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java index 2121725..582b9bc 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java @@ -79,6 +79,7 @@ public class SortedLogRecoveryTest { static final Text cq = new Text("cq"); static final Value value = new Value("value"); static ServerContext context; + static LogSorter logSorter; @Rule public TemporaryFolder tempFolder = @@ -87,6 +88,7 @@ public class SortedLogRecoveryTest { @Before public void setup() { context = EasyMock.createMock(ServerContext.class); + logSorter = new LogSorter(context, DefaultConfiguration.getInstance()); } static class KeyValue implements Comparable<KeyValue> { @@ -179,11 +181,11 @@ public class SortedLogRecoveryTest { for (KeyValue pair : entry.getValue()) { buffer.add(new Pair<>(pair.key, pair.value)); if (buffer.size() >= bufferSize) { - LogSorter.writeBuffer(context, destPath, buffer, parts++); + logSorter.writeBuffer(destPath, buffer, parts++); buffer.clear(); } } - LogSorter.writeBuffer(context, destPath, buffer, parts); + logSorter.writeBuffer(destPath, buffer, parts); ns.create(SortedLogState.getFinishedMarkerPath(destPath)).close(); dirs.add(new Path(destPath));