This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 19a4ebc0301a9e8132697f15315970a3b5cd863d Merge: 4bd6deaac2 9e40a1e068 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Jan 8 14:56:28 2025 +0000 Merge branch '2.1' into 3.1 .../main/java/org/apache/accumulo/core/conf/Property.java | 10 ++++++++-- .../blockfile/cache/impl/BlockCacheManagerFactory.java | 10 ++++++++-- .../core/file/blockfile/cache/BlockCacheFactoryTest.java | 4 ++-- .../core/file/blockfile/cache/TestLruBlockCache.java | 14 +++++++------- .../apache/accumulo/core/file/rfile/AbstractRFileTest.java | 2 +- .../org/apache/accumulo/core/file/rfile/RFileTest.java | 2 +- 6 files changed, 27 insertions(+), 15 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 6b70bd059d,981799f6d0..92ce7812bb --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -283,6 -294,12 +283,9 @@@ public enum Property + " user-implementations of pluggable Accumulo features, such as the balancer" + " or volume chooser.", "2.0.0"), + GENERAL_CACHE_MANAGER_IMPL("general.block.cache.manager.class", - "org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", PropertyType.STRING, - "Specifies the class name of the block cache factory implementation." - + " Alternative implementation is" - + " org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager.", - "2.1.4"), ++ TinyLfuBlockCacheManager.class.getName(), PropertyType.STRING, ++ "Specifies the class name of the block cache factory implementation.", "2.1.4"), GENERAL_DELEGATION_TOKEN_LIFETIME("general.delegation.token.lifetime", "7d", PropertyType.TIMEDURATION, "The length of time that delegation tokens and secret keys are valid.", "1.7.0"), @@@ -514,8 -538,13 +517,10 @@@ "Time to wait for clients to continue scans before closing a session.", "1.3.5"), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the tserver caches.", "1.3.5"), + @Deprecated(since = "2.1.4") + @ReplacedBy(property = Property.GENERAL_CACHE_MANAGER_IMPL) - TSERV_CACHE_MANAGER_IMPL("general.cache.manager.class", - "org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", PropertyType.STRING, - "Specifies the class name of the block cache factory implementation." - + " Alternative implementation is" - + " org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager.", + TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class", TinyLfuBlockCacheManager.class.getName(), + PropertyType.STRING, "Specifies the class name of the block cache factory implementation.", "2.0.0"), TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for RFile data blocks.", "1.3.5"), @@@ -1532,11 -1878,12 +1537,12 @@@ COMPACTOR_PORTSEARCH, TSERV_PORTSEARCH, // max message options - TSERV_MAX_MESSAGE_SIZE, GENERAL_MAX_MESSAGE_SIZE, RPC_MAX_MESSAGE_SIZE, + RPC_MAX_MESSAGE_SIZE, // block cache options - TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, - TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE, + GENERAL_CACHE_MANAGER_IMPL, TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, + TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, + SSERV_SUMMARYCACHE_SIZE, // blocksize options TSERV_DEFAULT_BLOCKSIZE, SSERV_DEFAULT_BLOCKSIZE, diff --cc core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java index 655b7b3ad6,d62a5e8d46..d0b34164e5 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/impl/BlockCacheManagerFactory.java @@@ -35,11 -35,14 +35,14 @@@ public class BlockCacheManagerFactory * * @param conf accumulo configuration * @return block cache manager instance - * @throws Exception error loading block cache manager implementation class + * @throws ReflectiveOperationException error loading block cache manager implementation class */ public static synchronized BlockCacheManager getInstance(AccumuloConfiguration conf) - throws Exception { + throws ReflectiveOperationException { - String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL); + @SuppressWarnings("deprecation") + var cacheManagerProp = + conf.resolve(Property.GENERAL_CACHE_MANAGER_IMPL, Property.TSERV_CACHE_MANAGER_IMPL); + String impl = conf.get(cacheManagerProp); Class<? extends BlockCacheManager> clazz = ClassLoaderUtil.loadClass(impl, BlockCacheManager.class); LOG.info("Created new block cache manager of type: {}", clazz.getSimpleName()); @@@ -51,11 -54,14 +54,14 @@@ * * @param conf accumulo configuration * @return block cache manager instance - * @throws Exception error loading block cache manager implementation class + * @throws ReflectiveOperationException error loading block cache manager implementation class */ public static synchronized BlockCacheManager getClientInstance(AccumuloConfiguration conf) - throws Exception { + throws ReflectiveOperationException { - String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL); + @SuppressWarnings("deprecation") + var cacheManagerProp = + conf.resolve(Property.GENERAL_CACHE_MANAGER_IMPL, Property.TSERV_CACHE_MANAGER_IMPL); + String impl = conf.get(cacheManagerProp); Class<? extends BlockCacheManager> clazz = Class.forName(impl).asSubclass(BlockCacheManager.class); LOG.info("Created new block cache factory of type: {}", clazz.getSimpleName()); diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java index d65991590a,0000000000..1c9e8a6a04 mode 100644,000000..100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/AbstractRFileTest.java @@@ -1,301 -1,0 +1,301 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.file.rfile; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.accumulo.core.client.sample.Sampler; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.crypto.CryptoFactoryLoader; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager; +import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder; +import org.apache.accumulo.core.file.rfile.RFile.FencedReader; +import org.apache.accumulo.core.file.rfile.RFile.Reader; +import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream; +import org.apache.accumulo.core.file.rfile.bcfile.BCFile; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.sample.impl.SamplerFactory; +import org.apache.accumulo.core.spi.cache.BlockCache; +import org.apache.accumulo.core.spi.cache.BlockCacheManager; +import org.apache.accumulo.core.spi.cache.CacheType; +import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; +import org.apache.accumulo.core.spi.crypto.CryptoService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; + +public abstract class AbstractRFileTest { + + protected static final SecureRandom random = new SecureRandom(); + protected static final Collection<ByteSequence> EMPTY_COL_FAMS = List.of(); + + protected AccumuloConfiguration conf = null; + + public static class TestRFile { + + protected Configuration conf = new Configuration(); + public RFile.Writer writer; + protected ByteArrayOutputStream baos; + protected FSDataOutputStream dos; + protected SeekableByteArrayInputStream bais; + protected FSDataInputStream in; + protected AccumuloConfiguration accumuloConfiguration; + public Reader reader; + public SortedKeyValueIterator<Key,Value> iter; + private BlockCacheManager manager; + + public TestRFile(AccumuloConfiguration accumuloConfiguration) { + this.accumuloConfiguration = accumuloConfiguration; + if (this.accumuloConfiguration == null) { + this.accumuloConfiguration = DefaultConfiguration.getInstance(); + } + } + + public void openWriter(boolean startDLG) throws IOException { + openWriter(startDLG, 1000); + } + + public void openWriter(boolean startDLG, int blockSize) throws IOException { + openWriter(startDLG, blockSize, 1000); + } + + public void openWriter(boolean startDLG, int blockSize, int indexBlockSize) throws IOException { + baos = new ByteArrayOutputStream(); + dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a")); + CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, + accumuloConfiguration.getAllCryptoProperties()); + + BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, cs); + + SamplerConfigurationImpl samplerConfig = + SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration); + Sampler sampler = null; + + if (samplerConfig != null) { + sampler = SamplerFactory.newSampler(samplerConfig, accumuloConfiguration); + } + + writer = new RFile.Writer(_cbw, blockSize, indexBlockSize, samplerConfig, sampler); + + if (startDLG) { + writer.startDefaultLocalityGroup(); + } + } + + public void openWriter() throws IOException { + openWriter(1000); + } + + public void openWriter(int blockSize) throws IOException { + openWriter(true, blockSize); + } + + public void closeWriter() throws IOException { + dos.flush(); + writer.close(); + dos.close(); + if (baos != null) { + baos.close(); + } + } + + public void openReader(Range fence) throws IOException { + openReader(true, fence); + } + + public void openReader() throws IOException { + openReader(true); + } + + public void openReader(boolean cfsi) throws IOException { + openReader(cfsi, null); + } + + public void openReader(boolean cfsi, Range fence) throws IOException { + int fileLength = 0; + byte[] data = null; + data = baos.toByteArray(); + + bais = new SeekableByteArrayInputStream(data); + in = new FSDataInputStream(bais); + fileLength = data.length; + + DefaultConfiguration dc = DefaultConfiguration.getInstance(); + ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); ++ cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); + try { + manager = BlockCacheManagerFactory.getInstance(cc); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException("Error creating BlockCacheManager", e); + } + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000)); + cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); + manager.start(BlockCacheConfiguration.forTabletServer(cc)); + BlockCache indexCache = manager.getBlockCache(CacheType.INDEX); + BlockCache dataCache = manager.getBlockCache(CacheType.DATA); + + CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, + accumuloConfiguration.getAllCryptoProperties()); + + CachableBuilder cb = new CachableBuilder().input(in, "source-1").length(fileLength).conf(conf) + .cacheProvider(new BasicCacheProvider(indexCache, dataCache)).cryptoService(cs); + reader = new RFile.Reader(cb); + if (cfsi) { + iter = new ColumnFamilySkippingIterator(reader); + } + if (fence != null) { + iter = new FencedReader(reader, fence); + } + + checkIndex(reader); + } + + public void closeReader() throws IOException { + reader.close(); + in.close(); + if (null != manager) { + manager.stop(); + } + } + + public void seek(Key nk) throws IOException { + iter.seek(new Range(nk, null), EMPTY_COL_FAMS, false); + } + } + + protected static void checkIndex(Reader reader) throws IOException { + FileSKVIterator indexIter = reader.getIndex(); + + if (indexIter.hasTop()) { + Key lastKey = new Key(indexIter.getTopKey()); + + if (reader.getFirstRow().compareTo(lastKey.getRow()) > 0) { + throw new IllegalStateException( + "First key out of order " + reader.getFirstRow() + " " + lastKey); + } + + indexIter.next(); + + while (indexIter.hasTop()) { + if (lastKey.compareTo(indexIter.getTopKey()) > 0) { + throw new IllegalStateException( + "Indext out of order " + lastKey + " " + indexIter.getTopKey()); + } + + lastKey = new Key(indexIter.getTopKey()); + indexIter.next(); + + } + + if (!reader.getLastRow().equals(lastKey.getRow())) { + throw new IllegalStateException( + "Last key out of order " + reader.getLastRow() + " " + lastKey); + } + } + } + + static Key newKey(String row, String cf, String cq, String cv, long ts) { + return new Key(row.getBytes(UTF_8), cf.getBytes(UTF_8), cq.getBytes(UTF_8), cv.getBytes(UTF_8), + ts); + } + + static Value newValue(String val) { + return new Value(val); + } + + static String formatString(String prefix, int i) { + return String.format(prefix + "%06d", i); + } + + protected void verify(TestRFile trf, Iterator<Key> eki, Iterator<Value> evi) throws IOException { + verify(trf.iter, eki, evi); + } + + protected void verify(SortedKeyValueIterator<Key,Value> iter, Iterator<Key> eki, + Iterator<Value> evi) throws IOException { + + while (iter.hasTop()) { + Key ek = eki.next(); + Value ev = evi.next(); + + assertEquals(ek, iter.getTopKey()); + assertEquals(ev, iter.getTopValue()); + + iter.next(); + } + + assertFalse(eki.hasNext()); + assertFalse(evi.hasNext()); + } + + protected void verifyEstimated(FileSKVIterator reader) throws IOException { + // Test estimated entries for 1 row + long estimated = reader.estimateOverlappingEntries(new KeyExtent(TableId.of("1"), + new Text(formatString("r_", 1)), new Text(formatString("r_", 0)))); + // One row contains 256 but the estimate will be more with overlapping index entries + assertEquals(264, estimated); + + // Test for 2 rows + estimated = reader.estimateOverlappingEntries(new KeyExtent(TableId.of("1"), + new Text(formatString("r_", 2)), new Text(formatString("r_", 0)))); + // Two rows contains 512 but the estimate will be more with overlapping index entries + assertEquals(516, estimated); + + // 3 rows + // Actual should be 768, estimate is 772 + estimated = reader.estimateOverlappingEntries( + new KeyExtent(TableId.of("1"), null, new Text(formatString("r_", 0)))); + assertEquals(772, estimated); + + // Tests when full number of entries should return + estimated = reader.estimateOverlappingEntries(new KeyExtent(TableId.of("1"), null, null)); + assertEquals(1024, estimated); + + estimated = reader.estimateOverlappingEntries( + new KeyExtent(TableId.of("1"), new Text(formatString("r_", 4)), null)); + assertEquals(1024, estimated); + } +} diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 81c76d58ff,33545639ef..babd621516 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@@ -1584,7 -1726,7 +1584,7 @@@ public class RFileTest extends Abstract byte[] data = baos.toByteArray(); SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data); FSDataInputStream in2 = new FSDataInputStream(bais); - aconf.set(Property.TSERV_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); - aconf.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); ++ aconf.set(Property.GENERAL_CACHE_MANAGER_IMPL, TinyLfuBlockCacheManager.class.getName()); aconf.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000)); aconf.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); aconf.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000));