This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 6e974f1b4d3a8a4e72cfbf3d84e44612e65d97de Merge: 37279e8008 6bcb4abcdc Author: Dave Marion <[email protected]> AuthorDate: Thu Jun 12 12:41:56 2025 +0000 Merge branch '2.1' .../java/org/apache/accumulo/core/data/Column.java | 6 +- .../apache/accumulo/core/dataImpl/RangeImpl.java | 12 ++-- .../file/blockfile/impl/CachableBlockFile.java | 4 -- .../system/ColumnQualifierFilter.java | 8 +-- .../accumulo/core/conf/SiteConfigurationTest.java | 4 +- .../apache/accumulo/core/file/rfile/RFileTest.java | 13 ---- pom.xml | 5 +- .../AuthenticationTokenSecretManager.java | 3 + .../rest/tservers/TabletServerInformation.java | 4 ++ .../monitor/rest/tservers/TabletServers.java | 4 ++ .../org/apache/accumulo/tserver/log/DfsLogger.java | 44 +++++++------- .../org/apache/accumulo/tserver/log/LogSorter.java | 4 +- .../accumulo/tserver/log/RecoveryLogsIterator.java | 2 +- .../accumulo/tserver/log/SortedLogRecovery.java | 71 +++++++++++----------- .../apache/accumulo/tserver/logger/LogFileKey.java | 61 +++++++++++++++++-- .../accumulo/tserver/logger/LogFileValue.java | 10 ++- .../apache/accumulo/tserver/logger/LogReader.java | 12 ++-- .../apache/accumulo/tserver/util/CreateEmpty.java | 4 +- .../accumulo/tserver/log/LogFileKeyTest.java | 8 +-- .../tserver/log/RecoveryLogsIteratorTest.java | 44 +++++++------- .../tserver/log/SortedLogRecoveryTest.java | 16 ++--- .../accumulo/tserver/logger/LogFileTest.java | 66 ++++++++++---------- .../accumulo/tserver/util/CreateEmptyTest.java | 6 +- .../main/java/org/apache/accumulo/shell/Shell.java | 2 +- .../apache/accumulo/test/TestRandomDeletes.java | 2 +- .../shell/ShellAuthenticatorIT_SimpleSuite.java | 4 +- .../accumulo/test/shell/ShellIT_SimpleSuite.java | 4 +- 27 files changed, 238 insertions(+), 185 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index f58e749ab2,854c74154b..50f3e0fcd8 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@@ -44,10 -47,9 +44,8 @@@ import org.apache.hadoop.fs.Path import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.cache.Cache; +import com.github.benmanes.caffeine.cache.Cache; - import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; - /** * This is a wrapper class for BCFile that includes a cache for independent caches for datablocks * and metadatablocks diff --cc core/src/test/java/org/apache/accumulo/core/conf/SiteConfigurationTest.java index 0f03c1397f,3850626037..b502cb9427 --- a/core/src/test/java/org/apache/accumulo/core/conf/SiteConfigurationTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/SiteConfigurationTest.java @@@ -38,11 -38,9 +38,9 @@@ public class SiteConfigurationTest @Test public void testOnlySensitivePropertiesExtractedFromCredentialProvider() throws SecurityException { - // site-cfg.jceks={'ignored.property'=>'ignored', 'instance.secret'=>'mysecret', - // 'general.rpc.timeout'=>'timeout'} URL keystore = SiteConfigurationTest.class.getResource("/site-cfg.jceks"); assertNotNull(keystore); - String credProvPath = "jceks://file" + new File(keystore.getFile()).getAbsolutePath(); + String credProvPath = "jceks://file" + Path.of(keystore.getFile()).toAbsolutePath(); var overrides = Map.of(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvPath); diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 01e7b0f21f,4f04025eb2..e840b213f0 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@@ -172,19 -179,169 +172,6 @@@ public class RFileTest extends Abstract } - private static void checkIndex(Reader reader) throws IOException { - FileSKVIterator indexIter = reader.getIndex(); - - if (indexIter.hasTop()) { - Key lastKey = new Key(indexIter.getTopKey()); - - if (reader.getFirstKey().compareTo(lastKey) > 0) { - throw new RuntimeException( - "First key out of order " + reader.getFirstKey() + " " + lastKey); - } - - indexIter.next(); - - while (indexIter.hasTop()) { - if (lastKey.compareTo(indexIter.getTopKey()) > 0) { - throw new RuntimeException( - "Indext out of order " + lastKey + " " + indexIter.getTopKey()); - } - - lastKey = new Key(indexIter.getTopKey()); - indexIter.next(); - - } - - if (!reader.getLastKey().equals(lastKey)) { - throw new RuntimeException("Last key out of order " + reader.getLastKey() + " " + lastKey); - } - } - } - - public static class TestRFile { - - protected Configuration conf = new Configuration(); - public RFile.Writer writer; - protected ByteArrayOutputStream baos; - protected FSDataOutputStream dos; - protected SeekableByteArrayInputStream bais; - protected FSDataInputStream in; - protected AccumuloConfiguration accumuloConfiguration; - public Reader reader; - public SortedKeyValueIterator<Key,Value> iter; - private BlockCacheManager manager; - - public TestRFile(AccumuloConfiguration accumuloConfiguration) { - this.accumuloConfiguration = accumuloConfiguration; - if (this.accumuloConfiguration == null) { - this.accumuloConfiguration = DefaultConfiguration.getInstance(); - } - } - - public void openWriter(boolean startDLG) throws IOException { - openWriter(startDLG, 1000); - } - - public void openWriter(boolean startDLG, int blockSize) throws IOException { - baos = new ByteArrayOutputStream(); - dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a")); - CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, - accumuloConfiguration.getAllCryptoProperties()); - - BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, cs); - - SamplerConfigurationImpl samplerConfig = - SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration); - Sampler sampler = null; - - if (samplerConfig != null) { - sampler = SamplerFactory.newSampler(samplerConfig, accumuloConfiguration); - } - - writer = new RFile.Writer(_cbw, blockSize, 1000, samplerConfig, sampler); - - if (startDLG) { - writer.startDefaultLocalityGroup(); - } - } - - public void openWriter() throws IOException { - openWriter(1000); - } - - public void openWriter(int blockSize) throws IOException { - openWriter(true, blockSize); - } - - public void closeWriter() throws IOException { - dos.flush(); - writer.close(); - dos.close(); - if (baos != null) { - baos.close(); - } - } - - public void openReader() throws IOException { - openReader(true); - } - - public void openReader(boolean cfsi) throws IOException { - int fileLength = 0; - byte[] data = null; - data = baos.toByteArray(); - - bais = new SeekableByteArrayInputStream(data); - in = new FSDataInputStream(bais); - fileLength = data.length; - - DefaultConfiguration dc = DefaultConfiguration.getInstance(); - ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.GENERAL_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName()); - try { - manager = BlockCacheManagerFactory.getInstance(cc); - } catch (Exception e) { - throw new RuntimeException("Error creating BlockCacheManager", e); - } - cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000)); - cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); - cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); - manager.start(BlockCacheConfiguration.forTabletServer(cc)); - LruBlockCache indexCache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); - LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA); - - CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, - accumuloConfiguration.getAllCryptoProperties()); - - CachableBuilder cb = new CachableBuilder().input(in, "source-1").length(fileLength).conf(conf) - .cacheProvider(new BasicCacheProvider(indexCache, dataCache)).cryptoService(cs); - reader = new RFile.Reader(cb); - if (cfsi) { - iter = new ColumnFamilySkippingIterator(reader); - } - - checkIndex(reader); - } - - public void closeReader() throws IOException { - reader.close(); - in.close(); - if (null != manager) { - manager.stop(); - } - } - - public void seek(Key nk) throws IOException { - iter.seek(new Range(nk, null), EMPTY_COL_FAMS, false); - } - } - -- static Key newKey(String row, String cf, String cq, String cv, long ts) { -- return new Key(row.getBytes(UTF_8), cf.getBytes(UTF_8), cq.getBytes(UTF_8), cv.getBytes(UTF_8), -- ts); -- } -- -- static Value newValue(String val) { -- return new Value(val); -- } -- -- static String formatString(String prefix, int i) { -- return String.format(prefix + "%06d", i); -- } - - private AccumuloConfiguration conf = null; -- @Test public void test1() throws IOException { diff --cc pom.xml index b07534b8aa,a46d68dfd2..4073010521 --- a/pom.xml +++ b/pom.xml @@@ -695,8 -737,8 +695,9 @@@ <effort>Max</effort> <failOnError>true</failOnError> <includeTests>true</includeTests> + <maxHeap>1024</maxHeap> <maxRank>16</maxRank> + <omitVisitors>ConstructorThrow,SharedVariableAtomicityDetector</omitVisitors> <jvmArgs>-Dcom.overstock.findbugs.ignore=com.google.common.util.concurrent.RateLimiter,com.google.common.hash.Hasher,com.google.common.hash.HashCode,com.google.common.hash.HashFunction,com.google.common.hash.Hashing,com.google.common.cache.Cache,com.google.common.io.CountingOutputStream,com.google.common.io.ByteStreams,com.google.common.cache.LoadingCache,com.google.common.base.Stopwatch,com.google.common.cache.RemovalNotification,com.google.common.util.concurrent.Uninterrup [...] <plugins combine.children="append"> <plugin> diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index b2a1d8ba1c,f2a1a5c990..ae387ec484 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@@ -169,8 -178,8 +169,8 @@@ public class RecoveryLogsIterato if (iterator.hasNext()) { Key firstKey = iterator.next().getKey(); LogFileKey key = LogFileKey.fromKey(firstKey); - if (key.event != LogEvents.OPEN) { + if (key.getEvent() != LogEvents.OPEN) { - throw new IllegalStateException("First log entry is not OPEN " + fullLogPath); + throw new IllegalStateException("First log entry is not OPEN " + sortedLogs); } } } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index cf6649d0ac,40e8496ba7..4c836b2e01 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@@ -265,10 -258,10 +268,10 @@@ public class SortedLogRecovery return recoverySeq; } - private void playbackMutations(List<Path> recoveryLogs, MutationReceiver mr, int tabletId, - long recoverySeq) throws IOException { + private void playbackMutations(List<ResolvedSortedLog> recoveryLogs, MutationReceiver mr, + int tabletId, long recoverySeq) throws IOException { LogFileKey start = minKey(MUTATION, tabletId); - start.seq = recoverySeq; + start.setSeq(recoverySeq); LogFileKey end = maxKey(MUTATION, tabletId); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/util/CreateEmpty.java index 2f15d2a382,0000000000..b9f5d9fac9 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/util/CreateEmpty.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/util/CreateEmpty.java @@@ -1,198 -1,0 +1,198 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.tserver.util; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.tserver.log.DfsLogger.LOG_FILE_HEADER_V4; +import static org.apache.accumulo.tserver.logger.LogEvents.OPEN; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl; +import org.apache.accumulo.core.crypto.CryptoUtils; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFileOperations; +import org.apache.accumulo.core.file.rfile.bcfile.Compression; +import org.apache.accumulo.core.metadata.UnreferencedTabletFile; +import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; +import org.apache.accumulo.core.spi.crypto.CryptoService; +import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.start.spi.KeywordExecutable; +import org.apache.accumulo.tserver.logger.LogFileKey; +import org.apache.accumulo.tserver.logger.LogFileValue; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.IParameterValidator; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.auto.service.AutoService; + +/** + * Create an empty RFile for use in recovering from data loss where Accumulo still refers internally + * to a path. + */ +@AutoService(KeywordExecutable.class) +public class CreateEmpty implements KeywordExecutable { + private static final Logger LOG = LoggerFactory.getLogger(CreateEmpty.class); + public static final String RF_EXTENSION = ".rf"; + public static final String WAL_EXTENSION = ".wal"; + + public static class MatchesValidFileExtension implements IParameterValidator { + @Override + public void validate(String name, String value) throws ParameterException { + if (value.endsWith(RF_EXTENSION) || value.endsWith(WAL_EXTENSION)) { + return; + } + throw new ParameterException("File must end with either " + RF_EXTENSION + " or " + + WAL_EXTENSION + " and '" + value + "' does not."); + } + } + + public static class IsSupportedCompressionAlgorithm implements IParameterValidator { + @Override + public void validate(String name, String value) throws ParameterException { + List<String> algorithms = Compression.getSupportedAlgorithms(); + if (!algorithms.contains(value)) { + throw new ParameterException("Compression codec must be one of " + algorithms); + } + } + } + + static class Opts extends ConfigOpts { + @Parameter(names = {"-c", "--codec"}, description = "the compression codec to use.", + validateWith = IsSupportedCompressionAlgorithm.class) + String codec = new NoCompression().getName(); + @Parameter( + description = " <path> { <path> ... } Each path given is a URL." + + " Relative paths are resolved according to the default filesystem defined in" + + " your Hadoop configuration, which is usually an HDFS instance.", + required = true, validateWith = MatchesValidFileExtension.class) + List<String> files = new ArrayList<>(); + + public enum OutFileType { + RF, WAL + } + + // rfile as default keeps previous behaviour + @Parameter(names = "--type") + public OutFileType fileType = OutFileType.RF; + + } + + public static void main(String[] args) throws Exception { + new CreateEmpty().execute(args); + } + + @Override + public String keyword() { + return "create-empty"; + } + + @Override + public String description() { + return "Creates empty RFiles (RF) or empty write-ahead log (WAL) files for emergency recovery"; + } + + @Override + public void execute(String[] args) throws Exception { + + Opts opts = new Opts(); + opts.parseArgs("accumulo create-empty", args); + + var siteConfig = opts.getSiteConfiguration(); + try (ServerContext context = new ServerContext(siteConfig)) { + switch (opts.fileType) { + case RF: + createEmptyRFile(opts, context); + break; + case WAL: + createEmptyWal(opts, context); + break; + default: + throw new ParameterException("file type must be RF or WAL, received: " + opts.fileType); + } + } + } + + void createEmptyRFile(final Opts opts, final ServerContext context) throws IOException { + var vm = context.getVolumeManager(); + + CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.TABLE); + CryptoService cryptoService = context.getCryptoFactory().getService(env, + context.getConfiguration().getAllCryptoProperties()); + + for (String filename : opts.files) { + Path path = new Path(filename); + checkFileExists(path, vm); + UnreferencedTabletFile tabletFile = + UnreferencedTabletFile.of(vm.getFileSystemByPath(path), path); + LOG.info("Writing to file '{}'", tabletFile); + FileSKVWriter writer = new RFileOperations().newWriterBuilder() + .forFile(tabletFile, vm.getFileSystemByPath(path), context.getHadoopConf(), cryptoService) + .withTableConfiguration(DefaultConfiguration.getInstance()).withCompression(opts.codec) + .build(); + writer.close(); + } + } + + void createEmptyWal(Opts opts, ServerContext context) throws IOException { + final LogFileValue EMPTY = new LogFileValue(); + + var vm = context.getVolumeManager(); + + for (String filename : opts.files) { + Path path = new Path(filename); + checkFileExists(path, vm); + try (var out = new DataOutputStream(vm.create(path))) { + LOG.info("Output file: {}", path); + + out.write(LOG_FILE_HEADER_V4.getBytes(UTF_8)); + + CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL); + CryptoService cryptoService = context.getCryptoFactory().getService(env, + context.getConfiguration().getAllCryptoProperties()); + + byte[] cryptoParams = cryptoService.getFileEncrypter(env).getDecryptionParameters(); + CryptoUtils.writeParams(cryptoParams, out); + + LogFileKey key = new LogFileKey(); - key.event = OPEN; - key.tserverSession = ""; ++ key.setEvent(OPEN); ++ key.setTserverSession(""); + + key.write(out); + EMPTY.write(out); + } + } + } + + private void checkFileExists(final Path path, final VolumeManager vm) throws IOException { + if (vm.exists(path)) { + throw new IllegalArgumentException(path + " exists"); + } + } +} diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/util/CreateEmptyTest.java index e841bd6c38,0000000000..5235acd15c mode 100644,000000..100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/util/CreateEmptyTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/util/CreateEmptyTest.java @@@ -1,263 -1,0 +1,263 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.tserver.util; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.tserver.log.DfsLogger.LOG_FILE_HEADER_V4; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl; +import org.apache.accumulo.core.crypto.CryptoUtils; +import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; +import org.apache.accumulo.core.spi.crypto.CryptoService; +import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.tserver.WithTestNames; +import org.apache.accumulo.tserver.log.DfsLogger; +import org.apache.accumulo.tserver.logger.LogEvents; +import org.apache.accumulo.tserver.logger.LogFileKey; +import org.apache.accumulo.tserver.logger.LogFileValue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public class CreateEmptyTest extends WithTestNames { + @TempDir + private static java.nio.file.Path tempDir; + + private java.nio.file.Path perTestTempSubDir; + + private ServerContext context; + + @BeforeEach + public void init() throws IOException { + ConfigurationCopy config = new ConfigurationCopy(DefaultConfiguration.getInstance()); + config.set(Property.INSTANCE_VOLUMES.getKey(), "file:///"); + + context = mock(ServerContext.class); + expect(context.getCryptoFactory()).andReturn(new GenericCryptoServiceFactory()).anyTimes(); + expect(context.getConfiguration()).andReturn(config).anyTimes(); + expect(context.getHadoopConf()).andReturn(new Configuration()).anyTimes(); + VolumeManager volumeManager = VolumeManagerImpl.get(config, new Configuration()); + expect(context.getVolumeManager()).andReturn(volumeManager).anyTimes(); + replay(context); + + perTestTempSubDir = Files.createDirectories(tempDir.resolve(testName())); + } + + @AfterEach + public void verifyMock() { + verify(context); + } + + @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test") + @Test + public void exceptionOnFileExistsTest() throws Exception { + CreateEmpty createEmpty = new CreateEmpty(); + + // create the file so it exists + java.nio.file.Path f = Files.createFile(perTestTempSubDir.resolve("empty.wal")); + + String[] walArgs = {"--type", "WAL", f.toString()}; + CreateEmpty.Opts walOpts = new CreateEmpty.Opts(); + walOpts.parseArgs("accumulo create-empty", walArgs); + + assertThrows(IllegalArgumentException.class, + () -> createEmpty.createEmptyWal(walOpts, context)); + + // create the file so it exists + java.nio.file.Path f2 = Files.createFile(perTestTempSubDir.resolve("empty.rf")); + + String[] rfArgs = {"--type", "RF", f2.toString()}; + CreateEmpty.Opts rfOpts = new CreateEmpty.Opts(); + rfOpts.parseArgs("accumulo create-empty", rfArgs); + assertThrows(IllegalArgumentException.class, + () -> createEmpty.createEmptyRFile(walOpts, context)); + } + + @Test + public void createRfileTest() throws Exception { + CreateEmpty createEmpty = new CreateEmpty(); + + String file1 = perTestTempSubDir.resolve("empty1.rf").toString(); + String file2 = perTestTempSubDir.resolve("empty2.rf").toString(); + + String[] args = {"--type", "RF", file1, file2}; + CreateEmpty.Opts opts = new CreateEmpty.Opts(); + opts.parseArgs("accumulo create-empty", args); + + createEmpty.createEmptyRFile(opts, context); + VolumeManager vm = context.getVolumeManager(); + assertTrue(vm.exists(new Path(file1))); + try (var scanner = RFile.newScanner().from(file1).build()) { + assertEquals(0, scanner.stream().count()); + } + + assertTrue(vm.exists(new Path(file2))); + try (var scanner = RFile.newScanner().from(file2).build()) { + assertEquals(0, scanner.stream().count()); + } + + } + + /** + * Validate that the default type is RF (RecoveryWithEmptyRFileIT also needs this( + */ + @Test + public void createRfileDefaultTest() throws Exception { + CreateEmpty createEmpty = new CreateEmpty(); + + String file1 = perTestTempSubDir.resolve("empty.rf").toString(); + + String[] args = {file1}; + CreateEmpty.Opts opts = new CreateEmpty.Opts(); + opts.parseArgs("accumulo create-empty", args); + + createEmpty.createEmptyRFile(opts, context); + VolumeManager vm = context.getVolumeManager(); + assertTrue(vm.exists(new Path(file1))); + try (var scanner = RFile.newScanner().from(file1).build()) { + assertEquals(0, scanner.stream().count()); + } + } + + @Test + public void createWalTest() throws Exception { + CreateEmpty createEmpty = new CreateEmpty(); + + String file1 = perTestTempSubDir.resolve("empty1.wal").toString(); + String file2 = perTestTempSubDir.resolve("empty2.wal").toString(); + + String[] args = {"--type", "WAL", file1, file2}; + CreateEmpty.Opts opts = new CreateEmpty.Opts(); + opts.parseArgs("accumulo create-empty", args); + + createEmpty.createEmptyWal(opts, context); + + checkWalContext(file1); + readLogFile(file1); + + checkWalContext(file2); + } + + /** + * Reads the log file and looks for specific information (crypto id, event == OPEN) + */ + private void checkWalContext(final String expected) throws IOException { + Path path = new Path(expected); + VolumeManager vm = context.getVolumeManager(); + assertTrue(vm.exists(path)); + + vm.open(path); + try (InputStream inputStream = vm.open(path).getWrappedStream(); + DataInputStream dis = new DataInputStream(inputStream)) { + byte[] headerBuf = new byte[1024]; + int len = dis.read(headerBuf, 0, LOG_FILE_HEADER_V4.length()); + assertEquals(LOG_FILE_HEADER_V4.length(), len); + assertEquals(LOG_FILE_HEADER_V4, + new String(headerBuf, 0, LOG_FILE_HEADER_V4.length(), UTF_8)); + + CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL); + CryptoService cryptoService = context.getCryptoFactory().getService(env, + context.getConfiguration().getAllCryptoProperties()); + + byte[] decryptionParams = cryptoService.getFileEncrypter(env).getDecryptionParameters(); + + var cryptParams = CryptoUtils.readParams(dis); + assertArrayEquals(decryptionParams, cryptParams); + + LogFileKey key = new LogFileKey(); + key.readFields(dis); + - assertEquals(key.event, LogEvents.OPEN); - assertEquals("", key.tserverSession); - assertNull(key.filename); ++ assertEquals(key.getEvent(), LogEvents.OPEN); ++ assertEquals("", key.getTserverSession()); ++ assertNull(key.getFilename()); + } + } + + /** + * Scan through log file and check that there is one event. + */ + private void readLogFile(final String filename) throws Exception { + Path path = new Path(filename); + LogFileKey key = new LogFileKey(); + LogFileValue value = new LogFileValue(); + + FileSystem fs = context.getVolumeManager().getFileSystemByPath(path); + + CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL); + CryptoService cryptoService = context.getCryptoFactory().getService(env, + context.getConfiguration().getAllCryptoProperties()); + + int eventCount = 0; + try (final FSDataInputStream fsinput = fs.open(path); + DataInputStream input = DfsLogger.getDecryptingStream(fsinput, cryptoService)) { + while (true) { + try { + key.readFields(input); + value.readFields(input); + } catch (EOFException ex) { + break; + } + eventCount++; + } + } catch (DfsLogger.LogHeaderIncompleteException e) { + fail("Could not read header for {}" + path); + } finally { + // empty wal has 1 event (OPEN) + assertEquals(1, eventCount); + } + } + + // tempDir is per test suite - generate a one-up count file for each call. + private static final AtomicInteger fileCount = new AtomicInteger(0); + + private String genFilename(final String prefix, final String extension) { + return prefix + fileCount.incrementAndGet() + extension; + } +} diff --cc test/src/main/java/org/apache/accumulo/test/shell/ShellIT_SimpleSuite.java index 0ea3e5c16a,27b659a37c..ce86fd890f --- a/test/src/main/java/org/apache/accumulo/test/shell/ShellIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellIT_SimpleSuite.java @@@ -111,9 -116,9 +111,9 @@@ public class ShellIT_SimpleSuite extend private StringInputStream input; private TestOutputStream output; private Shell shell; - private File config; + private Path config; - public LineReader reader; - public Terminal terminal; + private LineReader reader; + private Terminal terminal; void execExpectList(String cmd, boolean expecteGoodExit, List<String> expectedStrings) throws IOException {
