This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 2d8bdf2b21 Removed rate limiters as they weren't really being used (#3874) 2d8bdf2b21 is described below commit 2d8bdf2b217e5b42050af5625e201b3ac398e3d7 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Oct 24 08:44:28 2023 -0400 Removed rate limiters as they weren't really being used (#3874) Fixes #3597 --- .../accumulo/core/client/rfile/RFileScanner.java | 8 +- .../org/apache/accumulo/core/conf/Property.java | 18 +- .../apache/accumulo/core/file/FileOperations.java | 46 ++--- .../file/blockfile/impl/CachableBlockFile.java | 20 +-- .../accumulo/core/file/rfile/RFileOperations.java | 6 +- .../accumulo/core/file/rfile/SplitLarge.java | 5 +- .../accumulo/core/file/rfile/bcfile/BCFile.java | 24 ++- .../core/file/streams/RateLimitedInputStream.java | 73 -------- .../core/file/streams/RateLimitedOutputStream.java | 60 ------- .../accumulo/core/summary/SummaryReader.java | 6 +- .../util/compaction/CompactionServicesConfig.java | 12 -- .../core/util/ratelimit/GuavaRateLimiter.java | 66 ------- .../core/util/ratelimit/NullRateLimiter.java | 37 ---- .../accumulo/core/util/ratelimit/RateLimiter.java | 29 ---- .../util/ratelimit/SharedRateLimiterFactory.java | 193 --------------------- .../core/file/rfile/AbstractRFileTest.java | 2 +- .../core/file/rfile/CreateCompatTestFile.java | 2 +- .../core/file/rfile/MultiLevelIndexTest.java | 2 +- .../core/file/rfile/MultiThreadedRFileTest.java | 2 +- .../file/streams/RateLimitedInputStreamTest.java | 83 --------- .../file/streams/RateLimitedOutputStreamTest.java | 68 -------- .../accumulo/server/compaction/FileCompactor.java | 10 +- .../org/apache/accumulo/compactor/ExtCEnv.java | 12 -- .../apache/accumulo/tserver/tablet/MinCEnv.java | 11 -- .../test/compaction/CompactionRateLimitingIT.java | 110 ------------ 25 files changed, 47 insertions(+), 858 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 79ea0bddfd..c5716fa6c8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -347,10 +347,10 @@ class RFileScanner extends ScannerOptions implements Scanner { for (int i = 0; i < sources.length; i++) { // TODO may have been a bug with multiple files and caching in older version... - FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream(); - CachableBuilder cb = - new CachableBuilder().input(inputStream, "source-" + i).length(sources[i].getLength()) - .conf(opts.in.getConf()).cacheProvider(cacheProvider).cryptoService(cryptoService); + CachableBuilder cb = new CachableBuilder() + .input((FSDataInputStream) sources[i].getInputStream(), "source-" + i) + .length(sources[i].getLength()).conf(opts.in.getConf()).cacheProvider(cacheProvider) + .cryptoService(cryptoService); readers.add(RFile.getReader(cb, sources[i].getRange())); } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 53011b7f33..d1938b6281 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -589,11 +589,7 @@ public enum Property { TSERV_COMPACTION_SERVICE_ROOT_PLANNER("tserver.compaction.major.service.root.planner", DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME, "Compaction planner for root tablet service", "2.1.0"), - TSERV_COMPACTION_SERVICE_ROOT_RATE_LIMIT("tserver.compaction.major.service.root.rate.limit", "0B", - PropertyType.BYTES, - "Maximum number of bytes to read or write per second over all major" - + " compactions in this compaction service, or 0B for unlimited.", - "2.1.0"), + // ELASTICITY_TODO: Deprecate TSERV_COMPACTION_SERVICE_ROOT_RATE_LIMIT in 3.x TSERV_COMPACTION_SERVICE_ROOT_MAX_OPEN( "tserver.compaction.major.service.root.planner.opts.maxOpen", "30", PropertyType.COUNT, "The maximum number of files a compaction will open", "2.1.0"), @@ -606,11 +602,7 @@ public enum Property { TSERV_COMPACTION_SERVICE_META_PLANNER("tserver.compaction.major.service.meta.planner", DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME, "Compaction planner for metadata table", "2.1.0"), - TSERV_COMPACTION_SERVICE_META_RATE_LIMIT("tserver.compaction.major.service.meta.rate.limit", "0B", - PropertyType.BYTES, - "Maximum number of bytes to read or write per second over all major" - + " compactions in this compaction service, or 0B for unlimited.", - "2.1.0"), + // ELASTICITY_TODO: Deprecate TSERV_COMPACTION_SERVICE_META_RATE_LIMIT in 3.x TSERV_COMPACTION_SERVICE_META_MAX_OPEN( "tserver.compaction.major.service.meta.planner.opts.maxOpen", "30", PropertyType.COUNT, "The maximum number of files a compaction will open", "2.1.0"), @@ -623,11 +615,7 @@ public enum Property { TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER("tserver.compaction.major.service.default.planner", DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME, "Planner for default compaction service.", "2.1.0"), - TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT("tserver.compaction.major.service.default.rate.limit", - "0B", PropertyType.BYTES, - "Maximum number of bytes to read or write per second over all major" - + " compactions in this compaction service, or 0B for unlimited.", - "2.1.0"), + // ELASTICITY_TODO: Deprecate TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT in 3.x TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN( "tserver.compaction.major.service.default.planner.opts.maxOpen", "10", PropertyType.COUNT, "The maximum number of files a compaction will open", "2.1.0"), diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index 5182c614aa..6d2df13fd1 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -34,7 +34,6 @@ import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.UnreferencedTabletFile; import org.apache.accumulo.core.spi.crypto.CryptoService; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -171,7 +170,6 @@ public abstract class FileOperations { public final TabletFile file; public final FileSystem fs; public final Configuration fsConf; - public final RateLimiter rateLimiter; // writer only objects public final String compression; public final FSDataOutputStream outputStream; @@ -188,15 +186,14 @@ public abstract class FileOperations { public final boolean dropCacheBehind; protected FileOptions(AccumuloConfiguration tableConfiguration, TabletFile file, FileSystem fs, - Configuration fsConf, RateLimiter rateLimiter, String compression, - FSDataOutputStream outputStream, boolean enableAccumuloStart, CacheProvider cacheProvider, - Cache<String,Long> fileLenCache, boolean seekToBeginning, CryptoService cryptoService, - Range range, Set<ByteSequence> columnFamilies, boolean inclusive, boolean dropCacheBehind) { + Configuration fsConf, String compression, FSDataOutputStream outputStream, + boolean enableAccumuloStart, CacheProvider cacheProvider, Cache<String,Long> fileLenCache, + boolean seekToBeginning, CryptoService cryptoService, Range range, + Set<ByteSequence> columnFamilies, boolean inclusive, boolean dropCacheBehind) { this.tableConfiguration = tableConfiguration; this.file = Objects.requireNonNull(file); this.fs = fs; this.fsConf = fsConf; - this.rateLimiter = rateLimiter; this.compression = compression; this.outputStream = outputStream; this.enableAccumuloStart = enableAccumuloStart; @@ -226,10 +223,6 @@ public abstract class FileOperations { return fsConf; } - public RateLimiter getRateLimiter() { - return rateLimiter; - } - public String getCompression() { return compression; } @@ -279,7 +272,6 @@ public abstract class FileOperations { private TabletFile file; private FileSystem fs; private Configuration fsConf; - private RateLimiter rateLimiter; private CryptoService cryptoService; private boolean dropCacheBehind = false; @@ -303,11 +295,6 @@ public abstract class FileOperations { return this; } - protected FileHelper rateLimiter(RateLimiter rateLimiter) { - this.rateLimiter = rateLimiter; - return this; - } - protected FileHelper cryptoService(CryptoService cs) { this.cryptoService = Objects.requireNonNull(cs); return this; @@ -320,28 +307,27 @@ public abstract class FileOperations { protected FileOptions toWriterBuilderOptions(String compression, FSDataOutputStream outputStream, boolean startEnabled) { - return new FileOptions(tableConfiguration, file, fs, fsConf, rateLimiter, compression, - outputStream, startEnabled, NULL_PROVIDER, null, false, cryptoService, null, null, true, + return new FileOptions(tableConfiguration, file, fs, fsConf, compression, outputStream, + startEnabled, NULL_PROVIDER, null, false, cryptoService, null, null, true, dropCacheBehind); } protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider, Cache<String,Long> fileLenCache, boolean seekToBeginning) { - return new FileOptions(tableConfiguration, file, fs, fsConf, rateLimiter, null, null, false, + return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, false, cacheProvider == null ? NULL_PROVIDER : cacheProvider, fileLenCache, seekToBeginning, cryptoService, null, null, true, dropCacheBehind); } protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) { - return new FileOptions(tableConfiguration, file, fs, fsConf, rateLimiter, null, null, false, - NULL_PROVIDER, fileLenCache, false, cryptoService, null, null, true, dropCacheBehind); + return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, false, NULL_PROVIDER, + fileLenCache, false, cryptoService, null, null, true, dropCacheBehind); } protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies, boolean inclusive) { - return new FileOptions(tableConfiguration, file, fs, fsConf, rateLimiter, null, null, false, - NULL_PROVIDER, null, false, cryptoService, range, columnFamilies, inclusive, - dropCacheBehind); + return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, false, NULL_PROVIDER, + null, false, cryptoService, range, columnFamilies, inclusive, dropCacheBehind); } protected AccumuloConfiguration getTableConfiguration() { @@ -388,11 +374,6 @@ public abstract class FileOperations { return this; } - public WriterBuilder withRateLimiter(RateLimiter rateLimiter) { - rateLimiter(rateLimiter); - return this; - } - public WriterBuilder dropCachesBehind() { this.dropCacheBehind(true); return this; @@ -441,11 +422,6 @@ public abstract class FileOperations { return this; } - public ReaderBuilder withRateLimiter(RateLimiter rateLimiter) { - rateLimiter(rateLimiter); - return this; - } - public ReaderBuilder dropCachesBehind() { this.dropCacheBehind(true); return this; diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 0beb67f535..8bedcef04e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -33,17 +33,14 @@ import org.apache.accumulo.core.file.rfile.BlockIndex; import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader; import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist; -import org.apache.accumulo.core.file.streams.RateLimitedInputStream; import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.BlockCache.Loader; import org.apache.accumulo.core.spi.cache.CacheEntry; import org.apache.accumulo.core.spi.crypto.CryptoService; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.Seekable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,11 +66,10 @@ public class CachableBlockFile { public static class CachableBuilder { String cacheId = null; - IoeSupplier<InputStream> inputSupplier = null; + IoeSupplier<FSDataInputStream> inputSupplier = null; IoeSupplier<Long> lengthSupplier = null; Cache<String,Long> fileLenCache = null; volatile CacheProvider cacheProvider = CacheProvider.NULL_PROVIDER; - RateLimiter readLimiter = null; Configuration hadoopConf = null; CryptoService cryptoService = null; @@ -109,7 +105,7 @@ public class CachableBlockFile { return this; } - public CachableBuilder input(InputStream is, String cacheId) { + public CachableBuilder input(FSDataInputStream is, String cacheId) { this.cacheId = cacheId; this.inputSupplier = () -> is; return this; @@ -130,11 +126,6 @@ public class CachableBlockFile { return this; } - public CachableBuilder readLimiter(RateLimiter readLimiter) { - this.readLimiter = readLimiter; - return this; - } - public CachableBuilder cryptoService(CryptoService cryptoService) { this.cryptoService = cryptoService; return this; @@ -145,7 +136,6 @@ public class CachableBlockFile { * Class wraps the BCFile reader. */ public static class Reader implements Closeable { - private final RateLimiter readLimiter; // private BCFile.Reader _bc; private final String cacheId; private CacheProvider cacheProvider; @@ -155,7 +145,7 @@ public class CachableBlockFile { private final Configuration conf; private final CryptoService cryptoService; - private final IoeSupplier<InputStream> inputSupplier; + private final IoeSupplier<FSDataInputStream> inputSupplier; private final IoeSupplier<Long> lengthSupplier; private final AtomicReference<BCFile.Reader> bcfr = new AtomicReference<>(); @@ -185,8 +175,7 @@ public class CachableBlockFile { BCFile.Reader reader = bcfr.get(); if (reader == null) { - RateLimitedInputStream fsIn = - new RateLimitedInputStream((InputStream & Seekable) inputSupplier.get(), readLimiter); + FSDataInputStream fsIn = inputSupplier.get(); BCFile.Reader tmpReader = null; if (serializedMetadata == null) { if (fileLenCache == null) { @@ -385,7 +374,6 @@ public class CachableBlockFile { this.lengthSupplier = b.lengthSupplier; this.fileLenCache = b.fileLenCache; this.cacheProvider = b.cacheProvider; - this.readLimiter = b.readLimiter; this.conf = b.hadoopConf; this.cryptoService = Objects.requireNonNull(b.cryptoService); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index cf3e9f64e2..46be67bf1e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@ -58,8 +58,7 @@ public class RFileOperations extends FileOperations { CachableBuilder cb = new CachableBuilder() .fsPath(options.getFileSystem(), options.getFile().getPath(), options.dropCacheBehind) .conf(options.getConfiguration()).fileLen(options.getFileLenCache()) - .cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter()) - .cryptoService(options.getCryptoService()); + .cacheProvider(options.cacheProvider).cryptoService(options.getCryptoService()); return RFile.getReader(cb, options.getFile()); } @@ -156,8 +155,7 @@ public class RFileOperations extends FileOperations { } } - BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression, - conf, options.cryptoService); + BCFile.Writer _cbw = new BCFile.Writer(outputStream, compression, conf, options.cryptoService); return new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java index e3adf9e517..928b252b16 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java @@ -92,10 +92,9 @@ public class SplitLarge implements KeywordExecutable { int blockSize = (int) aconf.getAsBytes(Property.TABLE_FILE_BLOCK_SIZE); try ( Writer small = new RFile.Writer( - new BCFile.Writer(fs.create(new Path(smallName)), null, "gz", conf, cs), blockSize); + new BCFile.Writer(fs.create(new Path(smallName)), "gz", conf, cs), blockSize); Writer large = new RFile.Writer( - new BCFile.Writer(fs.create(new Path(largeName)), null, "gz", conf, cs), - blockSize)) { + new BCFile.Writer(fs.create(new Path(largeName)), "gz", conf, cs), blockSize)) { small.startDefaultLocalityGroup(); large.startDefaultLocalityGroup(); diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java index 9b5683ce22..890dc58ab5 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java @@ -38,7 +38,6 @@ import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl; import org.apache.accumulo.core.crypto.CryptoUtils; import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version; import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream; -import org.apache.accumulo.core.file.streams.RateLimitedOutputStream; import org.apache.accumulo.core.file.streams.SeekableDataInputStream; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope; @@ -47,7 +46,6 @@ import org.apache.accumulo.core.spi.crypto.FileDecrypter; import org.apache.accumulo.core.spi.crypto.FileEncrypter; import org.apache.accumulo.core.spi.crypto.NoFileDecrypter; import org.apache.accumulo.core.spi.crypto.NoFileEncrypter; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -104,7 +102,7 @@ public final class BCFile { * BCFile writer, the entry point for creating a new BCFile. */ public static class Writer implements Closeable { - private final RateLimitedOutputStream out; + private final FSDataOutputStream out; private final Configuration conf; private FileEncrypter encrypter; private CryptoEnvironmentImpl cryptoEnvironment; @@ -131,18 +129,18 @@ public final class BCFile { private final CompressionAlgorithm compressAlgo; private Compressor compressor; // !null only if using native // Hadoop compression - private final RateLimitedOutputStream fsOut; + private final FSDataOutputStream fsOut; private final OutputStream cipherOut; private final long posStart; private final SimpleBufferedOutputStream fsBufferedOutput; private OutputStream out; - public WBlockState(CompressionAlgorithm compressionAlgo, RateLimitedOutputStream fsOut, + public WBlockState(CompressionAlgorithm compressionAlgo, FSDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf, FileEncrypter encrypter) throws IOException { this.compressAlgo = compressionAlgo; this.fsOut = fsOut; - this.posStart = fsOut.position(); + this.posStart = fsOut.getPos(); fsOutputBuffer.setCapacity(getFSOutputBufferSize(conf)); @@ -174,7 +172,7 @@ public final class BCFile { * @return The current byte offset in underlying file. */ long getCurrentPos() { - return fsOut.position() + fsBufferedOutput.size(); + return fsOut.getPos() + fsBufferedOutput.size(); } long getStartPos() { @@ -311,13 +309,13 @@ public final class BCFile { * blocks. * @see Compression#getSupportedAlgorithms */ - public Writer(FSDataOutputStream fout, RateLimiter writeLimiter, String compressionName, - Configuration conf, CryptoService cryptoService) throws IOException { + public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, + CryptoService cryptoService) throws IOException { if (fout.getPos() != 0) { throw new IOException("Output file not at zero offset."); } - this.out = new RateLimitedOutputStream(fout, writeLimiter); + this.out = fout; this.conf = conf; dataIndex = new DataIndex(compressionName); metaIndex = new MetaIndex(); @@ -349,10 +347,10 @@ public final class BCFile { dataIndex.write(appender); } - long offsetIndexMeta = out.position(); + long offsetIndexMeta = out.getPos(); metaIndex.write(out); - long offsetCryptoParameter = out.position(); + long offsetCryptoParameter = out.getPos(); byte[] cryptoParams = this.encrypter.getDecryptionParameters(); out.writeInt(cryptoParams.length); out.write(cryptoParams); @@ -362,7 +360,7 @@ public final class BCFile { API_VERSION_3.write(out); Magic.write(out); out.flush(); - length = out.position(); + length = out.getPos(); out.close(); } } finally { diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java deleted file mode 100644 index 45d00a473b..0000000000 --- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.file.streams; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.accumulo.core.util.ratelimit.NullRateLimiter; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; -import org.apache.hadoop.fs.Seekable; - -/** - * A decorator for an {@code InputStream} which limits the rate at which reads are performed. - */ -public class RateLimitedInputStream extends FilterInputStream implements Seekable { - private final RateLimiter rateLimiter; - - public <StreamType extends InputStream & Seekable> RateLimitedInputStream(StreamType stream, - RateLimiter rateLimiter) { - super(stream); - this.rateLimiter = rateLimiter == null ? NullRateLimiter.INSTANCE : rateLimiter; - } - - @Override - public int read() throws IOException { - int val = in.read(); - if (val >= 0) { - rateLimiter.acquire(1); - } - return val; - } - - @Override - public int read(byte[] buffer, int offset, int length) throws IOException { - int count = in.read(buffer, offset, length); - if (count > 0) { - rateLimiter.acquire(count); - } - return count; - } - - @Override - public void seek(long pos) throws IOException { - ((Seekable) in).seek(pos); - } - - @Override - public long getPos() throws IOException { - return ((Seekable) in).getPos(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return ((Seekable) in).seekToNewSource(targetPos); - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java deleted file mode 100644 index 3ac7a761e7..0000000000 --- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.file.streams; - -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.accumulo.core.util.ratelimit.NullRateLimiter; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; -import org.apache.hadoop.fs.FSDataOutputStream; - -/** - * A decorator for {@code OutputStream} which limits the rate at which data may be written. - * Underlying OutputStream is a FSDataOutputStream. - */ -public class RateLimitedOutputStream extends DataOutputStream { - private final RateLimiter writeLimiter; - - public RateLimitedOutputStream(FSDataOutputStream fsDataOutputStream, RateLimiter writeLimiter) { - super(fsDataOutputStream); - this.writeLimiter = writeLimiter == null ? NullRateLimiter.INSTANCE : writeLimiter; - } - - @Override - public synchronized void write(int i) throws IOException { - writeLimiter.acquire(1); - out.write(i); - } - - @Override - public synchronized void write(byte[] buffer, int offset, int length) throws IOException { - writeLimiter.acquire(length); - out.write(buffer, offset, length); - } - - @Override - public void close() throws IOException { - out.close(); - } - - public long position() { - return ((FSDataOutputStream) out).getPos(); - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index d9bca52264..2d3bc40df8 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -41,6 +41,7 @@ import org.apache.accumulo.core.spi.cache.CacheEntry; import org.apache.accumulo.core.spi.crypto.CryptoService; import org.apache.accumulo.core.summary.Gatherer.RowRange; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.WritableUtils; @@ -177,8 +178,9 @@ public class SummaryReader { public static SummaryReader load(Configuration conf, RFileSource source, String cacheId, Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory, CryptoService cryptoService) throws IOException { - CachableBuilder cb = new CachableBuilder().input(source.getInputStream(), cacheId) - .length(source.getLength()).conf(conf).cryptoService(cryptoService); + CachableBuilder cb = + new CachableBuilder().input((FSDataInputStream) source.getInputStream(), cacheId) + .length(source.getLength()).conf(conf).cryptoService(cryptoService); return load(new CachableBlockFile.Reader(cb), summarySelector, factory); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java index 471cd28f8a..300b175302 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java @@ -42,15 +42,9 @@ public class CompactionServicesConfig { private final Map<String,String> planners = new HashMap<>(); private final Map<String,Long> rateLimits = new HashMap<>(); private final Map<String,Map<String,String>> options = new HashMap<>(); - long defaultRateLimit; public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default"); - private long getDefaultThroughput() { - return ConfigurationTypeHelper - .getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue()); - } - private static Map<String,String> getConfiguration(AccumuloConfiguration aconf) { return aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX); } @@ -84,8 +78,6 @@ public class CompactionServicesConfig { } }); - defaultRateLimit = getDefaultThroughput(); - var diff = Sets.difference(options.keySet(), planners.keySet()); if (!diff.isEmpty()) { @@ -94,10 +86,6 @@ public class CompactionServicesConfig { } } - public long getRateLimit(String serviceName) { - return getRateLimits().getOrDefault(serviceName, defaultRateLimit); - } - @Override public boolean equals(Object o) { if (o instanceof CompactionServicesConfig) { diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java deleted file mode 100644 index c2db7b0e9a..0000000000 --- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.util.ratelimit; - -/** Rate limiter from the Guava library. */ -public class GuavaRateLimiter implements RateLimiter { - private final com.google.common.util.concurrent.RateLimiter rateLimiter; - private long currentRate; - - /** - * Constructor - * - * @param initialRate Count of permits which should be made available per second. A non-positive - * rate is taken to indicate there should be no limitation on rate. - */ - public GuavaRateLimiter(long initialRate) { - this.currentRate = initialRate; - this.rateLimiter = com.google.common.util.concurrent.RateLimiter - .create(initialRate > 0 ? initialRate : Long.MAX_VALUE); - } - - @Override - public long getRate() { - return currentRate; - } - - /** - * Change the rate at which permits are made available. - * - * @param newRate Count of permits which should be made available per second. A non-positive rate - * is taken to indicate that there should be no limitation on rate. - */ - public void setRate(long newRate) { - this.rateLimiter.setRate(newRate > 0 ? newRate : Long.MAX_VALUE); - this.currentRate = newRate; - } - - @Override - public void acquire(long numPermits) { - if (this.currentRate > 0) { - while (numPermits > Integer.MAX_VALUE) { - rateLimiter.acquire(Integer.MAX_VALUE); - numPermits -= Integer.MAX_VALUE; - } - if (numPermits > 0) { - rateLimiter.acquire((int) numPermits); - } - } - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java deleted file mode 100644 index 8fae1479ed..0000000000 --- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.util.ratelimit; - -/** - * A rate limiter which doesn't actually limit rates at all. - */ -public class NullRateLimiter implements RateLimiter { - public static final NullRateLimiter INSTANCE = new NullRateLimiter(); - - private NullRateLimiter() {} - - @Override - public long getRate() { - return 0; - } - - @Override - public void acquire(long numPermits) {} - -} diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java deleted file mode 100644 index b94ecb2638..0000000000 --- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.util.ratelimit; - -public interface RateLimiter { - /** - * Get current QPS of the rate limiter, with a non-positive rate indicating no limit. - */ - long getRate(); - - /** Sleep until the specified number of queries are available. */ - void acquire(long numPermits); -} diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java deleted file mode 100644 index 71e26cde19..0000000000 --- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.util.ratelimit; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; - -import java.lang.ref.WeakReference; -import java.util.HashMap; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.core.util.threads.Threads; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provides the ability to retrieve a {@link RateLimiter} keyed to a specific string, which will - * dynamically update its rate according to a specified callback function. - */ -public class SharedRateLimiterFactory { - private static final long REPORT_RATE = 60000; - private static final long UPDATE_RATE = 1000; - private static SharedRateLimiterFactory instance = null; - private static ScheduledFuture<?> updateTaskFuture; - private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class); - private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters = - new WeakHashMap<>(); - - private SharedRateLimiterFactory() {} - - /** Get the singleton instance of the SharedRateLimiterFactory. */ - public static synchronized SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) { - if (instance == null) { - instance = new SharedRateLimiterFactory(); - - ScheduledThreadPoolExecutor svc = - ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf); - updateTaskFuture = svc.scheduleWithFixedDelay(Threads - .createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll), - UPDATE_RATE, UPDATE_RATE, MILLISECONDS); - - ScheduledFuture<?> future = svc.scheduleWithFixedDelay(Threads - .createNamedRunnable("SharedRateLimiterFactory report polling", instance::reportAll), - REPORT_RATE, REPORT_RATE, MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); - - } - return instance; - } - - /** - * A callback which provides the current rate for a {@link RateLimiter}. - */ - public interface RateProvider { - /** - * Calculate the current rate for the {@link RateLimiter}. - * - * @return Count of permits which should be provided per second. A non-positive count is taken - * to indicate that no rate limiting should be performed. - */ - long getDesiredRate(); - } - - /** - * Lookup the RateLimiter associated with the specified name, or create a new one for that name. - * - * @param name key for the rate limiter - * @param rateProvider a function which can be called to get what the current rate for the rate - * limiter should be. - */ - public RateLimiter create(String name, RateProvider rateProvider) { - synchronized (activeLimiters) { - if (updateTaskFuture.isDone()) { - log.warn("SharedRateLimiterFactory update task has failed."); - } - var limiterRef = activeLimiters.get(name); - var limiter = limiterRef == null ? null : limiterRef.get(); - if (limiter == null) { - limiter = new SharedRateLimiter(name, rateProvider, rateProvider.getDesiredRate()); - activeLimiters.put(name, new WeakReference<>(limiter)); - } - return limiter; - } - } - - private void copyAndThen(String actionName, Consumer<SharedRateLimiter> action) { - Map<String,SharedRateLimiter> limitersCopy = new HashMap<>(); - // synchronize only for copy - synchronized (activeLimiters) { - activeLimiters.forEach((name, limiterRef) -> { - var limiter = limiterRef.get(); - if (limiter != null) { - limitersCopy.put(name, limiter); - } - }); - } - limitersCopy.forEach((name, limiter) -> { - try { - action.accept(limiter); - } catch (RuntimeException e) { - log.error("Failed to {} limiter {}", actionName, name, e); - } - }); - } - - /** - * Walk through all of the currently active RateLimiters, having each update its current rate. - * This is called periodically so that we can dynamically update as configuration changes. - */ - private void updateAll() { - copyAndThen("update", SharedRateLimiter::update); - } - - /** - * Walk through all of the currently active RateLimiters, having each report its activity to the - * debug log. - */ - private void reportAll() { - copyAndThen("report", SharedRateLimiter::report); - } - - protected class SharedRateLimiter extends GuavaRateLimiter { - private AtomicLong permitsAcquired = new AtomicLong(); - private AtomicLong lastUpdate = new AtomicLong(); - - private final RateProvider rateProvider; - private final String name; - - SharedRateLimiter(String name, RateProvider rateProvider, long initialRate) { - super(initialRate); - this.name = name; - this.rateProvider = rateProvider; - this.lastUpdate.set(System.nanoTime()); - } - - @Override - public void acquire(long numPermits) { - super.acquire(numPermits); - permitsAcquired.addAndGet(numPermits); - } - - /** Poll the callback, updating the current rate if necessary. */ - public void update() { - // Reset rate if needed - long rate = rateProvider.getDesiredRate(); - if (rate != getRate()) { - setRate(rate); - } - } - - /** Report the current throughput and usage of this rate limiter to the debug log. */ - public void report() { - if (log.isDebugEnabled()) { - long duration = NANOSECONDS.toMillis(System.nanoTime() - lastUpdate.get()); - if (duration == 0) { - return; - } - lastUpdate.set(System.nanoTime()); - - long sum = permitsAcquired.get(); - permitsAcquired.set(0); - - if (sum > 0) { - log.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, - sum * 1000L / duration, getRate())); - } - } - } - } -} diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java index a220407b92..d7b31bc73d 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java @@ -99,7 +99,7 @@ public abstract class AbstractRFileTest { CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, accumuloConfiguration.getAllCryptoProperties()); - BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, cs); + BCFile.Writer _cbw = new BCFile.Writer(dos, "gz", conf, cs); SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java index cd13745a41..29926b0191 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java @@ -61,7 +61,7 @@ public class CreateCompatTestFile { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); AccumuloConfiguration aconf = DefaultConfiguration.getInstance(); - BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), null, "gz", conf, + BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), "gz", conf, CryptoFactoryLoader.getServiceForServer(aconf)); RFile.Writer writer = new RFile.Writer(_cbw, 1000); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java index 2065f8b676..b0d2bb0c3e 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java @@ -66,7 +66,7 @@ public class MultiLevelIndexTest { ByteArrayOutputStream baos = new ByteArrayOutputStream(); FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a")); CryptoService cs = CryptoFactoryLoader.getServiceForServer(aconf); - BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", hadoopConf, cs); + BCFile.Writer _cbw = new BCFile.Writer(dos, "gz", hadoopConf, cs); BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize)); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java index 8c66bb6ead..5e7f237eb3 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java @@ -153,7 +153,7 @@ public class MultiThreadedRFileTest { FileSystem fs = FileSystem.newInstance(conf); Path path = new Path("file://" + rfile); dos = fs.create(path, true); - BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, + BCFile.Writer _cbw = new BCFile.Writer(dos, "gz", conf, CryptoFactoryLoader.getServiceForServer(accumuloConfiguration)); SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration); diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java deleted file mode 100644 index 8eb3a5ef67..0000000000 --- a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.file.streams; - -import static org.apache.accumulo.core.util.LazySingletons.RANDOM; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.io.InputStream; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.accumulo.core.util.ratelimit.RateLimiter; -import org.apache.hadoop.fs.Seekable; -import org.easymock.EasyMock; -import org.junit.jupiter.api.Test; - -public class RateLimitedInputStreamTest { - - @Test - public void permitsAreProperlyAcquired() throws Exception { - // Create variables for tracking behaviors of mock object - AtomicLong rateLimiterPermitsAcquired = new AtomicLong(); - // Construct mock object - RateLimiter rateLimiter = EasyMock.niceMock(RateLimiter.class); - // Stub Mock Method - rateLimiter.acquire(EasyMock.anyLong()); - EasyMock.expectLastCall() - .andAnswer(() -> rateLimiterPermitsAcquired.addAndGet(EasyMock.getCurrentArgument(0))) - .anyTimes(); - EasyMock.replay(rateLimiter); - - long bytesRetrieved = 0; - try (InputStream is = new RateLimitedInputStream(new RandomInputStream(), rateLimiter)) { - for (int i = 0; i < 100; ++i) { - int count = Math.abs(RANDOM.get().nextInt()) % 65536; - int countRead = is.read(new byte[count]); - assertEquals(count, countRead); - bytesRetrieved += count; - } - } - assertEquals(bytesRetrieved, rateLimiterPermitsAcquired.get()); - } - - private static class RandomInputStream extends InputStream implements Seekable { - - @Override - public int read() { - return RANDOM.get().nextInt() & 0xff; - } - - @Override - public void seek(long pos) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public long getPos() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean seekToNewSource(long targetPos) { - throw new UnsupportedOperationException("Not supported yet."); - } - - } - -} diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java deleted file mode 100644 index 8df1a3104e..0000000000 --- a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.file.streams; - -import static org.apache.accumulo.core.util.LazySingletons.RANDOM; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.accumulo.core.util.ratelimit.RateLimiter; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.easymock.EasyMock; -import org.junit.jupiter.api.Test; - -import com.google.common.io.CountingOutputStream; - -public class RateLimitedOutputStreamTest { - - @Test - public void permitsAreProperlyAcquired() throws Exception { - // Create variables for tracking behaviors of mock object - AtomicLong rateLimiterPermitsAcquired = new AtomicLong(); - // Construct mock object - RateLimiter rateLimiter = EasyMock.niceMock(RateLimiter.class); - // Stub Mock Method - rateLimiter.acquire(EasyMock.anyLong()); - EasyMock.expectLastCall() - .andAnswer(() -> rateLimiterPermitsAcquired.addAndGet(EasyMock.getCurrentArgument(0))) - .anyTimes(); - EasyMock.replay(rateLimiter); - - long bytesWritten = 0; - try (RateLimitedOutputStream os = - new RateLimitedOutputStream(new NullOutputStream(), rateLimiter)) { - for (int i = 0; i < 100; ++i) { - byte[] bytes = new byte[Math.abs(RANDOM.get().nextInt() % 65536)]; - os.write(bytes); - bytesWritten += bytes.length; - } - assertEquals(bytesWritten, os.position()); - } - assertEquals(bytesWritten, rateLimiterPermitsAcquired.get()); - } - - public static class NullOutputStream extends FSDataOutputStream { - public NullOutputStream() { - super(new CountingOutputStream(OutputStream.nullOutputStream()), null); - } - } - -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 1c5b35e81f..0c5bad07c4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -64,7 +64,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; @@ -94,10 +93,6 @@ public class FileCompactor implements Callable<CompactionStats> { IteratorScope getIteratorScope(); - RateLimiter getReadLimiter(); - - RateLimiter getWriteLimiter(); - SystemIteratorEnvironment createIteratorEnv(ServerContext context, AccumuloConfiguration acuTableConf, TableId tableId); @@ -233,7 +228,7 @@ public class FileCompactor implements Callable<CompactionStats> { WriterBuilder outBuilder = fileFactory.newWriterBuilder().forFile(outputFile, ns, ns.getConf(), cryptoService) - .withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter()); + .withTableConfiguration(acuTableConf); if (dropCacheBehindOutput) { outBuilder.dropCachesBehind(); } @@ -338,8 +333,7 @@ public class FileCompactor implements Callable<CompactionStats> { FileSKVIterator reader; reader = fileFactory.newReaderBuilder().forFile(dataFile, fs, fs.getConf(), cryptoService) - .withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter()) - .dropCachesBehind().build(); + .withTableConfiguration(acuTableConf).dropCachesBehind().build(); readers.add(reader); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java index 59235b6cec..ea1c5245e1 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/ExtCEnv.java @@ -27,8 +27,6 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; -import org.apache.accumulo.core.util.ratelimit.NullRateLimiter; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.FileCompactor.CompactionEnv; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; @@ -74,16 +72,6 @@ public class ExtCEnv implements CompactionEnv { return IteratorScope.majc; } - @Override - public RateLimiter getReadLimiter() { - return NullRateLimiter.INSTANCE; - } - - @Override - public RateLimiter getWriteLimiter() { - return NullRateLimiter.INSTANCE; - } - @Override public SystemIteratorEnvironment createIteratorEnv(ServerContext context, AccumuloConfiguration acuTableConf, TableId tableId) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java index ea683b04bf..aaa8062550 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinCEnv.java @@ -25,7 +25,6 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.FileCompactor; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; @@ -51,16 +50,6 @@ public class MinCEnv implements FileCompactor.CompactionEnv { return IteratorUtil.IteratorScope.minc; } - @Override - public RateLimiter getReadLimiter() { - return null; - } - - @Override - public RateLimiter getWriteLimiter() { - return null; - } - @Override public SystemIteratorEnvironment createIteratorEnv(ServerContext context, AccumuloConfiguration acuTableConf, TableId tableId) { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingIT.java deleted file mode 100644 index 0b4280f8d1..0000000000 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionRateLimitingIT.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.test.compaction; - -import static org.apache.accumulo.core.util.LazySingletons.RANDOM; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.Map; - -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.admin.NewTableConfiguration; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.ConfigurableMacBase; -import org.apache.hadoop.conf.Configuration; -import org.junit.jupiter.api.Test; - -public class CompactionRateLimitingIT extends ConfigurableMacBase { - public static final long BYTES_TO_WRITE = 10 * 1024 * 1024; - public static final long RATE = 1 * 1024 * 1024; - - protected Property getThroughputProp() { - return Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT; - } - - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) { - cfg.setProperty(getThroughputProp(), RATE + "B"); - cfg.setProperty(Property.TABLE_MAJC_RATIO, "20"); - cfg.setProperty(Property.TABLE_FILE_COMPRESSION_TYPE, "none"); - - cfg.setProperty("tserver.compaction.major.service.test.rate.limit", RATE + "B"); - cfg.setProperty("tserver.compaction.major.service.test.planner", - DefaultCompactionPlanner.class.getName()); - cfg.setProperty("tserver.compaction.major.service.test.planner.opts.executors", - "[{'name':'all','numThreads':2}]".replaceAll("'", "\"")); - - } - - @Test - public void majorCompactionsAreRateLimited() throws Exception { - long bytesWritten = 0; - String[] tableNames = getUniqueNames(1); - - try (AccumuloClient client = - getCluster().createAccumuloClient("root", new PasswordToken(ROOT_PASSWORD))) { - - for (int i = 0; i < tableNames.length; i++) { - String tableName = tableNames[i]; - - NewTableConfiguration ntc = new NewTableConfiguration(); - if (i == 1) { - ntc.setProperties(Map.of("table.compaction.dispatcher.opts.service", "test")); - } - - client.tableOperations().create(tableName, ntc); - try (BatchWriter bw = client.createBatchWriter(tableName)) { - while (bytesWritten < BYTES_TO_WRITE) { - byte[] rowKey = new byte[32]; - RANDOM.get().nextBytes(rowKey); - - byte[] qual = new byte[32]; - RANDOM.get().nextBytes(qual); - - byte[] value = new byte[1024]; - RANDOM.get().nextBytes(value); - - Mutation m = new Mutation(rowKey); - m.put(new byte[0], qual, value); - bw.addMutation(m); - - bytesWritten += rowKey.length + qual.length + value.length; - } - } - - client.tableOperations().flush(tableName, null, null, true); - - long compactionStart = System.currentTimeMillis(); - client.tableOperations().compact(tableName, null, null, false, true); - long duration = System.currentTimeMillis() - compactionStart; - // The rate will be "bursty", try to account for that by taking 80% of the expected rate - // (allow for 20% under the maximum expected duration) - String message = String.format( - "Expected a compaction rate of no more than %,d bytes/sec, but saw a rate of %,f bytes/sec", - (int) (0.8d * RATE), 1000.0 * bytesWritten / duration); - assertTrue(duration > 1000L * 0.8 * BYTES_TO_WRITE / RATE, message); - } - } - } -}