This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 2d8bdf2b21 Removed rate limiters as they weren't really being used (#3874)
2d8bdf2b21 is described below

commit 2d8bdf2b217e5b42050af5625e201b3ac398e3d7
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Oct 24 08:44:28 2023 -0400

    Removed rate limiters as they weren't really being used (#3874)
    
    Fixes #3597
---
 .../accumulo/core/client/rfile/RFileScanner.java   |   8 +-
 .../org/apache/accumulo/core/conf/Property.java    |  18 +-
 .../apache/accumulo/core/file/FileOperations.java  |  46 ++---
 .../file/blockfile/impl/CachableBlockFile.java     |  20 +--
 .../accumulo/core/file/rfile/RFileOperations.java  |   6 +-
 .../accumulo/core/file/rfile/SplitLarge.java       |   5 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java    |  24 ++-
 .../core/file/streams/RateLimitedInputStream.java  |  73 --------
 .../core/file/streams/RateLimitedOutputStream.java |  60 -------
 .../accumulo/core/summary/SummaryReader.java       |   6 +-
 .../util/compaction/CompactionServicesConfig.java  |  12 --
 .../core/util/ratelimit/GuavaRateLimiter.java      |  66 -------
 .../core/util/ratelimit/NullRateLimiter.java       |  37 ----
 .../accumulo/core/util/ratelimit/RateLimiter.java  |  29 ----
 .../util/ratelimit/SharedRateLimiterFactory.java   | 193 ---------------------
 .../core/file/rfile/AbstractRFileTest.java         |   2 +-
 .../core/file/rfile/CreateCompatTestFile.java      |   2 +-
 .../core/file/rfile/MultiLevelIndexTest.java       |   2 +-
 .../core/file/rfile/MultiThreadedRFileTest.java    |   2 +-
 .../file/streams/RateLimitedInputStreamTest.java   |  83 ---------
 .../file/streams/RateLimitedOutputStreamTest.java  |  68 --------
 .../accumulo/server/compaction/FileCompactor.java  |  10 +-
 .../org/apache/accumulo/compactor/ExtCEnv.java     |  12 --
 .../apache/accumulo/tserver/tablet/MinCEnv.java    |  11 --
 .../test/compaction/CompactionRateLimitingIT.java  | 110 ------------
 25 files changed, 47 insertions(+), 858 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 79ea0bddfd..c5716fa6c8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -347,10 +347,10 @@ class RFileScanner extends ScannerOptions implements Scanner {
 
       for (int i = 0; i < sources.length; i++) {
         // TODO may have been a bug with multiple files and caching in older 
version...
-        FSDataInputStream inputStream = (FSDataInputStream) 
sources[i].getInputStream();
-        CachableBuilder cb =
-            new CachableBuilder().input(inputStream, "source-" + 
i).length(sources[i].getLength())
-                
.conf(opts.in.getConf()).cacheProvider(cacheProvider).cryptoService(cryptoService);
+        CachableBuilder cb = new CachableBuilder()
+            .input((FSDataInputStream) sources[i].getInputStream(), "source-" 
+ i)
+            
.length(sources[i].getLength()).conf(opts.in.getConf()).cacheProvider(cacheProvider)
+            .cryptoService(cryptoService);
         readers.add(RFile.getReader(cb, sources[i].getRange()));
       }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 53011b7f33..d1938b6281 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -589,11 +589,7 @@ public enum Property {
   
TSERV_COMPACTION_SERVICE_ROOT_PLANNER("tserver.compaction.major.service.root.planner",
       DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
       "Compaction planner for root tablet service", "2.1.0"),
-  
TSERV_COMPACTION_SERVICE_ROOT_RATE_LIMIT("tserver.compaction.major.service.root.rate.limit",
 "0B",
-      PropertyType.BYTES,
-      "Maximum number of bytes to read or write per second over all major"
-          + " compactions in this compaction service, or 0B for unlimited.",
-      "2.1.0"),
+  // ELASTICITY_TODO: Deprecate TSERV_COMPACTION_SERVICE_ROOT_RATE_LIMIT in 3.x
   TSERV_COMPACTION_SERVICE_ROOT_MAX_OPEN(
       "tserver.compaction.major.service.root.planner.opts.maxOpen", "30", 
PropertyType.COUNT,
       "The maximum number of files a compaction will open", "2.1.0"),
@@ -606,11 +602,7 @@ public enum Property {
   
TSERV_COMPACTION_SERVICE_META_PLANNER("tserver.compaction.major.service.meta.planner",
       DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
       "Compaction planner for metadata table", "2.1.0"),
-  
TSERV_COMPACTION_SERVICE_META_RATE_LIMIT("tserver.compaction.major.service.meta.rate.limit",
 "0B",
-      PropertyType.BYTES,
-      "Maximum number of bytes to read or write per second over all major"
-          + " compactions in this compaction service, or 0B for unlimited.",
-      "2.1.0"),
+  // ELASTICITY_TODO: Deprecate TSERV_COMPACTION_SERVICE_META_RATE_LIMIT in 3.x
   TSERV_COMPACTION_SERVICE_META_MAX_OPEN(
       "tserver.compaction.major.service.meta.planner.opts.maxOpen", "30", 
PropertyType.COUNT,
       "The maximum number of files a compaction will open", "2.1.0"),
@@ -623,11 +615,7 @@ public enum Property {
   
TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER("tserver.compaction.major.service.default.planner",
       DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME,
       "Planner for default compaction service.", "2.1.0"),
-  
TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT("tserver.compaction.major.service.default.rate.limit",
-      "0B", PropertyType.BYTES,
-      "Maximum number of bytes to read or write per second over all major"
-          + " compactions in this compaction service, or 0B for unlimited.",
-      "2.1.0"),
+  // ELASTICITY_TODO: Deprecate TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT in 
3.x
   TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN(
       "tserver.compaction.major.service.default.planner.opts.maxOpen", "10", 
PropertyType.COUNT,
       "The maximum number of files a compaction will open", "2.1.0"),
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 5182c614aa..6d2df13fd1 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -171,7 +170,6 @@ public abstract class FileOperations {
     public final TabletFile file;
     public final FileSystem fs;
     public final Configuration fsConf;
-    public final RateLimiter rateLimiter;
     // writer only objects
     public final String compression;
     public final FSDataOutputStream outputStream;
@@ -188,15 +186,14 @@ public abstract class FileOperations {
     public final boolean dropCacheBehind;
 
     protected FileOptions(AccumuloConfiguration tableConfiguration, TabletFile 
file, FileSystem fs,
-        Configuration fsConf, RateLimiter rateLimiter, String compression,
-        FSDataOutputStream outputStream, boolean enableAccumuloStart, 
CacheProvider cacheProvider,
-        Cache<String,Long> fileLenCache, boolean seekToBeginning, 
CryptoService cryptoService,
-        Range range, Set<ByteSequence> columnFamilies, boolean inclusive, 
boolean dropCacheBehind) {
+        Configuration fsConf, String compression, FSDataOutputStream 
outputStream,
+        boolean enableAccumuloStart, CacheProvider cacheProvider, 
Cache<String,Long> fileLenCache,
+        boolean seekToBeginning, CryptoService cryptoService, Range range,
+        Set<ByteSequence> columnFamilies, boolean inclusive, boolean 
dropCacheBehind) {
       this.tableConfiguration = tableConfiguration;
       this.file = Objects.requireNonNull(file);
       this.fs = fs;
       this.fsConf = fsConf;
-      this.rateLimiter = rateLimiter;
       this.compression = compression;
       this.outputStream = outputStream;
       this.enableAccumuloStart = enableAccumuloStart;
@@ -226,10 +223,6 @@ public abstract class FileOperations {
       return fsConf;
     }
 
-    public RateLimiter getRateLimiter() {
-      return rateLimiter;
-    }
-
     public String getCompression() {
       return compression;
     }
@@ -279,7 +272,6 @@ public abstract class FileOperations {
     private TabletFile file;
     private FileSystem fs;
     private Configuration fsConf;
-    private RateLimiter rateLimiter;
     private CryptoService cryptoService;
     private boolean dropCacheBehind = false;
 
@@ -303,11 +295,6 @@ public abstract class FileOperations {
       return this;
     }
 
-    protected FileHelper rateLimiter(RateLimiter rateLimiter) {
-      this.rateLimiter = rateLimiter;
-      return this;
-    }
-
     protected FileHelper cryptoService(CryptoService cs) {
       this.cryptoService = Objects.requireNonNull(cs);
       return this;
@@ -320,28 +307,27 @@ public abstract class FileOperations {
 
     protected FileOptions toWriterBuilderOptions(String compression,
         FSDataOutputStream outputStream, boolean startEnabled) {
-      return new FileOptions(tableConfiguration, file, fs, fsConf, 
rateLimiter, compression,
-          outputStream, startEnabled, NULL_PROVIDER, null, false, 
cryptoService, null, null, true,
+      return new FileOptions(tableConfiguration, file, fs, fsConf, 
compression, outputStream,
+          startEnabled, NULL_PROVIDER, null, false, cryptoService, null, null, 
true,
           dropCacheBehind);
     }
 
     protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider,
         Cache<String,Long> fileLenCache, boolean seekToBeginning) {
-      return new FileOptions(tableConfiguration, file, fs, fsConf, 
rateLimiter, null, null, false,
+      return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, 
false,
           cacheProvider == null ? NULL_PROVIDER : cacheProvider, fileLenCache, 
seekToBeginning,
           cryptoService, null, null, true, dropCacheBehind);
     }
 
     protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> 
fileLenCache) {
-      return new FileOptions(tableConfiguration, file, fs, fsConf, 
rateLimiter, null, null, false,
-          NULL_PROVIDER, fileLenCache, false, cryptoService, null, null, true, 
dropCacheBehind);
+      return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, 
false, NULL_PROVIDER,
+          fileLenCache, false, cryptoService, null, null, true, 
dropCacheBehind);
     }
 
     protected FileOptions toScanReaderBuilderOptions(Range range, 
Set<ByteSequence> columnFamilies,
         boolean inclusive) {
-      return new FileOptions(tableConfiguration, file, fs, fsConf, 
rateLimiter, null, null, false,
-          NULL_PROVIDER, null, false, cryptoService, range, columnFamilies, 
inclusive,
-          dropCacheBehind);
+      return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, 
false, NULL_PROVIDER,
+          null, false, cryptoService, range, columnFamilies, inclusive, 
dropCacheBehind);
     }
 
     protected AccumuloConfiguration getTableConfiguration() {
@@ -388,11 +374,6 @@ public abstract class FileOperations {
       return this;
     }
 
-    public WriterBuilder withRateLimiter(RateLimiter rateLimiter) {
-      rateLimiter(rateLimiter);
-      return this;
-    }
-
     public WriterBuilder dropCachesBehind() {
       this.dropCacheBehind(true);
       return this;
@@ -441,11 +422,6 @@ public abstract class FileOperations {
       return this;
     }
 
-    public ReaderBuilder withRateLimiter(RateLimiter rateLimiter) {
-      rateLimiter(rateLimiter);
-      return this;
-    }
-
     public ReaderBuilder dropCachesBehind() {
       this.dropCacheBehind(true);
       return this;
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 0beb67f535..8bedcef04e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -33,17 +33,14 @@ import org.apache.accumulo.core.file.rfile.BlockIndex;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
-import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.BlockCache.Loader;
 import org.apache.accumulo.core.spi.cache.CacheEntry;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,11 +66,10 @@ public class CachableBlockFile {
 
   public static class CachableBuilder {
     String cacheId = null;
-    IoeSupplier<InputStream> inputSupplier = null;
+    IoeSupplier<FSDataInputStream> inputSupplier = null;
     IoeSupplier<Long> lengthSupplier = null;
     Cache<String,Long> fileLenCache = null;
     volatile CacheProvider cacheProvider = CacheProvider.NULL_PROVIDER;
-    RateLimiter readLimiter = null;
     Configuration hadoopConf = null;
     CryptoService cryptoService = null;
 
@@ -109,7 +105,7 @@ public class CachableBlockFile {
       return this;
     }
 
-    public CachableBuilder input(InputStream is, String cacheId) {
+    public CachableBuilder input(FSDataInputStream is, String cacheId) {
       this.cacheId = cacheId;
       this.inputSupplier = () -> is;
       return this;
@@ -130,11 +126,6 @@ public class CachableBlockFile {
       return this;
     }
 
-    public CachableBuilder readLimiter(RateLimiter readLimiter) {
-      this.readLimiter = readLimiter;
-      return this;
-    }
-
     public CachableBuilder cryptoService(CryptoService cryptoService) {
       this.cryptoService = cryptoService;
       return this;
@@ -145,7 +136,6 @@ public class CachableBlockFile {
    * Class wraps the BCFile reader.
    */
   public static class Reader implements Closeable {
-    private final RateLimiter readLimiter;
     // private BCFile.Reader _bc;
     private final String cacheId;
     private CacheProvider cacheProvider;
@@ -155,7 +145,7 @@ public class CachableBlockFile {
     private final Configuration conf;
     private final CryptoService cryptoService;
 
-    private final IoeSupplier<InputStream> inputSupplier;
+    private final IoeSupplier<FSDataInputStream> inputSupplier;
     private final IoeSupplier<Long> lengthSupplier;
     private final AtomicReference<BCFile.Reader> bcfr = new 
AtomicReference<>();
 
@@ -185,8 +175,7 @@ public class CachableBlockFile {
 
       BCFile.Reader reader = bcfr.get();
       if (reader == null) {
-        RateLimitedInputStream fsIn =
-            new RateLimitedInputStream((InputStream & Seekable) 
inputSupplier.get(), readLimiter);
+        FSDataInputStream fsIn = inputSupplier.get();
         BCFile.Reader tmpReader = null;
         if (serializedMetadata == null) {
           if (fileLenCache == null) {
@@ -385,7 +374,6 @@ public class CachableBlockFile {
       this.lengthSupplier = b.lengthSupplier;
       this.fileLenCache = b.fileLenCache;
       this.cacheProvider = b.cacheProvider;
-      this.readLimiter = b.readLimiter;
       this.conf = b.hadoopConf;
       this.cryptoService = Objects.requireNonNull(b.cryptoService);
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index cf3e9f64e2..46be67bf1e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -58,8 +58,7 @@ public class RFileOperations extends FileOperations {
     CachableBuilder cb = new CachableBuilder()
         .fsPath(options.getFileSystem(), options.getFile().getPath(), 
options.dropCacheBehind)
         .conf(options.getConfiguration()).fileLen(options.getFileLenCache())
-        
.cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter())
-        .cryptoService(options.getCryptoService());
+        
.cacheProvider(options.cacheProvider).cryptoService(options.getCryptoService());
     return RFile.getReader(cb, options.getFile());
   }
 
@@ -156,8 +155,7 @@ public class RFileOperations extends FileOperations {
       }
     }
 
-    BCFile.Writer _cbw = new BCFile.Writer(outputStream, 
options.getRateLimiter(), compression,
-        conf, options.cryptoService);
+    BCFile.Writer _cbw = new BCFile.Writer(outputStream, compression, conf, 
options.cryptoService);
 
     return new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, 
samplerConfig, sampler);
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
index e3adf9e517..928b252b16 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
@@ -92,10 +92,9 @@ public class SplitLarge implements KeywordExecutable {
         int blockSize = (int) aconf.getAsBytes(Property.TABLE_FILE_BLOCK_SIZE);
         try (
             Writer small = new RFile.Writer(
-                new BCFile.Writer(fs.create(new Path(smallName)), null, "gz", 
conf, cs), blockSize);
+                new BCFile.Writer(fs.create(new Path(smallName)), "gz", conf, 
cs), blockSize);
             Writer large = new RFile.Writer(
-                new BCFile.Writer(fs.create(new Path(largeName)), null, "gz", 
conf, cs),
-                blockSize)) {
+                new BCFile.Writer(fs.create(new Path(largeName)), "gz", conf, 
cs), blockSize)) {
           small.startDefaultLocalityGroup();
           large.startDefaultLocalityGroup();
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 9b5683ce22..890dc58ab5 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -38,7 +38,6 @@ import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
 import org.apache.accumulo.core.crypto.CryptoUtils;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
 import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
 import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope;
@@ -47,7 +46,6 @@ import org.apache.accumulo.core.spi.crypto.FileDecrypter;
 import org.apache.accumulo.core.spi.crypto.FileEncrypter;
 import org.apache.accumulo.core.spi.crypto.NoFileDecrypter;
 import org.apache.accumulo.core.spi.crypto.NoFileEncrypter;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -104,7 +102,7 @@ public final class BCFile {
    * BCFile writer, the entry point for creating a new BCFile.
    */
   public static class Writer implements Closeable {
-    private final RateLimitedOutputStream out;
+    private final FSDataOutputStream out;
     private final Configuration conf;
     private FileEncrypter encrypter;
     private CryptoEnvironmentImpl cryptoEnvironment;
@@ -131,18 +129,18 @@ public final class BCFile {
       private final CompressionAlgorithm compressAlgo;
       private Compressor compressor; // !null only if using native
       // Hadoop compression
-      private final RateLimitedOutputStream fsOut;
+      private final FSDataOutputStream fsOut;
       private final OutputStream cipherOut;
       private final long posStart;
       private final SimpleBufferedOutputStream fsBufferedOutput;
       private OutputStream out;
 
-      public WBlockState(CompressionAlgorithm compressionAlgo, 
RateLimitedOutputStream fsOut,
+      public WBlockState(CompressionAlgorithm compressionAlgo, 
FSDataOutputStream fsOut,
           BytesWritable fsOutputBuffer, Configuration conf, FileEncrypter 
encrypter)
           throws IOException {
         this.compressAlgo = compressionAlgo;
         this.fsOut = fsOut;
-        this.posStart = fsOut.position();
+        this.posStart = fsOut.getPos();
 
         fsOutputBuffer.setCapacity(getFSOutputBufferSize(conf));
 
@@ -174,7 +172,7 @@ public final class BCFile {
        * @return The current byte offset in underlying file.
        */
       long getCurrentPos() {
-        return fsOut.position() + fsBufferedOutput.size();
+        return fsOut.getPos() + fsBufferedOutput.size();
       }
 
       long getStartPos() {
@@ -311,13 +309,13 @@ public final class BCFile {
      *        blocks.
      * @see Compression#getSupportedAlgorithms
      */
-    public Writer(FSDataOutputStream fout, RateLimiter writeLimiter, String 
compressionName,
-        Configuration conf, CryptoService cryptoService) throws IOException {
+    public Writer(FSDataOutputStream fout, String compressionName, 
Configuration conf,
+        CryptoService cryptoService) throws IOException {
       if (fout.getPos() != 0) {
         throw new IOException("Output file not at zero offset.");
       }
 
-      this.out = new RateLimitedOutputStream(fout, writeLimiter);
+      this.out = fout;
       this.conf = conf;
       dataIndex = new DataIndex(compressionName);
       metaIndex = new MetaIndex();
@@ -349,10 +347,10 @@ public final class BCFile {
             dataIndex.write(appender);
           }
 
-          long offsetIndexMeta = out.position();
+          long offsetIndexMeta = out.getPos();
           metaIndex.write(out);
 
-          long offsetCryptoParameter = out.position();
+          long offsetCryptoParameter = out.getPos();
           byte[] cryptoParams = this.encrypter.getDecryptionParameters();
           out.writeInt(cryptoParams.length);
           out.write(cryptoParams);
@@ -362,7 +360,7 @@ public final class BCFile {
           API_VERSION_3.write(out);
           Magic.write(out);
           out.flush();
-          length = out.position();
+          length = out.getPos();
           out.close();
         }
       } finally {
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
deleted file mode 100644
index 45d00a473b..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.hadoop.fs.Seekable;
-
-/**
- * A decorator for an {@code InputStream} which limits the rate at which reads 
are performed.
- */
-public class RateLimitedInputStream extends FilterInputStream implements 
Seekable {
-  private final RateLimiter rateLimiter;
-
-  public <StreamType extends InputStream & Seekable> 
RateLimitedInputStream(StreamType stream,
-      RateLimiter rateLimiter) {
-    super(stream);
-    this.rateLimiter = rateLimiter == null ? NullRateLimiter.INSTANCE : 
rateLimiter;
-  }
-
-  @Override
-  public int read() throws IOException {
-    int val = in.read();
-    if (val >= 0) {
-      rateLimiter.acquire(1);
-    }
-    return val;
-  }
-
-  @Override
-  public int read(byte[] buffer, int offset, int length) throws IOException {
-    int count = in.read(buffer, offset, length);
-    if (count > 0) {
-      rateLimiter.acquire(count);
-    }
-    return count;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    ((Seekable) in).seek(pos);
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return ((Seekable) in).getPos();
-  }
-
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    return ((Seekable) in).seekToNewSource(targetPos);
-  }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
deleted file mode 100644
index 3ac7a761e7..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-/**
- * A decorator for {@code OutputStream} which limits the rate at which data 
may be written.
- * Underlying OutputStream is a FSDataOutputStream.
- */
-public class RateLimitedOutputStream extends DataOutputStream {
-  private final RateLimiter writeLimiter;
-
-  public RateLimitedOutputStream(FSDataOutputStream fsDataOutputStream, 
RateLimiter writeLimiter) {
-    super(fsDataOutputStream);
-    this.writeLimiter = writeLimiter == null ? NullRateLimiter.INSTANCE : 
writeLimiter;
-  }
-
-  @Override
-  public synchronized void write(int i) throws IOException {
-    writeLimiter.acquire(1);
-    out.write(i);
-  }
-
-  @Override
-  public synchronized void write(byte[] buffer, int offset, int length) throws 
IOException {
-    writeLimiter.acquire(length);
-    out.write(buffer, offset, length);
-  }
-
-  @Override
-  public void close() throws IOException {
-    out.close();
-  }
-
-  public long position() {
-    return ((FSDataOutputStream) out).getPos();
-  }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index d9bca52264..2d3bc40df8 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.spi.cache.CacheEntry;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.summary.Gatherer.RowRange;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableUtils;
@@ -177,8 +178,9 @@ public class SummaryReader {
   public static SummaryReader load(Configuration conf, RFileSource source, 
String cacheId,
       Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory 
factory,
       CryptoService cryptoService) throws IOException {
-    CachableBuilder cb = new CachableBuilder().input(source.getInputStream(), 
cacheId)
-        .length(source.getLength()).conf(conf).cryptoService(cryptoService);
+    CachableBuilder cb =
+        new CachableBuilder().input((FSDataInputStream) 
source.getInputStream(), cacheId)
+            
.length(source.getLength()).conf(conf).cryptoService(cryptoService);
     return load(new CachableBlockFile.Reader(cb), summarySelector, factory);
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java
index 471cd28f8a..300b175302 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java
@@ -42,15 +42,9 @@ public class CompactionServicesConfig {
   private final Map<String,String> planners = new HashMap<>();
   private final Map<String,Long> rateLimits = new HashMap<>();
   private final Map<String,Map<String,String>> options = new HashMap<>();
-  long defaultRateLimit;
 
   public static final CompactionServiceId DEFAULT_SERVICE = 
CompactionServiceId.of("default");
 
-  private long getDefaultThroughput() {
-    return ConfigurationTypeHelper
-        
.getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue());
-  }
-
   private static Map<String,String> getConfiguration(AccumuloConfiguration 
aconf) {
     return 
aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX);
   }
@@ -84,8 +78,6 @@ public class CompactionServicesConfig {
       }
     });
 
-    defaultRateLimit = getDefaultThroughput();
-
     var diff = Sets.difference(options.keySet(), planners.keySet());
 
     if (!diff.isEmpty()) {
@@ -94,10 +86,6 @@ public class CompactionServicesConfig {
     }
   }
 
-  public long getRateLimit(String serviceName) {
-    return getRateLimits().getOrDefault(serviceName, defaultRateLimit);
-  }
-
   @Override
   public boolean equals(Object o) {
     if (o instanceof CompactionServicesConfig) {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
 
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
deleted file mode 100644
index c2db7b0e9a..0000000000
--- 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.util.ratelimit;
-
-/** Rate limiter from the Guava library. */
-public class GuavaRateLimiter implements RateLimiter {
-  private final com.google.common.util.concurrent.RateLimiter rateLimiter;
-  private long currentRate;
-
-  /**
-   * Constructor
-   *
-   * @param initialRate Count of permits which should be made available per 
second. A non-positive
-   *        rate is taken to indicate there should be no limitation on rate.
-   */
-  public GuavaRateLimiter(long initialRate) {
-    this.currentRate = initialRate;
-    this.rateLimiter = com.google.common.util.concurrent.RateLimiter
-        .create(initialRate > 0 ? initialRate : Long.MAX_VALUE);
-  }
-
-  @Override
-  public long getRate() {
-    return currentRate;
-  }
-
-  /**
-   * Change the rate at which permits are made available.
-   *
-   * @param newRate Count of permits which should be made available per 
second. A non-positive rate
-   *        is taken to indicate that there should be no limitation on rate.
-   */
-  public void setRate(long newRate) {
-    this.rateLimiter.setRate(newRate > 0 ? newRate : Long.MAX_VALUE);
-    this.currentRate = newRate;
-  }
-
-  @Override
-  public void acquire(long numPermits) {
-    if (this.currentRate > 0) {
-      while (numPermits > Integer.MAX_VALUE) {
-        rateLimiter.acquire(Integer.MAX_VALUE);
-        numPermits -= Integer.MAX_VALUE;
-      }
-      if (numPermits > 0) {
-        rateLimiter.acquire((int) numPermits);
-      }
-    }
-  }
-}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
 
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
deleted file mode 100644
index 8fae1479ed..0000000000
--- 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.util.ratelimit;
-
-/**
- * A rate limiter which doesn't actually limit rates at all.
- */
-public class NullRateLimiter implements RateLimiter {
-  public static final NullRateLimiter INSTANCE = new NullRateLimiter();
-
-  private NullRateLimiter() {}
-
-  @Override
-  public long getRate() {
-    return 0;
-  }
-
-  @Override
-  public void acquire(long numPermits) {}
-
-}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java 
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
deleted file mode 100644
index b94ecb2638..0000000000
--- 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.util.ratelimit;
-
-public interface RateLimiter {
-  /**
-   * Get current QPS of the rate limiter, with a non-positive rate indicating 
no limit.
-   */
-  long getRate();
-
-  /** Sleep until the specified number of queries are available. */
-  void acquire(long numPermits);
-}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
 
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
deleted file mode 100644
index 71e26cde19..0000000000
--- 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.util.ratelimit;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
-import java.lang.ref.WeakReference;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.util.threads.ThreadPools;
-import org.apache.accumulo.core.util.threads.Threads;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides the ability to retrieve a {@link RateLimiter} keyed to a specific 
string, which will
- * dynamically update its rate according to a specified callback function.
- */
-public class SharedRateLimiterFactory {
-  private static final long REPORT_RATE = 60000;
-  private static final long UPDATE_RATE = 1000;
-  private static SharedRateLimiterFactory instance = null;
-  private static ScheduledFuture<?> updateTaskFuture;
-  private final Logger log = 
LoggerFactory.getLogger(SharedRateLimiterFactory.class);
-  private final WeakHashMap<String,WeakReference<SharedRateLimiter>> 
activeLimiters =
-      new WeakHashMap<>();
-
-  private SharedRateLimiterFactory() {}
-
-  /** Get the singleton instance of the SharedRateLimiterFactory. */
-  public static synchronized SharedRateLimiterFactory 
getInstance(AccumuloConfiguration conf) {
-    if (instance == null) {
-      instance = new SharedRateLimiterFactory();
-
-      ScheduledThreadPoolExecutor svc =
-          
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-      updateTaskFuture = svc.scheduleWithFixedDelay(Threads
-          .createNamedRunnable("SharedRateLimiterFactory update polling", 
instance::updateAll),
-          UPDATE_RATE, UPDATE_RATE, MILLISECONDS);
-
-      ScheduledFuture<?> future = svc.scheduleWithFixedDelay(Threads
-          .createNamedRunnable("SharedRateLimiterFactory report polling", 
instance::reportAll),
-          REPORT_RATE, REPORT_RATE, MILLISECONDS);
-      ThreadPools.watchNonCriticalScheduledTask(future);
-
-    }
-    return instance;
-  }
-
-  /**
-   * A callback which provides the current rate for a {@link RateLimiter}.
-   */
-  public interface RateProvider {
-    /**
-     * Calculate the current rate for the {@link RateLimiter}.
-     *
-     * @return Count of permits which should be provided per second. A 
non-positive count is taken
-     *         to indicate that no rate limiting should be performed.
-     */
-    long getDesiredRate();
-  }
-
-  /**
-   * Lookup the RateLimiter associated with the specified name, or create a 
new one for that name.
-   *
-   * @param name key for the rate limiter
-   * @param rateProvider a function which can be called to get what the 
current rate for the rate
-   *        limiter should be.
-   */
-  public RateLimiter create(String name, RateProvider rateProvider) {
-    synchronized (activeLimiters) {
-      if (updateTaskFuture.isDone()) {
-        log.warn("SharedRateLimiterFactory update task has failed.");
-      }
-      var limiterRef = activeLimiters.get(name);
-      var limiter = limiterRef == null ? null : limiterRef.get();
-      if (limiter == null) {
-        limiter = new SharedRateLimiter(name, rateProvider, 
rateProvider.getDesiredRate());
-        activeLimiters.put(name, new WeakReference<>(limiter));
-      }
-      return limiter;
-    }
-  }
-
-  private void copyAndThen(String actionName, Consumer<SharedRateLimiter> 
action) {
-    Map<String,SharedRateLimiter> limitersCopy = new HashMap<>();
-    // synchronize only for copy
-    synchronized (activeLimiters) {
-      activeLimiters.forEach((name, limiterRef) -> {
-        var limiter = limiterRef.get();
-        if (limiter != null) {
-          limitersCopy.put(name, limiter);
-        }
-      });
-    }
-    limitersCopy.forEach((name, limiter) -> {
-      try {
-        action.accept(limiter);
-      } catch (RuntimeException e) {
-        log.error("Failed to {} limiter {}", actionName, name, e);
-      }
-    });
-  }
-
-  /**
-   * Walk through all of the currently active RateLimiters, having each update 
its current rate.
-   * This is called periodically so that we can dynamically update as 
configuration changes.
-   */
-  private void updateAll() {
-    copyAndThen("update", SharedRateLimiter::update);
-  }
-
-  /**
-   * Walk through all of the currently active RateLimiters, having each report 
its activity to the
-   * debug log.
-   */
-  private void reportAll() {
-    copyAndThen("report", SharedRateLimiter::report);
-  }
-
-  protected class SharedRateLimiter extends GuavaRateLimiter {
-    private AtomicLong permitsAcquired = new AtomicLong();
-    private AtomicLong lastUpdate = new AtomicLong();
-
-    private final RateProvider rateProvider;
-    private final String name;
-
-    SharedRateLimiter(String name, RateProvider rateProvider, long 
initialRate) {
-      super(initialRate);
-      this.name = name;
-      this.rateProvider = rateProvider;
-      this.lastUpdate.set(System.nanoTime());
-    }
-
-    @Override
-    public void acquire(long numPermits) {
-      super.acquire(numPermits);
-      permitsAcquired.addAndGet(numPermits);
-    }
-
-    /** Poll the callback, updating the current rate if necessary. */
-    public void update() {
-      // Reset rate if needed
-      long rate = rateProvider.getDesiredRate();
-      if (rate != getRate()) {
-        setRate(rate);
-      }
-    }
-
-    /** Report the current throughput and usage of this rate limiter to the 
debug log. */
-    public void report() {
-      if (log.isDebugEnabled()) {
-        long duration = NANOSECONDS.toMillis(System.nanoTime() - 
lastUpdate.get());
-        if (duration == 0) {
-          return;
-        }
-        lastUpdate.set(System.nanoTime());
-
-        long sum = permitsAcquired.get();
-        permitsAcquired.set(0);
-
-        if (sum > 0) {
-          log.debug(String.format("RateLimiter '%s': %,d of %,d 
permits/second", name,
-              sum * 1000L / duration, getRate()));
-        }
-      }
-    }
-  }
-}
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java
index a220407b92..d7b31bc73d 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java
@@ -99,7 +99,7 @@ public abstract class AbstractRFileTest {
       CryptoService cs = 
CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE,
           accumuloConfiguration.getAllCryptoProperties());
 
-      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, cs);
+      BCFile.Writer _cbw = new BCFile.Writer(dos, "gz", conf, cs);
 
       SamplerConfigurationImpl samplerConfig =
           SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration);
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
index cd13745a41..29926b0191 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
@@ -61,7 +61,7 @@ public class CreateCompatTestFile {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
     AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
-    BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), null, 
"gz", conf,
+    BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), "gz", 
conf,
         CryptoFactoryLoader.getServiceForServer(aconf));
     RFile.Writer writer = new RFile.Writer(_cbw, 1000);
 
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 2065f8b676..b0d2bb0c3e 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -66,7 +66,7 @@ public class MultiLevelIndexTest {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream dos = new FSDataOutputStream(baos, new 
FileSystem.Statistics("a"));
     CryptoService cs = CryptoFactoryLoader.getServiceForServer(aconf);
-    BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", hadoopConf, cs);
+    BCFile.Writer _cbw = new BCFile.Writer(dos, "gz", hadoopConf, cs);
 
     BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
 
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index 8c66bb6ead..5e7f237eb3 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -153,7 +153,7 @@ public class MultiThreadedRFileTest {
       FileSystem fs = FileSystem.newInstance(conf);
       Path path = new Path("file://" + rfile);
       dos = fs.create(path, true);
-      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf,
+      BCFile.Writer _cbw = new BCFile.Writer(dos, "gz", conf,
           CryptoFactoryLoader.getServiceForServer(accumuloConfiguration));
       SamplerConfigurationImpl samplerConfig =
           SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration);
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
 
b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
deleted file mode 100644
index 8eb3a5ef67..0000000000
--- 
a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.hadoop.fs.Seekable;
-import org.easymock.EasyMock;
-import org.junit.jupiter.api.Test;
-
-public class RateLimitedInputStreamTest {
-
-  @Test
-  public void permitsAreProperlyAcquired() throws Exception {
-    // Create variables for tracking behaviors of mock object
-    AtomicLong rateLimiterPermitsAcquired = new AtomicLong();
-    // Construct mock object
-    RateLimiter rateLimiter = EasyMock.niceMock(RateLimiter.class);
-    // Stub Mock Method
-    rateLimiter.acquire(EasyMock.anyLong());
-    EasyMock.expectLastCall()
-        .andAnswer(() -> 
rateLimiterPermitsAcquired.addAndGet(EasyMock.getCurrentArgument(0)))
-        .anyTimes();
-    EasyMock.replay(rateLimiter);
-
-    long bytesRetrieved = 0;
-    try (InputStream is = new RateLimitedInputStream(new RandomInputStream(), 
rateLimiter)) {
-      for (int i = 0; i < 100; ++i) {
-        int count = Math.abs(RANDOM.get().nextInt()) % 65536;
-        int countRead = is.read(new byte[count]);
-        assertEquals(count, countRead);
-        bytesRetrieved += count;
-      }
-    }
-    assertEquals(bytesRetrieved, rateLimiterPermitsAcquired.get());
-  }
-
-  private static class RandomInputStream extends InputStream implements 
Seekable {
-
-    @Override
-    public int read() {
-      return RANDOM.get().nextInt() & 0xff;
-    }
-
-    @Override
-    public void seek(long pos) {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public long getPos() {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-    @Override
-    public boolean seekToNewSource(long targetPos) {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-  }
-
-}
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
 
b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
deleted file mode 100644
index 8df1a3104e..0000000000
--- 
a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.easymock.EasyMock;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.io.CountingOutputStream;
-
-public class RateLimitedOutputStreamTest {
-
-  @Test
-  public void permitsAreProperlyAcquired() throws Exception {
-    // Create variables for tracking behaviors of mock object
-    AtomicLong rateLimiterPermitsAcquired = new AtomicLong();
-    // Construct mock object
-    RateLimiter rateLimiter = EasyMock.niceMock(RateLimiter.class);
-    // Stub Mock Method
-    rateLimiter.acquire(EasyMock.anyLong());
-    EasyMock.expectLastCall()
-        .andAnswer(() -> 
rateLimiterPermitsAcquired.addAndGet(EasyMock.getCurrentArgument(0)))
-        .anyTimes();
-    EasyMock.replay(rateLimiter);
-
-    long bytesWritten = 0;
-    try (RateLimitedOutputStream os =
-        new RateLimitedOutputStream(new NullOutputStream(), rateLimiter)) {
-      for (int i = 0; i < 100; ++i) {
-        byte[] bytes = new byte[Math.abs(RANDOM.get().nextInt() % 65536)];
-        os.write(bytes);
-        bytesWritten += bytes.length;
-      }
-      assertEquals(bytesWritten, os.position());
-    }
-    assertEquals(bytesWritten, rateLimiterPermitsAcquired.get());
-  }
-
-  public static class NullOutputStream extends FSDataOutputStream {
-    public NullOutputStream() {
-      super(new CountingOutputStream(OutputStream.nullOutputStream()), null);
-    }
-  }
-
-}
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 1c5b35e81f..0c5bad07c4 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -64,7 +64,6 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import 
org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
@@ -94,10 +93,6 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
     IteratorScope getIteratorScope();
 
-    RateLimiter getReadLimiter();
-
-    RateLimiter getWriteLimiter();
-
     SystemIteratorEnvironment createIteratorEnv(ServerContext context,
         AccumuloConfiguration acuTableConf, TableId tableId);
 
@@ -233,7 +228,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
 
       WriterBuilder outBuilder =
           fileFactory.newWriterBuilder().forFile(outputFile, ns, ns.getConf(), 
cryptoService)
-              
.withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter());
+              .withTableConfiguration(acuTableConf);
       if (dropCacheBehindOutput) {
         outBuilder.dropCachesBehind();
       }
@@ -338,8 +333,7 @@ public class FileCompactor implements 
Callable<CompactionStats> {
         FileSKVIterator reader;
 
         reader = fileFactory.newReaderBuilder().forFile(dataFile, fs, 
fs.getConf(), cryptoService)
-            
.withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter())
-            .dropCachesBehind().build();
+            .withTableConfiguration(acuTableConf).dropCachesBehind().build();
 
         readers.add(reader);
 
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
index 59235b6cec..ea1c5245e1 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java
@@ -27,8 +27,6 @@ import 
org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
-import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.FileCompactor.CompactionEnv;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
@@ -74,16 +72,6 @@ public class ExtCEnv implements CompactionEnv {
     return IteratorScope.majc;
   }
 
-  @Override
-  public RateLimiter getReadLimiter() {
-    return NullRateLimiter.INSTANCE;
-  }
-
-  @Override
-  public RateLimiter getWriteLimiter() {
-    return NullRateLimiter.INSTANCE;
-  }
-
   @Override
   public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
       AccumuloConfiguration acuTableConf, TableId tableId) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java
index ea683b04bf..aaa8062550 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java
@@ -25,7 +25,6 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.FileCompactor;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
@@ -51,16 +50,6 @@ public class MinCEnv implements FileCompactor.CompactionEnv {
     return IteratorUtil.IteratorScope.minc;
   }
 
-  @Override
-  public RateLimiter getReadLimiter() {
-    return null;
-  }
-
-  @Override
-  public RateLimiter getWriteLimiter() {
-    return null;
-  }
-
   @Override
   public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
       AccumuloConfiguration acuTableConf, TableId tableId) {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingIT.java
deleted file mode 100644
index 0b4280f8d1..0000000000
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingIT.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.test.compaction;
-
-import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.Map;
-
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
-import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacBase;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.Test;
-
-public class CompactionRateLimitingIT extends ConfigurableMacBase {
-  public static final long BYTES_TO_WRITE = 10 * 1024 * 1024;
-  public static final long RATE = 1 * 1024 * 1024;
-
-  protected Property getThroughputProp() {
-    return Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT;
-  }
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
-    cfg.setProperty(getThroughputProp(), RATE + "B");
-    cfg.setProperty(Property.TABLE_MAJC_RATIO, "20");
-    cfg.setProperty(Property.TABLE_FILE_COMPRESSION_TYPE, "none");
-
-    cfg.setProperty("tserver.compaction.major.service.test.rate.limit", RATE + "B");
-    cfg.setProperty("tserver.compaction.major.service.test.planner",
-        DefaultCompactionPlanner.class.getName());
-    cfg.setProperty("tserver.compaction.major.service.test.planner.opts.executors",
-        "[{'name':'all','numThreads':2}]".replaceAll("'", "\""));
-
-  }
-
-  @Test
-  public void majorCompactionsAreRateLimited() throws Exception {
-    long bytesWritten = 0;
-    String[] tableNames = getUniqueNames(1);
-
-    try (AccumuloClient client =
-        getCluster().createAccumuloClient("root", new PasswordToken(ROOT_PASSWORD))) {
-
-      for (int i = 0; i < tableNames.length; i++) {
-        String tableName = tableNames[i];
-
-        NewTableConfiguration ntc = new NewTableConfiguration();
-        if (i == 1) {
-          ntc.setProperties(Map.of("table.compaction.dispatcher.opts.service", "test"));
-        }
-
-        client.tableOperations().create(tableName, ntc);
-        try (BatchWriter bw = client.createBatchWriter(tableName)) {
-          while (bytesWritten < BYTES_TO_WRITE) {
-            byte[] rowKey = new byte[32];
-            RANDOM.get().nextBytes(rowKey);
-
-            byte[] qual = new byte[32];
-            RANDOM.get().nextBytes(qual);
-
-            byte[] value = new byte[1024];
-            RANDOM.get().nextBytes(value);
-
-            Mutation m = new Mutation(rowKey);
-            m.put(new byte[0], qual, value);
-            bw.addMutation(m);
-
-            bytesWritten += rowKey.length + qual.length + value.length;
-          }
-        }
-
-        client.tableOperations().flush(tableName, null, null, true);
-
-        long compactionStart = System.currentTimeMillis();
-        client.tableOperations().compact(tableName, null, null, false, true);
-        long duration = System.currentTimeMillis() - compactionStart;
-        // The rate will be "bursty", try to account for that by taking 80% of the expected rate
-        // (allow for 20% under the maximum expected duration)
-        String message = String.format(
-            "Expected a compaction rate of no more than %,d bytes/sec, but saw a rate of %,f bytes/sec",
-            (int) (0.8d * RATE), 1000.0 * bytesWritten / duration);
-        assertTrue(duration > 1000L * 0.8 * BYTES_TO_WRITE / RATE, message);
-      }
-    }
-  }
-}

Reply via email to