This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
     new 393849281f Replaces BulkFailureIT with new test (#4162)
393849281f is described below

commit 393849281f11bf796307f8c20db7fb8ff878e341
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Jan 17 15:45:34 2024 -0500

    Replaces BulkFailureIT with new test (#4162)

    BulkFailureIT exercised low-level bulk import implementation details
    that no longer exist in the elasticity branch. The test was removed and
    a new test was added that operates at a much higher level. The new test
    runs concurrent bulk imports and compactions in such a way that
    duplicate loads of a file could be detected.
---
 .../accumulo/test/functional/BulkFailureIT.java    | 397 ---------------------
 .../apache/accumulo/test/functional/BulkNewIT.java |  92 +++++
 2 files changed, 92 insertions(+), 397 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
deleted file mode 100644
index 1c0ce9c00d..0000000000
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ -package org.apache.accumulo.test.functional; - -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchDeleter; -import org.apache.accumulo.core.client.InvalidTabletHostingRequestException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.CompactionConfig; -import org.apache.accumulo.core.client.rfile.RFile; -import org.apache.accumulo.core.client.rfile.RFileWriter; -import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.ClientTabletCache; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.metadata.AccumuloTable; -import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.rpc.ThriftUtil; -import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.core.tabletingest.thrift.DataFileInfo; -import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService; -import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.tablets.UniqueNameAllocator; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.thrift.TServiceClient; -import org.apache.thrift.transport.TTransportException; -import org.apache.zookeeper.KeeperException; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.net.HostAndPort; - -@Disabled // ELASTICITY_TODO -public class BulkFailureIT extends AccumuloClusterHarness { - - 
private static final Logger LOG = LoggerFactory.getLogger(BulkFailureIT.class); - - interface Loader { - void load(long txid, ClientContext context, KeyExtent extent, Path path, long size, - boolean expectFailure) throws Exception; - } - - @Test - public void testImportCompactionImport() throws Exception { - String[] tables = getUniqueNames(2); - - // run test calling new bulk import RPCs - runTest(tables[1], 22222222L, BulkFailureIT::newLoad); - } - - private static Path createNewBulkDir(ServerContext context, VolumeManager fs, String sourceDir, - TableId tableId) throws IOException { - Path tableDir = fs.matchingFileSystem(new Path(sourceDir), context.getTablesDirs()); - if (tableDir == null) { - throw new IOException( - sourceDir + " is not in the same file system as any volume configured for Accumulo"); - } - - Path directory = new Path(tableDir, tableId.canonical()); - fs.mkdirs(directory); - - // only one should be able to create the lock file - // the purpose of the lock file is to avoid a race - // condition between the call to fs.exists() and - // fs.mkdirs()... if only hadoop had a mkdir() function - // that failed when the dir existed - - UniqueNameAllocator namer = context.getUniqueNameAllocator(); - - while (true) { - Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName()); - if (fs.exists(newBulkDir)) { // sanity check - throw new IOException("Dir exist when it should not " + newBulkDir); - } - if (fs.mkdirs(newBulkDir)) { - return newBulkDir; - } - - sleepUninterruptibly(3, TimeUnit.SECONDS); - } - } - - public static String prepareBulkImport(ServerContext manager, final VolumeManager fs, String dir, - TableId tableId, long tid) throws Exception { - final Path bulkDir = createNewBulkDir(manager, fs, dir, tableId); - - manager.getAmple().addBulkLoadInProgressFlag( - "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid); - - Path dirPath = new Path(dir); - FileStatus[] dataFiles = fs.listStatus(dirPath); - - final UniqueNameAllocator namer = manager.getUniqueNameAllocator(); - - AccumuloConfiguration serverConfig = manager.getConfiguration(); - int numThreads = serverConfig.getCount(Property.MANAGER_RENAME_THREADS); - ExecutorService workers = - ThreadPools.getServerThreadPools().createFixedThreadPool(numThreads, "bulk rename", false); - List<Future<Exception>> results = new ArrayList<>(); - - for (FileStatus file : dataFiles) { - final FileStatus fileStatus = file; - results.add(workers.submit(() -> { - try { - String[] sa = fileStatus.getPath().getName().split("\\."); - String extension = ""; - if (sa.length > 1) { - extension = sa[sa.length - 1]; - - if (!FileOperations.getValidExtensions().contains(extension)) { - LOG.warn("{} does not have a valid extension, ignoring", fileStatus.getPath()); - return null; - } - } else { - LOG.warn("{} does not have any extension, ignoring", fileStatus.getPath()); - return null; - } - - String newName = "I" + namer.getNextName() + "." 
+ extension; - Path newPath = new Path(bulkDir, newName); - try { - fs.rename(fileStatus.getPath(), newPath); - LOG.debug("Moved {} to {}", fileStatus.getPath(), newPath); - } catch (IOException E1) { - LOG.error("Could not move: {} {}", fileStatus.getPath(), E1.getMessage()); - } - - } catch (Exception ex) { - return ex; - } - return null; - })); - } - workers.shutdown(); - while (!workers.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {} - - for (Future<Exception> ex : results) { - if (ex.get() != null) { - throw ex.get(); - } - } - return bulkDir.toString(); - } - - /** - * This test verifies two things. First it ensures that after a bulk imported file is compacted - * that import request are ignored. Second it ensures that after the bulk import transaction is - * canceled that import request fail. The public API for bulk import can not be used for this - * test. Internal (non public API) RPCs and Zookeeper state is manipulated directly. This is the - * only way to interleave compactions with multiple, duplicate import RPC request. - */ - protected void runTest(String table, long fateTxid, Loader loader) throws IOException, - AccumuloException, AccumuloSecurityException, TableExistsException, KeeperException, - InterruptedException, Exception, FileNotFoundException, TableNotFoundException { - try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - - SortedMap<Key,Value> testData = createTestData(); - - FileSystem fs = getCluster().getFileSystem(); - String testFile = createTestFile(fateTxid, testData, fs); - - c.tableOperations().create(table); - String tableId = c.tableOperations().tableIdMap().get(table); - - // Table has no splits, so this extent corresponds to the tables single tablet - KeyExtent extent = new KeyExtent(TableId.of(tableId), null, null); - - ServerContext asCtx = getServerContext(); - - VolumeManager vm = asCtx.getVolumeManager(); - - // move the file into a directory for the table and rename the file to something unique - String bulkDir = prepareBulkImport(asCtx, vm, testFile, TableId.of(tableId), fateTxid); - assertNotNull(bulkDir); - - // determine the files new name and path - FileStatus status = fs.listStatus(new Path(bulkDir))[0]; - Path bulkLoadPath = fs.makeQualified(status.getPath()); - - // Directly ask the tablet to load the file. - loader.load(fateTxid, asCtx, extent, bulkLoadPath, status.getLen(), false); - - assertEquals(Set.of(bulkLoadPath), getFiles(c, extent)); - assertEquals(Set.of(bulkLoadPath), getLoaded(c, extent)); - assertEquals(testData, readTable(table, c)); - - // Compact the bulk imported file. Subsequent request to load the file should be ignored. 
- c.tableOperations().compact(table, new CompactionConfig().setWait(true)); - - Set<Path> tabletFiles = getFiles(c, extent); - assertFalse(tabletFiles.contains(bulkLoadPath)); - assertEquals(1, tabletFiles.size()); - assertEquals(Set.of(bulkLoadPath), getLoaded(c, extent)); - assertEquals(testData, readTable(table, c)); - - // this request should be ignored by the tablet - loader.load(fateTxid, asCtx, extent, bulkLoadPath, status.getLen(), false); - - assertEquals(tabletFiles, getFiles(c, extent)); - assertEquals(Set.of(bulkLoadPath), getLoaded(c, extent)); - assertEquals(testData, readTable(table, c)); - - // this is done to ensure the tablet reads the load flags from the metadata table when it - // loads - c.tableOperations().offline(table, true); - c.tableOperations().online(table, true); - - // this request should be ignored by the tablet - loader.load(fateTxid, asCtx, extent, bulkLoadPath, status.getLen(), false); - - assertEquals(tabletFiles, getFiles(c, extent)); - assertEquals(Set.of(bulkLoadPath), getLoaded(c, extent)); - assertEquals(testData, readTable(table, c)); - - c.securityOperations().grantTablePermission(c.whoami(), AccumuloTable.METADATA.tableName(), - TablePermission.WRITE); - - BatchDeleter bd = - c.createBatchDeleter(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY, 1); - bd.setRanges(Collections.singleton(extent.toMetaRange())); - bd.fetchColumnFamily(BulkFileColumnFamily.NAME); - bd.delete(); - - loader.load(fateTxid, asCtx, extent, bulkLoadPath, status.getLen(), true); - - assertEquals(tabletFiles, getFiles(c, extent)); - assertEquals(Set.of(), getLoaded(c, extent)); - assertEquals(testData, readTable(table, c)); - } - } - - private SortedMap<Key,Value> createTestData() { - SortedMap<Key,Value> testData = new TreeMap<>(); - testData.put(new Key("r001", "f002", "q009", 56), new Value("v001")); - testData.put(new Key("r001", "f002", "q019", 56), new Value("v002")); - testData.put(new Key("r002", "f002", "q009", 57), new Value("v003")); - testData.put(new Key("r002", "f002", "q019", 57), new Value("v004")); - return testData; - } - - private String createTestFile(long txid, SortedMap<Key,Value> testData, FileSystem fs) - throws IOException { - Path base = new Path(getCluster().getTemporaryPath(), "testBulk_ICI_" + txid); - - fs.delete(base, true); - fs.mkdirs(base); - Path files = new Path(base, "files"); - - try (RFileWriter writer = - RFile.newWriter().to(new Path(files, "ici_01.rf").toString()).withFileSystem(fs).build()) { - writer.append(testData.entrySet()); - } - - String filesStr = fs.makeQualified(files).toString(); - return filesStr; - } - - private SortedMap<Key,Value> readTable(String table, AccumuloClient connector) - throws TableNotFoundException { - Scanner scanner = connector.createScanner(table, Authorizations.EMPTY); - - SortedMap<Key,Value> actual = new TreeMap<>(); - - for (Entry<Key,Value> entry : scanner) { - actual.put(entry.getKey(), entry.getValue()); - } - - return actual; - } - - public static Set<Path> getLoaded(AccumuloClient connector, KeyExtent extent) - throws TableNotFoundException { - return getPaths(connector, extent, BulkFileColumnFamily.NAME); - } - - public static Set<Path> getFiles(AccumuloClient connector, KeyExtent extent) - throws TableNotFoundException { - return getPaths(connector, extent, DataFileColumnFamily.NAME); - } - - private static Set<Path> getPaths(AccumuloClient connector, KeyExtent extent, Text fam) - throws TableNotFoundException { - HashSet<Path> files = new HashSet<>(); - - Scanner scanner = - 
connector.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY); - scanner.setRange(extent.toMetaRange()); - scanner.fetchColumnFamily(fam); - - for (Entry<Key,Value> entry : scanner) { - files.add(StoredTabletFile.of(entry.getKey().getColumnQualifier()).getPath()); - } - - return files; - } - - private static void newLoad(long txid, ClientContext context, KeyExtent extent, Path path, - long size, boolean expectFailure) throws Exception { - - TabletIngestClientService.Iface client = getClient(context, extent); - try { - - Map<String,DataFileInfo> val = Map.of(path.getName(), new DataFileInfo(size)); - Map<KeyExtent,Map<String,DataFileInfo>> files = Map.of(extent, val); - - // ELASTICITY_TODO this used to call bulk import directly on tserver, need to look into the - // bigger picture of what this test was doing and how it can work w/ the new bulk import - throw new UnsupportedOperationException(); - /* - * client.loadFiles(TraceUtil.traceInfo(), context.rpcCreds(), txid, - * path.getParent().toString(), files.entrySet().stream().collect( Collectors.toMap(entry -> - * entry.getKey().toThrift(), Entry::getValue)), false); - * - * if (!expectFailure) { while (!getLoaded(context, extent).contains(path)) { - * Thread.sleep(100); } } - */ - } finally { - ThriftUtil.returnClient((TServiceClient) client, context); - } - } - - protected static TabletIngestClientService.Iface getClient(ClientContext context, - KeyExtent extent) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, - TTransportException, InvalidTabletHostingRequestException { - ClientTabletCache locator = ClientTabletCache.getInstance(context, extent.tableId()); - - locator.invalidateCache(extent); - - HostAndPort location = HostAndPort.fromString(locator - .findTabletWithRetry(context, new Text(""), false, ClientTabletCache.LocationNeed.REQUIRED) - .getTserverLocation().orElseThrow()); - - long timeInMillis = context.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT); - TabletIngestClientService.Iface client = - ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, location, context, timeInMillis); - return client; - } -} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index 18edf22e3f..cb04fc7464 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -47,7 +47,11 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -105,6 +109,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.google.common.util.concurrent.MoreExecutors; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public class BulkNewIT extends SharedMiniClusterBase { @@ -621,6 +627,92 @@ public class BulkNewIT extends SharedMiniClusterBase { } } + @Test + public void testConcurrentCompactions() throws Exception { + // run test with bulk imports happening in parallel + testConcurrentCompactions(true); + // run the test with bulk imports 
happening serially
+    testConcurrentCompactions(false);
+  }
+
+  private void testConcurrentCompactions(boolean parallelBulkImports) throws Exception {
+    // Tests compactions running concurrently with bulk imports to ensure that data is not bulk
+    // imported twice. Doing a large number of bulk imports should naturally cause compactions to
+    // happen. This test ensures that compactions running concurrently with bulk imports do not
+    // cause duplicate imports of a file. For example, if a file is imported into a tablet and
+    // then compacted away, the file should not be imported again by the FATE operation doing the
+    // bulk import. The test is structured in such a way that duplicate imports would be detected.
+
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      c.tableOperations().delete(tableName);
+      // Create the table without the versioning iterator. This is done to detect the same file
+      // being imported more than once.
+      c.tableOperations().create(tableName, new NewTableConfiguration().withoutDefaultIterators());
+
+      addSplits(c, tableName, "0999 1999 2999 3999 4999 5999 6999 7999 8999");
+
+      String dir = getDir("/testBulkFile-");
+
+      final int N = 100;
+
+      ExecutorService executor;
+      if (parallelBulkImports) {
+        executor = Executors.newFixedThreadPool(16);
+      } else {
+        // execute the bulk imports in the current thread, which will cause them to run serially
+        executor = MoreExecutors.newDirectExecutorService();
+      }
+
+      // Do N bulk imports of the exact same data.
+      var futures = IntStream.range(0, N).mapToObj(i -> executor.submit(() -> {
+        try {
+          String iterationDir = dir + "/iteration" + i;
+          // Create 10 files for the bulk import.
+          for (int f = 0; f < 10; f++) {
+            writeData(iterationDir + "/f" + f + ".", aconf, f * 1000, (f + 1) * 1000 - 1);
+          }
+          c.tableOperations().importDirectory(iterationDir).to(tableName).tableTime(true).load();
+          getCluster().getFileSystem().delete(new Path(iterationDir), true);
+        } catch (Exception e) {
+          throw new IllegalStateException(e);
+        }
+      })).collect(Collectors.toList());
+
+      // wait for all bulk imports and check for errors in background threads
+      for (var future : futures) {
+        future.get();
+      }
+
+      executor.shutdown();
+
+      try (var scanner = c.createScanner(tableName)) {
+        // Count the number of times each row is seen.
+        Map<String,Long> rowCounts = scanner.stream().map(e -> e.getKey().getRowData().toString())
+            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+        var expectedRows = IntStream.range(0, 10000).mapToObj(i -> String.format("%04d", i))
+            .collect(Collectors.toSet());
+        assertEquals(expectedRows, rowCounts.keySet());
+        // Each row should appear once for each bulk import. If a file were imported twice, then
+        // we would see a higher count.
+        assertTrue(rowCounts.values().stream().allMatch(l -> l == N));
+      }
+
+      // It's expected that compactions ran while the bulk imports were running. If no compactions
+      // ran, then each tablet would have N files. Verify each tablet has fewer than N files.
+      try (var scanner = c.createScanner("accumulo.metadata")) {
+        scanner.setRange(MetadataSchema.TabletsSection
+            .getRange(getCluster().getServerContext().getTableId(tableName)));
+        scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+        // Get the count of files for each tablet.
+        Map<String,Long> rowCounts = scanner.stream().map(e -> e.getKey().getRowData().toString())
+            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+        assertTrue(rowCounts.values().stream().allMatch(l -> l < N));
+        // expect to see 10 tablets
+        assertEquals(10, rowCounts.size());
+      }
+    }
+  }
+
   @Test
   public void testExceptionInMetadataUpdate() throws Exception {
     try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
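
A note on why the new test detects duplicate loads: the table is created without the versioning iterator, so each of the N identical imports contributes a visible copy of every row, and a file loaded twice pushes some row's count above N. Below is a minimal standalone sketch of that counting check, with a hypothetical class name and sample data (not part of the commit):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import java.util.function.Function;
  import java.util.stream.Collectors;

  public class DuplicateLoadCheckSketch {
    public static void main(String[] args) {
      final int n = 3; // stand-in for the test's N identical bulk imports
      List<String> rows = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        // with no versioning iterator, each import contributes one copy of every row
        rows.addAll(List.of("0001", "0002", "0003"));
      }
      // simulate a duplicate load of one file by uncommenting the next line
      // rows.add("0001");
      Map<String,Long> rowCounts = rows.stream()
          .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
      // every row must be seen exactly n times; a duplicate load inflates some count
      boolean noDuplicates = rowCounts.values().stream().allMatch(c -> c == n);
      System.out.println(noDuplicates ? "no duplicate loads" : "duplicate load detected");
    }
  }

The same groupingBy/counting pattern appears twice in the test itself: once over the data table to verify each row's count is exactly N, and once over the metadata table to verify each tablet ended up with fewer than N files, which shows compactions ran concurrently with the imports.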