This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit bb673da62f321d02e597ab8ea33f57a95c7607ab Merge: 18e9a3453b 238a5226d3 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Jun 15 12:14:07 2023 +0000 Merge branch 'main' into elasticity .../apache/accumulo/core/metadata/TabletFile.java | 13 +- pom.xml | 3 +- .../accumulo/server/conf/util/ZooInfoViewer.java | 162 +---------- .../accumulo/server/conf/util/ZooPropEditor.java | 305 +++++++++++++++++++++ .../accumulo/server/conf/util/ZooPropUtils.java | 140 ++++++++++ .../server/conf/util/ZooInfoViewerTest.java | 86 +----- .../server/conf/util/ZooPropEditorTest.java | 30 +- .../server/conf/util/ZooPropUtilsTest.java | 68 +++++ .../accumulo/manager/TabletGroupWatcher.java | 21 +- .../manager/tableOps/bulkVer2/PrepBulkImport.java | 2 +- .../accumulo/monitor/resources/css/screen.css | 3 - .../org/apache/accumulo/tserver/tablet/Tablet.java | 35 ++- .../accumulo/start/spi/KeywordExecutable.java | 5 +- .../accumulo/test/conf/util/ZooPropEditorIT.java | 138 ++++++++++ .../test/functional/GarbageCollectorTrashBase.java | 2 +- ...ageCollectorTrashEnabledWithCustomPolicyIT.java | 2 +- .../apache/accumulo/test/functional/SplitIT.java | 76 +++++ .../apache/accumulo/test/start/KeywordStartIT.java | 71 +++-- 18 files changed, 853 insertions(+), 309 deletions(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 44fb1ba38f,807387d87e..b5f46e70f2 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@@ -170,12 -145,11 +170,11 @@@ abstract class TabletGroupWatcher exten private static class TabletLists { private final List<Assignment> assignments = new ArrayList<>(); private final List<Assignment> assigned = new ArrayList<>(); - private final List<TabletLocationState> assignedToDeadServers = new ArrayList<>(); - private final List<TabletLocationState> suspendedToGoneServers = new ArrayList<>(); + private final List<TabletMetadata> assignedToDeadServers = new ArrayList<>(); + private final List<TabletMetadata> suspendedToGoneServers = new ArrayList<>(); private final Map<KeyExtent,UnassignedTablet> unassigned = new HashMap<>(); private final Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>(); - // read only lists of tablet servers - private final SortedMap<TServerInstance,TabletServerStatus> currentTServers; + // read only list of tablet servers that are not shutting down private final SortedMap<TServerInstance,TabletServerStatus> destinations; public TabletLists(Manager m, SortedMap<TServerInstance,TabletServerStatus> curTServers) { diff --cc test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java index 8678ff5f49,6ca58a4fe1..b60c5e153c --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java @@@ -18,34 -18,27 +18,40 @@@ */ package org.apache.accumulo.test.functional; + import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; import java.time.Duration; +import java.util.ArrayList; + import java.util.Base64; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; + import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.Scanner; + import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; + import org.apache.accumulo.core.client.rfile.RFile; + import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@@ -65,15 -56,15 +71,18 @@@ import org.apache.accumulo.test.TestIng import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + public class SplitIT extends AccumuloClusterHarness { private static final Logger log = LoggerFactory.getLogger(SplitIT.class); @@@ -223,121 -213,71 +232,188 @@@ } } + @Test + public void testLargeSplit() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName, new NewTableConfiguration() + .setProperties(Map.of(Property.TABLE_MAX_END_ROW_SIZE.getKey(), "10K"))); + + byte[] okSplit = new byte[4096]; + for (int i = 0; i < okSplit.length; i++) { + okSplit[i] = (byte) (i % 256); + } + + var splits1 = new TreeSet<Text>(List.of(new Text(okSplit))); + + c.tableOperations().addSplits(tableName, splits1); + + assertEquals(splits1, new TreeSet<>(c.tableOperations().listSplits(tableName))); + + byte[] bigSplit = new byte[4096 * 4]; + for (int i = 0; i < bigSplit.length; i++) { + bigSplit[i] = (byte) (i % 256); + } + + var splits2 = new TreeSet<Text>(List.of(new Text(bigSplit))); + // split should fail because it exceeds the configured max split size + assertThrows(AccumuloException.class, + () -> c.tableOperations().addSplits(tableName, splits2)); + + // ensure the large split is not there + assertEquals(splits1, new TreeSet<>(c.tableOperations().listSplits(tableName))); + } + } + + @Test + public void concurrentSplit() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + final String tableName = getUniqueNames(1)[0]; + + log.debug("Creating table {}", tableName); + c.tableOperations().create(tableName); + + final int numRows = 100_000; + log.debug("Ingesting {} rows into {}", numRows, tableName); + VerifyParams params = new VerifyParams(getClientProps(), tableName, numRows); + TestIngest.ingest(c, params); + + log.debug("Verifying {} rows ingested into {}", numRows, tableName); + VerifyIngest.verifyIngest(c, params); + + log.debug("Creating futures that add random splits to the table"); + ExecutorService es = Executors.newFixedThreadPool(10); + final int totalFutures = 100; + final int splitsPerFuture = 4; + final Set<Text> totalSplits = new HashSet<>(); + List<Callable<Void>> tasks = new ArrayList<>(totalFutures); + for (int i = 0; i < totalFutures; i++) { + final Pair<Integer,Integer> splitBounds = getRandomSplitBounds(numRows); + final TreeSet<Text> splits = TestIngest.getSplitPoints(splitBounds.getFirst().longValue(), + splitBounds.getSecond().longValue(), splitsPerFuture); + totalSplits.addAll(splits); + tasks.add(() -> { + c.tableOperations().addSplits(tableName, splits); + return null; + }); + } + + log.debug("Submitting futures"); + List<Future<Void>> futures = + tasks.parallelStream().map(es::submit).collect(Collectors.toList()); + + log.debug("Waiting for futures to complete"); + for (Future<?> f : futures) { + f.get(); + } + es.shutdown(); + + log.debug("Checking that {} splits were created ", totalSplits.size()); + + assertEquals(totalSplits, new HashSet<>(c.tableOperations().listSplits(tableName)), + "Did not see expected splits"); + + // ELASTICITY_TODO the following could be removed after #3309. Currently scanning an ondemand + // table with lots of tablets will cause the test to timeout. + c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS); + + log.debug("Verifying {} rows ingested into {}", numRows, tableName); + VerifyIngest.verifyIngest(c, params); + } + } + + /** + * Generates a pair of integers that represent the start and end of a range of splits. The start + * and end are randomly generated between 0 and upperBound. The start is guaranteed to be less + * than the end and the two bounds are guaranteed to be different values. + * + * @param upperBound the upper bound of the range of splits + * @return a pair of integers that represent the start and end of a range of splits + */ + private Pair<Integer,Integer> getRandomSplitBounds(int upperBound) { + Preconditions.checkArgument(upperBound > 1, "upperBound must be greater than 1"); + + int start = random.nextInt(upperBound); + int end = random.nextInt(upperBound - 1); + + // ensure start is less than end and that end is not equal to start + if (end >= start) { + end += 1; + } else { + int tmp = start; + start = end; + end = tmp; + } + + return new Pair<>(start, end); + } + + private String getDir() throws Exception { + var rootPath = getCluster().getTemporaryPath().toString(); + String dir = rootPath + "/" + getUniqueNames(1)[0]; + getCluster().getFileSystem().delete(new Path(dir), true); + return dir; + } + + @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, + justification = "predictable random with specific seed is intended for this test") + @Test + public void bulkImportThatCantSplitHangsCompaction() throws Exception { + + /* + * There was a bug where a bulk import into a tablet with the following conditions would cause + * compactions to hang. + * + * 1. Tablet where the files sizes indicates its needs to split + * + * 2. Row with many columns in the tablet that is unsplittable + * + * This happened because the bulk import plus an attempted split would leave the tablet in a bad + * internal state for compactions. + */ + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + + c.tableOperations().create(tableName, new NewTableConfiguration() + .setProperties(singletonMap(Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"))); + + Random random = new Random(); + byte[] val = new byte[100]; + + String dir = getDir(); + String file = dir + "/f1.rf"; + + // create a file with a single row and lots of columns. The files size will exceed the split + // threshold configured above. + try ( + RFileWriter writer = RFile.newWriter().to(file).withFileSystem(getFileSystem()).build()) { + writer.startDefaultLocalityGroup(); + for (int i = 0; i < 1000; i++) { + random.nextBytes(val); + writer.append(new Key("r1", "f1", String.format("%09d", i)), + new Value(Base64.getEncoder().encodeToString(val))); + } + } + + // import the file + c.tableOperations().importDirectory(dir).to(tableName).load(); + + // tablet should not be able to split + assertEquals(0, c.tableOperations().listSplits(tableName).size()); + + Thread.sleep(1000); + + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + // should have over 100K of data in the values + assertTrue( + c.createScanner(tableName).stream().mapToLong(entry -> entry.getValue().getSize()).sum() + > 100_000); + + // should have 1000 entries + assertEquals(1000, c.createScanner(tableName).stream().count()); + } + } } diff --cc test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java index 1c8771fa52,bab15197eb..01298502d4 --- a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java +++ b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java @@@ -117,7 -129,9 +128,8 @@@ public class KeywordStartIT expectSet.put("admin", Admin.class); expectSet.put("check-compaction-config", CheckCompactionConfig.class); expectSet.put("check-server-config", CheckServerConfig.class); - expectSet.put("compaction-coordinator", CoordinatorExecutable.class); expectSet.put("compactor", CompactorExecutable.class); + expectSet.put("create-empty", CreateEmpty.class); expectSet.put("create-token", CreateToken.class); expectSet.put("dump-zoo", DumpZookeeper.class); expectSet.put("ec-admin", ECAdmin.class);