This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 9419ba84aa compresses migrations used in metadata table scans (#5627)
9419ba84aa is described below

commit 9419ba84aa8131ecae3505fe772275fcf2f90b6f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Jun 12 14:55:34 2025 -0400

    compresses migrations used in metadata table scans (#5627)
    
    The manager sends the set of migrations to each metadata tablet server
    when it scans the metadata table. With a large set of migrations and
    many metadata tablet servers, the cost of repeatedly sending the same
    data adds up. This change compresses the migrations once, and each RPC
    to a tserver then sends the compressed data. Decompression happens in
    parallel on the tservers. The one-time cost of compression should pay
    for itself through the reduced amount of data sent from the manager to
    many tservers.
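
    A rough sketch of the manager-side flow, using the APIs added in this
    commit (the setup code around them is illustrative, not taken from the
    patch):

        // compress the migration set once per metadata scan round
        IteratorSetting cfg =
            new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
        TabletStateChangeIterator.setMigrations(aconf, cfg, migrations);
        // the same compressed option bytes are then sent in the scan rpc to
        // every metadata tablet server, where init() decompresses them
        scanner.addScanIterator(cfg);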
    
    A much better way to solve this overall problem is #5416, but that
    would be a very disruptive change to make in 2.1. This change and
    #5626 can improve throughput for this situation in 2.1 with minor
    changes.
    
    
    Co-authored-by: Dave Marion <dlmar...@apache.org>
---
 core/pom.xml                                       |   6 +
 .../org/apache/accumulo/core/conf/Property.java    |   3 +
 .../apache/accumulo/core/conf/PropertyType.java    |  32 ++++
 .../accumulo/core/conf/PropertyTypeTest.java       |  11 ++
 pom.xml                                            |   1 +
 .../server/iterators/ServerIteratorOptions.java    | 139 ++++++++++++++++
 .../server/manager/state/MetaDataTableScanner.java |  16 +-
 .../manager/state/TabletStateChangeIterator.java   | 102 ++++++++----
 .../iterators/ServerIteratorOptionsTest.java       | 185 +++++++++++++++++++++
 .../state/TabletStateChangeIteratorTest.java       |  88 ++++++++++
 .../apache/accumulo/manager/state/MergeStats.java  |   4 +-
 .../accumulo/manager/tableOps/delete/CleanUp.java  |   3 +-
 .../functional/TabletStateChangeIteratorIT.java    |  35 +++-
 13 files changed, 576 insertions(+), 49 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 327a02ea42..b3166516d6 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -184,6 +184,12 @@
       <artifactId>junit-jupiter-api</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+      <version>1.7.1</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <testResources>
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index adfd95a66b..1d8a04cfc3 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -338,6 +338,9 @@ public enum Property {
           + " was changed and it now can accept multiple class names. The 
metrics spi was introduced in 2.1.3,"
           + " the deprecated factory is 
org.apache.accumulo.core.metrics.MeterRegistryFactory.",
       "2.1.0"),
+  GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO("general.server.iter.opts.compression", "none",
+      PropertyType.COMPRESSION_TYPE,
+      "Compression algorithm name to use for server-side iterator options compression.", "2.1.4"),
   GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "0",
       PropertyType.TIMEDURATION,
       "Interval at which the Manager and TabletServer should verify their server locks. A value of zero"
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index 9286381782..eb01f3acfb 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -31,8 +31,11 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
 import org.apache.commons.lang3.Range;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.Compressor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,6 +134,9 @@ public enum PropertyType {
       "A list of fully qualified java class names representing classes on the 
classpath.\n"
           + "An example is 'java.lang.String', rather than 'String'"),
 
+  COMPRESSION_TYPE("compression type name", new ValidCompressionType(),
+      "One of the configured compression types."),
+
   DURABILITY("durability", in(false, null, "default", "none", "log", "flush", "sync"),
       "One of 'none', 'log', 'flush' or 'sync'."),
 
@@ -251,6 +257,32 @@ public enum PropertyType {
         || (suffixCheck.test(x) && new Bounds(lowerBound, upperBound).test(stripUnits.apply(x)));
   }
 
+  private static class ValidCompressionType implements Predicate<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ValidCompressionType.class);
+
+    @Override
+    public boolean test(String type) {
+      if (!Compression.getSupportedAlgorithms().contains(type)) {
+        return false;
+      }
+      CompressionAlgorithm ca = Compression.getCompressionAlgorithmByName(type);
+      Compressor c = null;
+      try {
+        c = ca.getCompressor();
+        return true;
+      } catch (RuntimeException e) {
+        LOG.error("Error creating compressor for type {}", type, e);
+        return false;
+      } finally {
+        if (c != null) {
+          ca.returnCompressor(c);
+        }
+      }
+    }
+
+  }
+
   private static final Pattern SUFFIX_REGEX = Pattern.compile("[^\\d]*$");
   private static final Function<String,String> stripUnits =
       x -> x == null ? null : SUFFIX_REGEX.matcher(x.trim()).replaceAll("");
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
index af2fa3ab1b..59085382b3 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTypeTest.java
@@ -231,4 +231,15 @@ public class PropertyTypeTest extends WithTestNames {
     invalid(null, "", "AL L", " ALL", "non import", "     ");
   }
 
+  @Test
+  public void testTypeCOMPRESSION_TYPE() {
+    valid("none", "gz", "lz4", "snappy");
+    // The following are valid at runtime with the correct configuration
+    //
+    // bzip2 java implementation does not implement Compressor/Decompressor, requires native
+    // lzo not included in implementation due to license issues, but can be added by user
+    // zstd requires hadoop native libraries built with zstd support
+    //
+    invalid(null, "", "bzip2", "lzo", "zstd");
+  }
 }
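
For context, enabling the new compression on a cluster is a one-line
configuration change. A sketch of an accumulo.properties entry, "gz" being
one of the values the COMPRESSION_TYPE validation above accepts:

    general.server.iter.opts.compression=gz
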
diff --git a/pom.xml b/pom.xml
index a46d68dfd2..dd1d11e7e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -991,6 +991,7 @@
                <!-- This should be removed upon completion of migrating junit 4 to 5 -->
                 <unused>org.junit.vintage:junit-vintage-engine:jar:*</unused>
                 <unused>org.junit.jupiter:junit-jupiter-engine:jar:*</unused>
+                <unused>org.lz4:lz4-java:jar:*</unused>
               </ignoredUnusedDeclaredDependencies>
             </configuration>
           </execution>
diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
new file mode 100644
index 0000000000..d0f753b68a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/ServerIteratorOptions.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Base64;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
+import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import com.google.common.base.Preconditions;
+
+public class ServerIteratorOptions {
+  static final String COMPRESSION_ALGO = "__COMPRESSION_ALGO";
+
+  private static final String NONE = new NoCompression().getName();
+
+  public interface Serializer {
+    void serialize(DataOutput dataOutput) throws IOException;
+  }
+
+  public static void compressOption(final AccumuloConfiguration config,
+      IteratorSetting iteratorSetting, String option, String value) {
+    final String algo = config.get(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO);
+    setAlgo(iteratorSetting, algo);
+
+    if (algo.equals(NONE)) {
+      iteratorSetting.addOption(option, value);
+    } else {
+      compressOption(config, iteratorSetting, option, dataOutput -> {
+        byte[] bytes = value.getBytes(UTF_8);
+        dataOutput.writeInt(bytes.length);
+        dataOutput.write(bytes);
+      });
+    }
+  }
+
+  public static void compressOption(final AccumuloConfiguration config,
+      IteratorSetting iteratorSetting, String option, Serializer serializer) {
+    final String algo = config.get(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO);
+    final CompressionAlgorithm ca = Compression.getCompressionAlgorithmByName(algo);
+    final Compressor c = ca.getCompressor();
+
+    setAlgo(iteratorSetting, algo);
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos =
+        new DataOutputStream(ca.createCompressionStream(baos, c, 32 * 1024))) {
+      serializer.serialize(dos);
+      dos.close();
+      var val = Base64.getEncoder().encodeToString(baos.toByteArray());
+      iteratorSetting.addOption(option, val);
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    } finally {
+      ca.returnCompressor(c);
+    }
+  }
+
+  private static void setAlgo(IteratorSetting iteratorSetting, String algo) {
+    if (iteratorSetting.getOptions().containsKey(COMPRESSION_ALGO)) {
+      Preconditions.checkArgument(iteratorSetting.getOptions().get(COMPRESSION_ALGO).equals(algo));
+    } else {
+      iteratorSetting.addOption(COMPRESSION_ALGO, algo);
+    }
+  }
+
+  public interface Deserializer<T> {
+    T deserialize(DataInputStream dataInput) throws IOException;
+  }
+
+  public static String decompressOption(Map<String,String> options, String option) {
+    var algo = options.getOrDefault(COMPRESSION_ALGO, NONE);
+    if (algo.equals(NONE)) {
+      return options.get(option);
+    }
+
+    return decompressOption(options, option, dataInput -> {
+      int len = dataInput.readInt();
+      byte[] data = new byte[len];
+      dataInput.readFully(data);
+      return new String(data, UTF_8);
+    });
+  }
+
+  public static <T> T decompressOption(Map<String,String> options, String option,
+      Deserializer<T> deserializer) {
+    var val = options.get(option);
+    if (val == null) {
+      try {
+        return deserializer.deserialize(null);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    var algo = options.getOrDefault(COMPRESSION_ALGO, NONE);
+    final byte[] data = Base64.getDecoder().decode(val);
+    final CompressionAlgorithm ca = Compression.getCompressionAlgorithmByName(algo);
+    final Decompressor d = ca.getDecompressor();
+    try (ByteArrayInputStream baos = new ByteArrayInputStream(data); DataInputStream dais =
+        new DataInputStream(ca.createDecompressionStream(baos, d, 256 * 1024))) {
+      return deserializer.deserialize(dais);
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    } finally {
+      ca.returnDecompressor(d);
+    }
+  }
+}
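
A minimal round-trip sketch of the class above (the configuration setup
mirrors the unit test added later in this patch; the option name "k1" and
the someLargeValue variable are illustrative placeholders):

    var aconf = SiteConfiguration.empty()
        .withOverrides(
            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "gz"))
        .build();
    IteratorSetting cfg = new IteratorSetting(100, "example", "Example.class");
    // manager side: compress the option value once
    ServerIteratorOptions.compressOption(aconf, cfg, "k1", someLargeValue);
    // server side: decompress using the algorithm recorded in the options
    String original = ServerIteratorOptions.decompressOption(cfg.getOptions(), "k1");
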
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java
index bee304c3e2..a137087d49 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
@@ -81,13 +82,13 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
       throw new IllegalStateException("Metadata table " + tableName + " should exist", e);
     }
     cleanable = CleanerUtil.unclosed(this, MetaDataTableScanner.class, closed, log, mdScanner);
-    configureScanner(mdScanner, state, level);
+    configureScanner(context.getConfiguration(), mdScanner, state, level);
     mdScanner.setRanges(Collections.singletonList(range));
     iter = mdScanner.iterator();
   }
 
-  public static void configureScanner(ScannerBase scanner, CurrentState state,
-      DataLevel dataLevel) {
+  public static void configureScanner(AccumuloConfiguration aconf, ScannerBase scanner,
+      CurrentState state, DataLevel dataLevel) {
     TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
     scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
     scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
@@ -99,12 +100,13 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
     IteratorSetting tabletChange =
         new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
     if (state != null) {
-      TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
-      TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
+      TabletStateChangeIterator.setCurrentServers(aconf, tabletChange, state.onlineTabletServers());
+      TabletStateChangeIterator.setOnlineTables(aconf, tabletChange, state.onlineTables());
       TabletStateChangeIterator.setMerges(tabletChange, state.merges());
-      TabletStateChangeIterator.setMigrations(tabletChange, state.migrationsSnapshot(dataLevel));
+      TabletStateChangeIterator.setMigrations(aconf, tabletChange,
+          state.migrationsSnapshot(dataLevel));
       TabletStateChangeIterator.setManagerState(tabletChange, state.getManagerState());
-      TabletStateChangeIterator.setShuttingDown(tabletChange, state.shutdownServers());
+      TabletStateChangeIterator.setShuttingDown(aconf, tabletChange, state.shutdownServers());
     }
     scanner.addScanIterator(tabletChange);
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java
index 42fd9cb05c..a14e78af05 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateChangeIterator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.server.manager.state;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
@@ -43,6 +45,7 @@ import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletLocationState;
 import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
 import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.server.iterators.ServerIteratorOptions;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.slf4j.Logger;
@@ -56,7 +59,9 @@ public class TabletStateChangeIterator extends SkippingIterator {
   private static final String TABLES_OPTION = "tables";
   private static final String MERGES_OPTION = "merges";
   private static final String DEBUG_OPTION = "debug";
-  private static final String MIGRATIONS_OPTION = "migrations";
+  static final String MIGRATIONS_OPTION = "migrations";
+  // this was added in 2.1.4
+  private static final String MIGRATIONS_COUNT_OPTION = "migrationsCount";
   private static final String MANAGER_STATE_OPTION = "managerState";
   private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
   private static final Logger log = LoggerFactory.getLogger(TabletStateChangeIterator.class);
@@ -72,11 +77,11 @@ public class TabletStateChangeIterator extends SkippingIterator {
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
       IteratorEnvironment env) throws IOException {
     super.init(source, options, env);
-    current = parseServers(options.get(SERVERS_OPTION));
-    onlineTables = parseTableIDs(options.get(TABLES_OPTION));
+    current = parseServers(ServerIteratorOptions.decompressOption(options, SERVERS_OPTION));
+    onlineTables = parseTableIDs(ServerIteratorOptions.decompressOption(options, TABLES_OPTION));
     merges = parseMerges(options.get(MERGES_OPTION));
     debug = options.containsKey(DEBUG_OPTION);
-    migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+    migrations = parseMigrations(options);
     try {
       managerState = ManagerState.valueOf(options.get(MANAGER_STATE_OPTION));
     } catch (Exception ex) {
@@ -84,28 +89,53 @@ public class TabletStateChangeIterator extends SkippingIterator {
         log.error("Unable to decode managerState {}", options.get(MANAGER_STATE_OPTION));
       }
     }
-    Set<TServerInstance> shuttingDown = parseServers(options.get(SHUTTING_DOWN_OPTION));
+    Set<TServerInstance> shuttingDown =
+        parseServers(ServerIteratorOptions.decompressOption(options, SHUTTING_DOWN_OPTION));
     if (current != null && shuttingDown != null) {
       current.removeAll(shuttingDown);
     }
   }
 
-  private Set<KeyExtent> parseMigrations(String migrations) {
-    if (migrations == null) {
+  static Set<KeyExtent> parseMigrations(Map<String,String> options) {
+    String countStr = options.get(MIGRATIONS_COUNT_OPTION);
+    if (countStr != null) {
+      // this was created w/ 2.1.4 or newer so use the new decoding method that supports compression
+      int count = Integer.parseInt(countStr);
+      return ServerIteratorOptions.decompressOption(options, MIGRATIONS_OPTION,
+          dataInput -> decodeMigrations(dataInput, count));
+    } else {
+      // assume this was created by a 2.1.3 manager
+      String migrations = options.get(MIGRATIONS_OPTION);
+      if (migrations == null) {
+        return Collections.emptySet();
+      }
+      try {
+        Set<KeyExtent> result = new HashSet<>();
+        DataInputBuffer buffer = new DataInputBuffer();
+        byte[] data = Base64.getDecoder().decode(migrations);
+        buffer.reset(data, data.length);
+        while (buffer.available() > 0) {
+          result.add(KeyExtent.readFrom(buffer));
+        }
+        return result;
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+  }
+
+  static Set<KeyExtent> decodeMigrations(DataInputStream input, int count) throws IOException {
+    if (input == null) {
       return Collections.emptySet();
     }
-    try {
-      Set<KeyExtent> result = new HashSet<>();
-      DataInputBuffer buffer = new DataInputBuffer();
-      byte[] data = Base64.getDecoder().decode(migrations);
-      buffer.reset(data, data.length);
-      while (buffer.available() > 0) {
-        result.add(KeyExtent.readFrom(buffer));
-      }
-      return result;
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
+    Set<KeyExtent> result = new HashSet<>();
+    for (int i = 0; i < count; i++) {
+      // need a count and cannot use InputStream.available() because its behavior is not reliable
+      // across InputStream impls
+      result.add(KeyExtent.readFrom(input));
     }
+    return result;
   }
 
   private Set<TableId> parseTableIDs(String tableIDs) {
@@ -231,19 +261,23 @@ public class TabletStateChangeIterator extends SkippingIterator {
     throw new UnsupportedOperationException();
   }
 
-  public static void setCurrentServers(IteratorSetting cfg, Set<TServerInstance> goodServers) {
+  public static void setCurrentServers(AccumuloConfiguration aconf, IteratorSetting cfg,
+      Set<TServerInstance> goodServers) {
     if (goodServers != null) {
       List<String> servers = new ArrayList<>();
       for (TServerInstance server : goodServers) {
         servers.add(server.getHostPortSession());
       }
-      cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers));
+      ServerIteratorOptions.compressOption(aconf, cfg, SERVERS_OPTION,
+          Joiner.on(",").join(servers));
     }
   }
 
-  public static void setOnlineTables(IteratorSetting cfg, Set<TableId> onlineTables) {
+  public static void setOnlineTables(AccumuloConfiguration aconf, IteratorSetting cfg,
+      Set<TableId> onlineTables) {
     if (onlineTables != null) {
-      cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables));
+      ServerIteratorOptions.compressOption(aconf, cfg, TABLES_OPTION,
+          Joiner.on(",").join(onlineTables));
     }
   }
 
@@ -253,6 +287,7 @@ public class TabletStateChangeIterator extends SkippingIterator {
       for (MergeInfo info : merges) {
         KeyExtent extent = info.getExtent();
         if (extent != null && !info.getState().equals(MergeState.NONE)) {
+
           info.write(buffer);
         }
       }
@@ -264,28 +299,25 @@ public class TabletStateChangeIterator extends SkippingIterator {
     cfg.addOption(MERGES_OPTION, encoded);
   }
 
-  public static void setMigrations(IteratorSetting cfg, Collection<KeyExtent> migrations) {
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    try {
+  public static void setMigrations(AccumuloConfiguration aconf, IteratorSetting cfg,
+      Collection<KeyExtent> migrations) {
+    cfg.addOption(MIGRATIONS_COUNT_OPTION, migrations.size() + "");
+    ServerIteratorOptions.compressOption(aconf, cfg, MIGRATIONS_OPTION, dataOutput -> {
       for (KeyExtent extent : migrations) {
-        extent.writeTo(buffer);
+        extent.writeTo(dataOutput);
       }
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    String encoded =
-        Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(), buffer.getLength()));
-    cfg.addOption(MIGRATIONS_OPTION, encoded);
+    });
   }
 
   public static void setManagerState(IteratorSetting cfg, ManagerState state) {
     cfg.addOption(MANAGER_STATE_OPTION, state.toString());
   }
 
-  public static void setShuttingDown(IteratorSetting cfg, Set<TServerInstance> servers) {
+  public static void setShuttingDown(AccumuloConfiguration aconf, IteratorSetting cfg,
+      Set<TServerInstance> servers) {
     if (servers != null) {
-      cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers));
+      ServerIteratorOptions.compressOption(aconf, cfg, SHUTTING_DOWN_OPTION,
+          Joiner.on(",").join(servers));
     }
   }
-
 }
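
To summarize the compatibility handling above, a sketch of the two option
layouts that parseMigrations() distinguishes (option names are from the
patch; the payload notation is informal):

    // 2.1.3 manager: a single option, base64 of the concatenated extents
    //   migrations         = Base64(extent1 extent2 ...)
    // 2.1.4+ manager: a count plus a possibly compressed payload
    //   migrationsCount    = <number of extents>
    //   migrations         = Base64(compress(extent1 ... extentN))
    //   __COMPRESSION_ALGO = none|gz|...
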
diff --git a/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java b/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
new file mode 100644
index 0000000000..fa20c6d169
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/iterators/ServerIteratorOptionsTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.base.Strings;
+
+public class ServerIteratorOptionsTest {
+  @Test
+  public void testStringNone() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "none"))
+        .build();
+
+    String v1 = Strings.repeat("a", 100_000);
+    String v2 = Strings.repeat("b", 110_000);
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", v1);
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k2", v2);
+
+    assertEquals(3, iterSetting.getOptions().size());
+
+    // should not copy string when compression type is none
+    assertSame(v1, iterSetting.getOptions().get("k1"));
+    assertSame(v2, iterSetting.getOptions().get("k2"));
+    assertEquals("none", 
iterSetting.getOptions().get(ServerIteratorOptions.COMPRESSION_ALGO));
+
+    // should not copy string when compression type is none
+    assertSame(v1, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1"));
+    assertSame(v2, ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k2"));
+  }
+
+  @Test
+  public void testStringCompress() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "gz"))
+        .build();
+
+    String v1 = Strings.repeat("a", 100_000);
+    String v2 = Strings.repeat("b", 110_000);
+
+    assertEquals(100_000, v1.length());
+    assertEquals(110_000, v2.length());
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", v1);
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k2", v2);
+
+    assertEquals(3, iterSetting.getOptions().size());
+
+    // the stored value should be much smaller
+    assertTrue(iterSetting.getOptions().get("k1").length() < 1000);
+    assertTrue(iterSetting.getOptions().get("k2").length() < 1000);
+    assertEquals("gz", 
iterSetting.getOptions().get(ServerIteratorOptions.COMPRESSION_ALGO));
+
+    // should not copy string when compression type is none
+    assertEquals(v1, 
ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1"));
+    assertEquals(v2, 
ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k2"));
+  }
+
+  @Test
+  public void testSerializeNone() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "none"))
+        .build();
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    Set<KeyExtent> extents = new HashSet<>();
+    TableId tableId = TableId.of("1234");
+
+    for (int i = 1; i < 100_000; i++) {
+      var extent = new KeyExtent(tableId, new Text(String.format("%10d", i)),
+          new Text(String.format("%10d", i - 1)));
+      extents.add(extent);
+    }
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", dataOutput 
-> {
+      dataOutput.writeInt(extents.size());
+      for (var extent : extents) {
+        extent.writeTo(dataOutput);
+      }
+    });
+
+    // expected minimum size of data, will be larger
+    int expMinSize = 100_000 * (4 + 10 + 10);
+    System.out.println(iterSetting.getOptions().get("k1").length());
+    assertTrue(iterSetting.getOptions().get("k1").length() > expMinSize);
+
+    Set<KeyExtent> extents2 =
+        ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1", dataInput -> {
+          int num = dataInput.readInt();
+          HashSet<KeyExtent> es = new HashSet<KeyExtent>();
+          for (int i = 0; i < num; i++) {
+            es.add(KeyExtent.readFrom(dataInput));
+          }
+          return es;
+        });
+
+    assertEquals(extents, extents2);
+    assertNotSame(extents, extents2);
+  }
+
+  @Test
+  public void testSerializeGz() {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), "gz"))
+        .build();
+
+    IteratorSetting iterSetting = new IteratorSetting(100, "ti", "TestIter.class");
+
+    Set<KeyExtent> extents = new HashSet<>();
+    TableId tableId = TableId.of("1234");
+
+    for (int i = 1; i < 100_000; i++) {
+      var extent = new KeyExtent(tableId, new Text(String.format("%10d", i)),
+          new Text(String.format("%10d", i - 1)));
+      extents.add(extent);
+    }
+
+    ServerIteratorOptions.compressOption(aconf, iterSetting, "k1", dataOutput -> {
+      dataOutput.writeInt(extents.size());
+      for (var extent : extents) {
+        extent.writeTo(dataOutput);
+      }
+    });
+
+    // expected minimum size of data
+    int expMinSize = 100_000 * (4 + 10 + 10);
+    System.out.println(iterSetting.getOptions().get("k1").length());
+    // should be smaller than the expected min size because of compression
+    assertTrue(iterSetting.getOptions().get("k1").length() < expMinSize);
+
+    Set<KeyExtent> extents2 =
+        ServerIteratorOptions.decompressOption(iterSetting.getOptions(), "k1", dataInput -> {
+          int num = dataInput.readInt();
+          HashSet<KeyExtent> es = new HashSet<KeyExtent>();
+          for (int i = 0; i < num; i++) {
+            es.add(KeyExtent.readFrom(dataInput));
+          }
+          return es;
+        });
+
+    assertEquals(extents, extents2);
+  }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletStateChangeIteratorTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletStateChangeIteratorTest.java
new file mode 100644
index 0000000000..262d43760b
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletStateChangeIteratorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.manager.state;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+public class TabletStateChangeIteratorTest {
+
+  // This is the algorithm used for encoding prior to 2.1.4. Duplicated here to test compatibility.
+  private Map<String,String> oldEncode(Collection<KeyExtent> migrations) {
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    try {
+      for (KeyExtent extent : migrations) {
+        extent.writeTo(buffer);
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    return Map.of(TabletStateChangeIterator.MIGRATIONS_OPTION,
+        Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(), buffer.getLength())));
+  }
+
+  @Test
+  public void testMigrations() {
+    KeyExtent ke1 = new KeyExtent(TableId.of("1234"), null, null);
+    KeyExtent ke2 = new KeyExtent(TableId.of("7890"), new Text("abc"), null);
+
+    testMigrations(Set.of(), "none");
+    testMigrations(Set.of(), "gz");
+    testMigrations(Set.of(ke1, ke2), "none");
+    testMigrations(Set.of(ke1, ke2), "gz");
+
+    // when nothing is set for migrations in the options map, an empty set should be returned
+    assertEquals(Set.of(), TabletStateChangeIterator.parseMigrations(Map.of()));
+  }
+
+  private void testMigrations(Set<KeyExtent> migrations, String compAlgo) {
+    var aconf = SiteConfiguration.empty()
+        .withOverrides(
+            Map.of(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO.getKey(), compAlgo))
+        .build();
+    IteratorSetting iteratorSetting = new IteratorSetting(100, "myiter", "MyIter.class");
+    TabletStateChangeIterator.setMigrations(aconf, iteratorSetting, migrations);
+    var migrations2 = TabletStateChangeIterator.parseMigrations(iteratorSetting.getOptions());
+    assertEquals(migrations, migrations2);
+    assertNotSame(migrations, migrations2);
+
+    if (compAlgo.equals("none")) {
+      // simulate 2.1.3 sending data
+      var options = oldEncode(migrations);
+      var migrations3 = TabletStateChangeIterator.parseMigrations(options);
+      assertEquals(migrations, migrations3);
+      assertNotSame(migrations, migrations3);
+    }
+  }
+}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
index c6ef3e7c46..e71f5a50b0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
@@ -212,7 +213,8 @@ public class MergeStats {
     KeyExtent extent = info.getExtent();
     Scanner scanner = accumuloClient
         .createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY);
-    MetaDataTableScanner.configureScanner(scanner, manager, DataLevel.of(extent.tableId()));
+    MetaDataTableScanner.configureScanner(((ClientContext) accumuloClient).getConfiguration(),
+        scanner, manager, DataLevel.of(extent.tableId()));
     Text start = extent.prevEndRow();
     if (start == null) {
       start = new Text();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
index 71dea7b703..e4449985ad 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java
@@ -92,7 +92,8 @@ class CleanUp extends ManagerRepo {
     boolean done = true;
     Range tableRange = new KeyExtent(tableId, null, null).toMetaRange();
     Scanner scanner = manager.getContext().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    MetaDataTableScanner.configureScanner(scanner, manager, DataLevel.of(tableId));
+    MetaDataTableScanner.configureScanner(manager.getConfiguration(), scanner, manager,
+        DataLevel.of(tableId));
     scanner.setRange(tableRange);
 
     for (Entry<Key,Value> entry : scanner) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
index 969d5d7cac..64e167c902 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.time.Duration;
@@ -47,6 +48,7 @@ import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
@@ -64,12 +66,16 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Cu
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.manager.state.CurrentState;
 import org.apache.accumulo.server.manager.state.MergeInfo;
 import org.apache.accumulo.server.manager.state.MetaDataTableScanner;
 import org.apache.accumulo.server.manager.state.TabletStateChangeIterator;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,17 +85,33 @@ import com.google.common.collect.Sets;
  * Test to ensure that the {@link TabletStateChangeIterator} properly skips over tablet information
  * in the metadata table when there is no work to be done on the tablet (see ACCUMULO-3580)
  */
+@Tag(MINI_CLUSTER_ONLY)
 public class TabletStateChangeIteratorIT extends AccumuloClusterHarness {
   private final static Logger log = LoggerFactory.getLogger(TabletStateChangeIteratorIT.class);
 
+  private String cType = null;
+
   @Override
   protected Duration defaultTimeout() {
     return Duration.ofMinutes(3);
   }
 
-  @Test
-  public void test() throws AccumuloException, AccumuloSecurityException, TableExistsException,
-      TableNotFoundException {
+  @Override
+  public void setupCluster() throws Exception {
+    // Overriding to *not* start cluster before test, we are going to do it manually
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.GENERAL_SERVER_ITERATOR_OPTIONS_COMPRESSION_ALGO, cType);
+  }
+
+  @ParameterizedTest()
+  @ValueSource(strings = {"none", "gz", "snappy"})
+  public void test(String compressionType) throws Exception {
+
+    cType = compressionType;
+    super.setupCluster();
 
     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 
@@ -155,6 +177,8 @@ public class TabletStateChangeIteratorIT extends AccumuloClusterHarness {
 
       // clean up
       dropTables(client, t1, t2, t3, metaCopy1, metaCopy2, metaCopy3);
+    } finally {
+      super.teardownCluster();
     }
   }
 
@@ -209,7 +233,8 @@ public class TabletStateChangeIteratorIT extends AccumuloClusterHarness {
       if (table.equals(RootTable.NAME)) {
         currentDataLevel = DataLevel.METADATA;
       }
-      MetaDataTableScanner.configureScanner(scanner, state, currentDataLevel);
+      MetaDataTableScanner.configureScanner(((ClientContext) client).getConfiguration(), scanner,
+          state, currentDataLevel);
       log.debug("Current state = {}", state);
       scanner.updateScanIteratorOption("tabletChange", "debug", "1");
       for (Entry<Key,Value> e : scanner) {

