This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new 9419ba84aa compresses migrations used in metadata table scans (#5627)
9419ba84aa is described below

commit 9419ba84aa8131ecae3505fe772275fcf2f90b6f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Jun 12 14:55:34 2025 -0400

    compresses migrations used in metadata table scans (#5627)

    The manager sends the set of migrations to each metadata tablet server
    when it scans the metadata table. For a large set of migrations and many
    metadata tablet servers, the cost of repeatedly sending the same data
    adds up. This change compresses the migrations once, and then each rpc
    to a tserver sends the compressed data. Decompression will happen in
    parallel on the tservers. The one-time cost of compression should
    eventually be worthwhile because of the reduced amount of data sent from
    the manager to many tservers.

    A much better way to solve this overall problem is #5416, but that would
    be a very disruptive change to make in 2.1. This change and #5626 can
    improve throughput for this situation in 2.1 w/ minor changes.

    Co-authored-by: Dave Marion <dlmar...@apache.org>
---
 core/pom.xml                                       |   6 +
 .../org/apache/accumulo/core/conf/Property.java    |   3 +
 .../apache/accumulo/core/conf/PropertyType.java    |  32 ++++
 .../accumulo/core/conf/PropertyTypeTest.java       |  11 ++
 pom.xml                                            |   1 +
 .../server/iterators/ServerIteratorOptions.java    | 139 ++++++++++++++++
 .../server/manager/state/MetaDataTableScanner.java |  16 +-
 .../manager/state/TabletStateChangeIterator.java   | 102 ++++++++----
 .../iterators/ServerIteratorOptionsTest.java       | 185 +++++++++++++++++++++
 .../state/TabletStateChangeIteratorTest.java       |  88 ++++++++++
 .../apache/accumulo/manager/state/MergeStats.java  |   4 +-
 .../accumulo/manager/tableOps/delete/CleanUp.java  |   3 +-
 .../functional/TabletStateChangeIteratorIT.java    |  35 +++-
 13 files changed, 576 insertions(+), 49 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 327a02ea42..b3166516d6 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -184,6 +184,12 @@
       <artifactId>junit-jupiter-api</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+      <version>1.7.1</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <testResources>
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index adfd95a66b..1d8a04cfc3 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -338,6 +338,9 @@ public enum Property {
           + " was changed and it now can accept multiple class names. The metrics spi was introduced in 2.1.3,"
           + " the deprecated factory is org.apache.accumulo.core.metrics.MeterRegistryFactory.",
       "2.1.0"),
+  GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO("general.server.iter.opts.compression", "none",
+      PropertyType.COMPRESSION_TYPE,
+      "Compression algorithm name to use for server-side iterator options compression.", "2.1.4"),
   GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "0",
       PropertyType.TIMEDURATION,
       "Interval at which the Manager and TabletServer should verify their server locks. A value of zero"
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index 9286381782..eb01f3acfb 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -31,8 +31,11 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
 import org.apache.commons.lang3.Range;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.Compressor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,6 +134,9 @@ public enum PropertyType {
       "A list of fully qualified java class names representing classes on the classpath.\n"
           + "An example is 'java.lang.String', rather than 'String'"),
 
+  COMPRESSION_TYPE("compression type name", new ValidCompressionType(),
+      "One of the configured compression types."),
+
   DURABILITY("durability", in(false, null, "default", "none", "log", "flush", "sync"),
       "One of 'none', 'log', 'flush' or 'sync'."),
 
@@ -251,6 +257,32 @@ public enum PropertyType {
         || (suffixCheck.test(x) && new Bounds(lowerBound, upperBound).test(stripUnits.apply(x)));
   }
 
+  private static class ValidCompressionType implements Predicate<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ValidCompressionType.class);
+
+    @Override
+    public boolean test(String type) {
+      if (!Compression.getSupportedAlgorithms().contains(type)) {
+        return false;
+      }
+      CompressionAlgorithm ca = Compression.getCompressionAlgorithmByName(type);
+      Compressor c = null;
+      try {
+        c = ca.getCompressor();
+        return true;
+      } catch (RuntimeException e) {
+        LOG.error("Error creating compressor for type {}", type, e);
+        return false;
+      } finally {
+        if (c != null) {
+          ca.returnCompressor(c);
+        }
+      }
+    }
+
+  }
+
   private static final Pattern SUFFIX_REGEX = Pattern.compile("[^\\d]*$");
   private static final Function<String,String> stripUnits =
       x -> x == null ? null : SUFFIX_REGEX.matcher(x.trim()).replaceAll("");
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
index af2fa3ab1b..59085382b3 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
@@ -231,4 +231,15 @@ public class PropertyTypeTest extends WithTestNames {
     invalid(null, "", "AL L", " ALL", "non import", " ");
   }
 
+  @Test
+  public void testTypeCOMPRESSION_TYPE() {
+    valid("none", "gz", "lz4", "snappy");
+    // The following are valid at runtime with the correct configuration
+    //
+    // bzip2 java implementation does not implement Compressor/Decompressor, requires native
+    // lzo not included in implementation due to license issues, but can be added by user
+    // zstd requires hadoop native libraries built with zstd support
+    //
+    invalid(null, "", "bzip2", "lzo", "zstd");
+  }
 }
diff --git a/pom.xml b/pom.xml
index a46d68dfd2..dd1d11e7e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -991,6 +991,7 @@
                   <!-- This should be removed upon completion of migrating junit 4 to 5 -->
                   <unused>org.junit.vintage:junit-vintage-engine:jar:*</unused>
                   <unused>org.junit.jupiter:junit-jupiter-engine:jar:*</unused>
+                  <unused>org.lz4:lz4-java:jar:*</unused>
                 </ignoredUnusedDeclaredDependencies>
               </configuration>
             </execution>
diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
new file mode 100644
index 0000000000..d0f753b68a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Base64;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
+import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import com.google.common.base.Preconditions;
+
+public class ServerIteratorOptions {
+  static final String COMPRESSION_ALGO = "__COMPRESSION_ALGO";
+
+  private static final String NONE = new NoCompression().getName();
+
+  public interface Serializer {
+    void serialize(DataOutput dataOutput) throws IOException;
+  }
+
+  public static void compressOption(final AccumuloConfiguration config,
+      IteratorSetting iteratorSetting, String option, String value) {
+    final String algo = config.get(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO);
+    setAlgo(iteratorSetting, algo);
+
+    if (algo.equals(NONE)) {
+      iteratorSetting.addOption(option, value);
+    } else {
+      compressOption(config, iteratorSetting, option, dataOutput -> {
+        byte[] bytes = value.getBytes(UTF_8);
+        dataOutput.writeInt(bytes.length);
+        dataOutput.write(bytes);
+      });
+    }
+  }
+
+  public static void compressOption(final AccumuloConfiguration config,
+      IteratorSetting iteratorSetting, String option, Serializer serializer) {
+    final String algo = config.get(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO);
+    final CompressionAlgorithm ca = Compression.getCompressionAlgorithmByName(algo);
+    final Compressor c = ca.getCompressor();
+
+    setAlgo(iteratorSetting, algo);
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos =
+        new DataOutputStream(ca.createCompressionStream(baos, c, 32 * 1024))) {
+      serializer.serialize(dos);
+      dos.close();
+      var val = Base64.getEncoder().encodeToString(baos.toByteArray());
+      iteratorSetting.addOption(option, val);
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    } finally {
+      ca.returnCompressor(c);
+    }
+  }
+
+  private static void setAlgo(IteratorSetting iteratorSetting, String algo) {
+    if (iteratorSetting.getOptions().containsKey(COMPRESSION_ALGO)) {
+      Preconditions.checkArgument(iteratorSetting.getOptions().get(COMPRESSION_ALGO).equals(algo));
+    } else {
+      iteratorSetting.addOption(COMPRESSION_ALGO, algo);
+    }
+  }
+
+  public interface Deserializer<T> {
+    T deserialize(DataInputStream dataInput) throws IOException;
+  }
+
+  public static String decompressOption(Map<String,String> options, String option) {
+    var algo = options.getOrDefault(COMPRESSION_ALGO, NONE);
+    if (algo.equals(NONE)) {
+      return options.get(option);
+    }
+
+    return decompressOption(options, option, dataInput -> {
+      int len = dataInput.readInt();
+      byte[] data = new byte[len];
+      dataInput.readFully(data);
+      return new String(data, UTF_8);
+    });
+  }
+
+  public static <T> T decompressOption(Map<String,String> options, String option,
+      Deserializer<T> deserializer) {
+    var val = options.get(option);
+
+    if (val == null) {
+      try {
+        return deserializer.deserialize(null);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    var algo = options.getOrDefault(COMPRESSION_ALGO, NONE);
+    final byte[] data = Base64.getDecoder().decode(val);
+    final CompressionAlgorithm ca = Compression.getCompressionAlgorithmByName(algo);
+    final Decompressor d = ca.getDecompressor();
+    try (ByteArrayInputStream baos = new ByteArrayInputStream(data); DataInputStream dais =
+        new DataInputStream(ca.createDecompressionStream(baos, d, 256 * 1024))) {
+      return deserializer.deserialize(dais);
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    } finally {
+      ca.returnDecompressor(d);
+    }
+  }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java
index bee304c3e2..a137087d49 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
@@ -81,13 +82,13 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
       throw new IllegalStateException("Metadata table " + tableName + " should exist", e);
     }
     cleanable = CleanerUtil.unclosed(this, MetaDataTableScanner.class, closed, log, mdScanner);
-    configureScanner(mdScanner, state, level);
+    configureScanner(context.getConfiguration(), mdScanner, state, level);
     mdScanner.setRanges(Collections.singletonList(range));
     iter = mdScanner.iterator();
   }
 
-  public static void configureScanner(ScannerBase scanner, CurrentState state,
-      DataLevel dataLevel) {
+  public static void configureScanner(AccumuloConfiguration aconf, ScannerBase scanner,
+      CurrentState state, DataLevel dataLevel) {
     TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
     scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
     scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
@@ -99,12 +100,13 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
     IteratorSetting tabletChange =
         new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
     if (state != null) {
-      TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
-      TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
+      TabletStateChangeIterator.setCurrentServers(aconf, tabletChange, state.onlineTabletServers());
+      TabletStateChangeIterator.setOnlineTables(aconf, tabletChange, state.onlineTables());
       TabletStateChangeIterator.setMerges(tabletChange, state.merges());
-      TabletStateChangeIterator.setMigrations(tabletChange, state.migrationsSnapshot(dataLevel));
+      TabletStateChangeIterator.setMigrations(aconf, tabletChange,
+          state.migrationsSnapshot(dataLevel));
       TabletStateChangeIterator.setManagerState(tabletChange, state.getManagerState());
-      TabletStateChangeIterator.setShuttingDown(tabletChange, state.shutdownServers());
+      TabletStateChangeIterator.setShuttingDown(aconf, tabletChange, state.shutdownServers());
     }
     scanner.addScanIterator(tabletChange);
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java
index 42fd9cb05c..a14e78af05 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.server.manager.state;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
@@ -43,6 +45,7 @@ import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletLocationState;
 import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
 import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.server.iterators.ServerIteratorOptions;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.slf4j.Logger;
@@ -56,7 +59,9 @@ public class TabletStateChangeIterator extends SkippingIterator {
   private static final String TABLES_OPTION = "tables";
   private static final String MERGES_OPTION = "merges";
   private static final String DEBUG_OPTION = "debug";
-  private static final String MIGRATIONS_OPTION = "migrations";
+  static final String MIGRATIONS_OPTION = "migrations";
+  // this was added in 2.1.4
+  private static final String MIGRATIONS_COUNT_OPTION = "migrationsCount";
   private static final String MANAGER_STATE_OPTION = "managerState";
   private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
   private static final Logger log = LoggerFactory.getLogger(TabletStateChangeIterator.class);
@@ -72,11 +77,11 @@ public class TabletStateChangeIterator extends SkippingIterator {
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
       IteratorEnvironment env) throws IOException {
     super.init(source, options, env);
-    current = parseServers(options.get(SERVERS_OPTION));
-    onlineTables = parseTableIDs(options.get(TABLES_OPTION));
+    current = parseServers(ServerIteratorOptions.decompressOption(options, SERVERS_OPTION));
+    onlineTables = parseTableIDs(ServerIteratorOptions.decompressOption(options, TABLES_OPTION));
     merges = parseMerges(options.get(MERGES_OPTION));
     debug = options.containsKey(DEBUG_OPTION);
-    migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+    migrations = parseMigrations(options);
     try {
       managerState = ManagerState.valueOf(options.get(MANAGER_STATE_OPTION));
     } catch (Exception ex) {
@@ -84,28 +89,53 @@ public class TabletStateChangeIterator extends SkippingIterator {
         log.error("Unable to decode managerState {}", options.get(MANAGER_STATE_OPTION));
       }
     }
-    Set<TServerInstance> shuttingDown = parseServers(options.get(SHUTTING_DOWN_OPTION));
+    Set<TServerInstance> shuttingDown =
+        parseServers(ServerIteratorOptions.decompressOption(options, SHUTTING_DOWN_OPTION));
     if (current != null && shuttingDown != null) {
       current.removeAll(shuttingDown);
     }
   }
 
-  private Set<KeyExtent> parseMigrations(String migrations) {
-    if (migrations == null) {
+  static Set<KeyExtent> parseMigrations(Map<String,String> options) {
+    String countStr = options.get(MIGRATIONS_COUNT_OPTION);
+    if (countStr != null) {
+      // this was created w/ 2.1.4 or newer so use the new decoding method that supports compression
+      int count = Integer.parseInt(countStr);
+      return ServerIteratorOptions.decompressOption(options, MIGRATIONS_OPTION,
+          dataInput -> decodeMigrations(dataInput, count));
+    } else {
+      // assume this was created by a 2.1.3 manager
+      String migrations = options.get(MIGRATIONS_OPTION);
+      if (migrations == null) {
+        return Collections.emptySet();
+      }
+      try {
+        Set<KeyExtent> result = new HashSet<>();
+        DataInputBuffer buffer = new DataInputBuffer();
+        byte[] data = Base64.getDecoder().decode(migrations);
+        buffer.reset(data, data.length);
+        while (buffer.available() > 0) {
+          result.add(KeyExtent.readFrom(buffer));
+        }
+        return result;
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+  }
+
+  static Set<KeyExtent> decodeMigrations(DataInputStream input, int count) throws IOException {
+    if (input == null) {
       return Collections.emptySet();
     }
-    try {
-      Set<KeyExtent> result = new HashSet<>();
-      DataInputBuffer buffer = new DataInputBuffer();
-      byte[] data = Base64.getDecoder().decode(migrations);
-      buffer.reset(data, data.length);
-      while (buffer.available() > 0) {
-        result.add(KeyExtent.readFrom(buffer));
-      }
-      return result;
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
+    Set<KeyExtent> result = new HashSet<>();
+    for (int i = 0; i < count; i++) {
+      // need a count and cannot use InputStream.available() because its behavior is not reliable
+      // across InputStream impls
+      result.add(KeyExtent.readFrom(input));
     }
+    return result;
   }
 
   private Set<TableId> parseTableIDs(String tableIDs) {
@@ -231,19 +261,23 @@ public class TabletStateChangeIterator extends SkippingIterator {
     throw new UnsupportedOperationException();
   }
 
-  public static void setCurrentServers(IteratorSetting cfg, Set<TServerInstance> goodServers) {
+  public static void setCurrentServers(AccumuloConfiguration aconf, IteratorSetting cfg,
+      Set<TServerInstance> goodServers) {
     if (goodServers != null) {
      List<String> servers = new ArrayList<>();
      for (TServerInstance server : goodServers) {
        servers.add(server.getHostPortSession());
      }
-      cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+      ServerIteratorOptions.compressOption(aconf, cfg, SERVERS_OPTION,
+          Joiner.on(",").join(servers));
     }
   }
 
-  public static void setOnlineTables(IteratorSetting cfg, Set<TableId> onlineTables) {
+  public static void setOnlineTables(AccumuloConfiguration aconf, IteratorSetting cfg,
+      Set<TableId> onlineTables) {
     if (onlineTables != null) {
-      cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables));
+      ServerIteratorOptions.compressOption(aconf, cfg, TABLES_OPTION,
+          Joiner.on(",").join(onlineTables));
     }
   }
 
@@ -253,6 +287,7 @@ public class TabletStateChangeIterator extends SkippingIterator {
     for (MergeInfo info : merges) {
       KeyExtent extent = info.getExtent();
       if (extent != null && !info.getState().equals(MergeState.NONE)) {
+
         info.write(buffer);
       }
     }
@@ -264,28 +299,25 @@ public class TabletStateChangeIterator extends SkippingIterator {
     cfg.addOption(MERGES_OPTION, encoded);
   }
 
-  public static void setMigrations(IteratorSetting cfg, Collection<KeyExtent> migrations) {
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    try {
+  public static void setMigrations(AccumuloConfiguration aconf, IteratorSetting cfg,
+      Collection<KeyExtent> migrations) {
+    cfg.addOption(MIGRATIONS_COUNT_OPTION, migrations.size() + "");
+    ServerIteratorOptions.compressOption(aconf, cfg, MIGRATIONS_OPTION, dataOutput -> {
       for (KeyExtent extent : migrations) {
-        extent.writeTo(buffer);
+        extent.writeTo(dataOutput);
       }
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    String encoded =
-        Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(), buffer.getLength()));
-    cfg.addOption(MIGRATIONS_OPTION, encoded);
+    });
   }
 
   public static void setManagerState(IteratorSetting cfg, ManagerState state) {
     cfg.addOption(MANAGER_STATE_OPTION, state.toString());
   }
 
-  public static void setShuttingDown(IteratorSetting cfg, Set<TServerInstance> servers) {
+  public static void setShuttingDown(AccumuloConfiguration aconf, IteratorSetting cfg,
+      Set<TServerInstance> servers) {
     if (servers != null) {
-      cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers));
+      ServerIteratorOptions.compressOption(aconf, cfg, SHUTTING_DOWN_OPTION,
+          Joiner.on(",").join(servers));
     }
   }
-
 }
diff --git a/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java b/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
new file mode 100644
index 0000000000..fa20c6d169
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.base.Strings;
+
+public class ServerIteratorOptionsTest {
+  @Test
+  public void testStringNone() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "none"))
+        .build();
+
+    String v1 = Strings.repeat("a", 100_000);
+    String v2 = Strings.repeat("b", 110_000);
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", v1);
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k2", v2);
+
+    assertEquals(3, iterSetting.getOptions().size());
+
+    // should not copy string when compression type is none
+    assertSame(v1, iterSetting.getOptions().get("k1"));
+    assertSame(v2, iterSetting.getOptions().get("k2"));
+    assertEquals("none", iterSetting.getOptions().get(ServerIteratorOptions.COMPRESSION_ALGO));
+
+    // should not copy string when compression type is none
+    assertSame(v1, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1"));
+    assertSame(v2, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k2"));
+  }
+
+  @Test
+  public void testStringCompress() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "gz"))
+        .build();
+
+    String v1 = Strings.repeat("a", 100_000);
+    String v2 = Strings.repeat("b", 110_000);
+
+    assertEquals(100_000, v1.length());
+    assertEquals(110_000, v2.length());
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", v1);
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k2", v2);
+
+    assertEquals(3, iterSetting.getOptions().size());
+
+    // the stored value should be much smaller
+    assertTrue(iterSetting.getOptions().get("k1").length() < 1000);
+    assertTrue(iterSetting.getOptions().get("k2").length() < 1000);
+    assertEquals("gz", iterSetting.getOptions().get(ServerIteratorOptions.COMPRESSION_ALGO));
+
+    // should not copy string when compression type is none
+    assertEquals(v1, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1"));
+    assertEquals(v2, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k2"));
+  }
+
+  @Test
+  public void testSerializeNone() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "none"))
+        .build();
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    Set<KeyExtent> extents = new HashSet<>();
+    TableId tableId = TableId.of("1234");
+
+    for (int i = 1; i < 100_000; i++) {
+      var extent = new KeyExtent(tableId, new Text(String.format("%10d", i)),
+          new Text(String.format("%10d", i - 1)));
+      extents.add(extent);
+    }
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", dataOutput -> {
+      dataOutput.writeInt(extents.size());
+      for (var extent : extents) {
+        extent.writeTo(dataOutput);
+      }
+    });
+
+    // expected minimum size of data, will be larger
+    int expMinSize = 100_000 * (4 + 10 + 10);
+    System.out.println(iterSetting.getOptions().get("k1").length());
+    assertTrue(iterSetting.getOptions().get("k1").length() > expMinSize);
+
+    Set<KeyExtent> extents2 =
+        ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1", dataInput -> {
+          int num = dataInput.readInt();
+          HashSet<KeyExtent> es = new HashSet<KeyExtent>();
+          for (int i = 0; i < num; i++) {
+            es.add(KeyExtent.readFrom(dataInput));
+          }
+          return es;
+        });
+
+    assertEquals(extents, extents2);
+    assertNotSame(extents, extents2);
+  }
+
+  @Test
+  public void testSerializeGz() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "gz"))
+        .build();
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    Set<KeyExtent> extents = new HashSet<>();
+    TableId tableId = TableId.of("1234");
+
+    for (int i = 1; i < 100_000; i++) {
+      var extent = new KeyExtent(tableId, new Text(String.format("%10d", i)),
+          new Text(String.format("%10d", i - 1)));
+      extents.add(extent);
+    }
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", dataOutput -> {
+      dataOutput.writeInt(extents.size());
+      for (var extent : extents) {
+        extent.writeTo(dataOutput);
+      }
+    });
+
+    // expected minimum size of data
+    int expMinSize = 100_000 * (4 + 10 + 10);
+    System.out.println(iterSetting.getOptions().get("k1").length());
+    // should be smaller than the expected min size because of compression
+    assertTrue(iterSetting.getOptions().get("k1").length() < expMinSize);
+
+    Set<KeyExtent> extents2 =
+        ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1", dataInput -> {
+          int num = dataInput.readInt();
+          HashSet<KeyExtent> es = new HashSet<KeyExtent>();
+          for (int i = 0; i < num; i++) {
+            es.add(KeyExtent.readFrom(dataInput));
+          }
+          return es;
+        });
+
+    assertEquals(extents, extents2);
+  }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletStateChangeIteratorTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletStateChangeIteratorTest.java
new file mode 100644
index 0000000000..262d43760b
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletStateChangeIteratorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+public class TabletStateChangeIteratorTest {
+
+  // This is the algorithm used for encoding prior to 2.1.4. Duplicated here to test compatibility.
+  private Map<String,String> oldEncode(Collection<KeyExtent> migrations) {
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    try {
+      for (KeyExtent extent : migrations) {
+        extent.writeTo(buffer);
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    return Map.of(TabletStateChangeIterator.MIGRATIONS_OPTION,
+        Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(), buffer.getLength())));
+  }
+
+  @Test
+  public void testMigrations() {
+    KeyExtent ke1 = new KeyExtent(TableId.of("1234"), null, null);
+    KeyExtent ke2 = new KeyExtent(TableId.of("7890"), new Text("abc"), null);
+
+    testMigrations(Set.of(), "none");
+    testMigrations(Set.of(), "gz");
+    testMigrations(Set.of(ke1, ke2), "none");
+    testMigrations(Set.of(ke1, ke2), "gz");
+
+    // when nothing is set for migrations in options map should return empty set
+    assertEquals(Set.of(), TabletStateChangeIterator.parseMigrations(Map.of()));
+  }
+
+  private void testMigrations(Set<KeyExtent> migrations, String compAlgo) {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), compAlgo))
+        .build();
+    IteratorSetting iteratorSetting = new IteratorSetting(100, "myiter", "MyIter.class");
+    TabletStateChangeIterator.setMigrations(aconf, iteratorSetting, migrations);
+    var migrations2 = TabletStateChangeIterator.parseMigrations(iteratorSetting.getOptions());
+    assertEquals(migrations, migrations2);
+    assertNotSame(migrations, migrations2);
+
+    if (compAlgo.equals("none")) {
+      // simulate 2.1.3 sending data
+      var options = oldEncode(migrations);
+      var migrations3 = TabletStateChangeIterator.parseMigrations(options);
+      assertEquals(migrations, migrations3);
+      assertNotSame(migrations, migrations3);
+    }
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
index c6ef3e7c46..e71f5a50b0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
@@ -212,7 +213,8 @@ public class MergeStats {
     KeyExtent extent = info.getExtent();
     Scanner scanner = accumuloClient
         .createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
-    MetaDataTableScanner.configureScanner(scanner, manager, DataLevel.of(extent.tableId()));
+    MetaDataTableScanner.configureScanner(((ClientContext) accumuloClient).getConfiguration(),
+        scanner, manager, DataLevel.of(extent.tableId()));
     Text start = extent.prevEndRow();
     if (start == null) {
       start = new Text();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
index 71dea7b703..e4449985ad 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
@@ -92,7 +92,8 @@ class CleanUp extends ManagerRepo {
     boolean done = true;
     Range tableRange = new KeyExtent(tableId, null, null).toMetaRange();
     Scanner scanner = manager.getContext().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    MetaDataTableScanner.configureScanner(scanner, manager, DataLevel.of(tableId));
+    MetaDataTableScanner.configureScanner(manager.getConfiguration(), scanner, manager,
+        DataLevel.of(tableId));
     scanner.setRange(tableRange);
 
     for (Entry<Key,Value> entry : scanner) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
index 969d5d7cac..64e167c902 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.time.Duration;
@@ -47,6 +48,7 @@ import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
@@ -64,12 +66,16 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Cu
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.manager.state.CurrentState;
 import org.apache.accumulo.server.manager.state.MergeInfo;
 import org.apache.accumulo.server.manager.state.MetaDataTableScanner;
 import org.apache.accumulo.server.manager.state.TabletStateChangeIterator;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,17 +85,33 @@ import com.google.common.collect.Sets;
 * Test to ensure that the {@link TabletStateChangeIterator} properly skips over tablet information
 * in the metadata table when there is no work to be done on the tablet (see ACCUMULO-3580)
  */
+@Tag(MINI_CLUSTER_ONLY)
 public class TabletStateChangeIteratorIT extends AccumuloClusterHarness {
 
   private final static Logger log = LoggerFactory.getLogger(TabletStateChangeIteratorIT.class);
 
+  private String cType = null;
+
   @Override
   protected Duration defaultTimeout() {
     return Duration.ofMinutes(3);
   }
 
-  @Test
-  public void test() throws AccumuloException, AccumuloSecurityException, TableExistsException,
-      TableNotFoundException {
+  @Override
+  public void setupCluster() throws Exception {
+    // Overriding to *not* start cluster before test, we are going to do it manually
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO, cType);
+  }
+
+  @ParameterizedTest()
+  @ValueSource(strings = {"none", "gz", "snappy"})
+  public void test(String compressionType) throws Exception {
+
+    cType = compressionType;
+    super.setupCluster();
 
     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
@@ -155,6 +177,8 @@ public class TabletStateChangeIteratorIT extends AccumuloClusterHarness {
 
       // clean up
       dropTables(client, t1, t2, t3, metaCopy1, metaCopy2, metaCopy3);
+    } finally {
+      super.teardownCluster();
     }
   }
 
@@ -209,7 +233,8 @@ public class TabletStateChangeIteratorIT extends AccumuloClusterHarness {
     if (table.equals(RootTable.NAME)) {
       currentDataLevel = DataLevel.METADATA;
     }
-    MetaDataTableScanner.configureScanner(scanner, state, currentDataLevel);
+    MetaDataTableScanner.configureScanner(((ClientContext) client).getConfiguration(), scanner,
+        state, currentDataLevel);
     log.debug("Current state = {}", state);
    scanner.updateScanIteratorOption("tabletChange", "debug", "1");
    for (Entry<Key,Value> e : scanner) {
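
For reviewers who want to see the new API end to end, here is a minimal round-trip sketch
built only from the classes this commit adds. The class name MigrationsOptionRoundTrip, the
iterator name/priority, and the "migrations"/"migrationsCount" option keys are invented for
illustration (they mimic what TabletStateChangeIterator.setMigrations() and parseMigrations()
do above, but this is not code from the commit); the ServerIteratorOptions, SiteConfiguration,
IteratorSetting, and KeyExtent calls mirror the diff and assume the server-base module is on
the classpath.

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.conf.Property;
    import org.apache.accumulo.core.conf.SiteConfiguration;
    import org.apache.accumulo.core.data.TableId;
    import org.apache.accumulo.core.dataImpl.KeyExtent;
    import org.apache.accumulo.server.iterators.ServerIteratorOptions;

    public class MigrationsOptionRoundTrip {
      public static void main(String[] args) {
        // Enable the new property added by this commit; "gz" is one of the
        // values the new COMPRESSION_TYPE property type accepts.
        AccumuloConfiguration aconf = SiteConfiguration.empty()
            .withOverrides(
                Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "gz"))
            .build();

        Set<KeyExtent> migrations = Set.of(new KeyExtent(TableId.of("1234"), null, null));

        // Manager side: serialize and compress the option value once. A separate
        // count option is carried alongside because InputStream.available() is
        // not reliable across stream implementations.
        IteratorSetting setting = new IteratorSetting(1001, "tabletChange", "MyIter.class");
        setting.addOption("migrationsCount", migrations.size() + "");
        ServerIteratorOptions.compressOption(aconf, setting, "migrations", dataOutput -> {
          for (KeyExtent extent : migrations) {
            extent.writeTo(dataOutput);
          }
        });

        // Tablet server side: each tserver decodes its own copy from the options
        // map, so decompression runs in parallel across servers while the
        // compression cost was paid only once by the manager.
        Map<String,String> options = setting.getOptions();
        int count = Integer.parseInt(options.get("migrationsCount"));
        Set<KeyExtent> decoded =
            ServerIteratorOptions.decompressOption(options, "migrations", dataInput -> {
              Set<KeyExtent> result = new HashSet<>();
              for (int i = 0; i < count; i++) {
                result.add(KeyExtent.readFrom(dataInput));
              }
              return result;
            });

        System.out.println(decoded.equals(migrations)); // should print: true
      }
    }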