This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch no-chop-merge in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/no-chop-merge by this push: new 2e8abb41d2 No Chop Merges (#3640) 2e8abb41d2 is described below commit 2e8abb41d23b1d1f3a38d08b38d0ff357429a630 Author: Christopher L. Shannon <christopher.l.shan...@gmail.com> AuthorDate: Sun Aug 27 13:22:41 2023 -0400 No Chop Merges (#3640) This commit disables chop compactions on merge and modifies the tablet metadata update in TabletGroupWatcher to fence off each tablet file to its tablet range as it is copied to the highest tablet on merge. By fencing off the files in a tablet on merge, chop compactions are no longer necessary: scans are fenced to the range recorded in the metadata updates, which prevents data from coming back on merge. --- .../accumulo/server/manager/state/MergeInfo.java | 8 +- .../server/manager/state/MergeInfoTest.java | 20 -- .../accumulo/manager/TabletGroupWatcher.java | 67 ++++- .../test/functional/FileNormalizationIT.java | 4 +- .../apache/accumulo/test/functional/MergeIT.java | 334 ++++++++++++++++++++- .../apache/accumulo/test/manager/MergeStateIT.java | 18 +- .../accumulo/test/util/FileMetadataUtil.java | 76 +++++ 7 files changed, 489 insertions(+), 38 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeInfo.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeInfo.java index e1128004f8..24ad4fd90a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeInfo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MergeInfo.java @@ -82,15 +82,13 @@ public class MergeInfo implements Writable { } public boolean needsToBeChopped(KeyExtent otherExtent) { + // TODO: For now only Deletes still need chops // During a delete, the block after the merge will be stretched to cover the deleted area. 
// Therefore, it needs to be chopped - if (!otherExtent.tableId().equals(extent.tableId())) { - return false; - } - if (isDelete()) { + if (isDelete() && otherExtent.tableId().equals(extent.tableId())) { return otherExtent.prevEndRow() != null && otherExtent.prevEndRow().equals(extent.endRow()); } else { - return this.extent.overlaps(otherExtent); + return false; } } diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java index f6712381ee..01ae58bf24 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/MergeInfoTest.java @@ -100,18 +100,6 @@ public class MergeInfoTest { assertFalse(mi.needsToBeChopped(keyExtent2)); } - @Test - public void testNeedsToBeChopped_NotDelete() { - expect(keyExtent.tableId()).andReturn(TableId.of("table1")); - KeyExtent keyExtent2 = createMock(KeyExtent.class); - expect(keyExtent2.tableId()).andReturn(TableId.of("table1")); - replay(keyExtent2); - expect(keyExtent.overlaps(keyExtent2)).andReturn(true); - replay(keyExtent); - mi = new MergeInfo(keyExtent, MergeInfo.Operation.MERGE); - assertTrue(mi.needsToBeChopped(keyExtent2)); - } - @Test public void testNeedsToBeChopped_Delete_NotFollowing() { testNeedsToBeChopped_Delete("somerow", false); @@ -213,14 +201,6 @@ public class MergeInfoTest { assertFalse(info.needsToBeChopped(ke("y", "c", "b"))); assertFalse(info.needsToBeChopped(ke("x", "c", "bb"))); assertFalse(info.needsToBeChopped(ke("x", "b", "a"))); - info = new MergeInfo(ke("x", "b", "a"), MergeInfo.Operation.MERGE); - assertTrue(info.needsToBeChopped(ke("x", "c", "a"))); - assertTrue(info.needsToBeChopped(ke("x", "aa", "a"))); - assertTrue(info.needsToBeChopped(ke("x", null, null))); - assertFalse(info.needsToBeChopped(ke("x", "c", "b"))); - assertFalse(info.needsToBeChopped(ke("y", "c", 
"b"))); - assertFalse(info.needsToBeChopped(ke("x", "c", "bb"))); - assertTrue(info.needsToBeChopped(ke("x", "b", "a"))); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index ee54284a25..b3affa47da 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -775,11 +775,17 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { AccumuloClient client = manager.getContext(); + KeyExtent stopExtent = KeyExtent.fromMetaRow(stop.toMetaRow()); + KeyExtent previousKeyExtent = null; + KeyExtent lastExtent = null; + try (BatchWriter bw = client.createBatchWriter(targetSystemTable)) { long fileCount = 0; // Make file entries in highest tablet Scanner scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY); - scanner.setRange(scanRange); + // Update to set the range to include the highest tablet + scanner.setRange( + new Range(TabletsSection.encodeRow(range.tableId(), start), false, stopRow, true)); TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); ServerColumnFamily.TIME_COLUMN.fetch(scanner); ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); @@ -789,8 +795,63 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { for (Entry<Key,Value> entry : scanner) { Key key = entry.getKey(); Value value = entry.getValue(); + + final KeyExtent keyExtent = KeyExtent.fromMetaRow(key.getRow()); + + // Keep track of the last Key Extent seen so we can use it to fence + // of RFiles when merging the metadata + if (lastExtent != null && !keyExtent.equals(lastExtent)) { + previousKeyExtent = lastExtent; + } + + // Special case for now to handle the highest/stop tablet which is where files are + // merged to so we need to handle the deletes on update here as it won't be handled later + // TODO: Can 
this be re-written to not have a special edge case and make it simpler? + if (keyExtent.equals(stopExtent)) { + if (previousKeyExtent != null + && key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + + // Fence off existing files by the end row of the previous tablet and current tablet + final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); + // The end row should be inclusive for the current tablet and the previous end row + // should be exclusive for the start row + Range fenced = new Range(previousKeyExtent.endRow(), false, keyExtent.endRow(), true); + + // Clip range if exists + fenced = existing.hasRange() ? existing.getRange().clip(fenced) : fenced; + + final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + // If the existing metadata does not match then we need to delete the old + // and replace with a new range + if (!existing.equals(newFile)) { + m.putDelete(DataFileColumnFamily.NAME, existing.getMetadataText()); + m.put(key.getColumnFamily(), newFile.getMetadataText(), value); + } + + fileCount++; + } + // For the highest tablet we only care about the DataFileColumnFamily + continue; + } + if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { - m.put(key.getColumnFamily(), key.getColumnQualifier(), value); + final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); + // TODO: Should we try and be smart and eventually collapse overlapping ranges to reduce + // the metadata? The fenced reader will already collapse ranges when reading. + + // Fence off files by the previous tablet and current tablet that is being merged + // The end row should be inclusive for the current tablet and the previous end row should + // be exclusive for the start row. + Range fenced = new Range(previousKeyExtent != null ? 
previousKeyExtent.endRow() : null, + false, keyExtent.endRow(), true); + + // Clip range with the tablet range if the range already exists + fenced = existing.hasRange() ? existing.getRange().clip(fenced) : fenced; + + // Move the file and range to the last tablet + StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + m.put(key.getColumnFamily(), newFile.getMetadataText(), value); + fileCount++; } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) { @@ -803,6 +864,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { var allVolumesDir = new AllVolumesDirectory(range.tableId(), value.toString()); bw.addMutation(manager.getContext().getAmple().createDeleteMutation(allVolumesDir)); } + + lastExtent = keyExtent; } // read the logical time from the last tablet in the merge range, it is not included in diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FileNormalizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FileNormalizationIT.java index 789fd5804c..37f2f9f008 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FileNormalizationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FileNormalizationIT.java @@ -174,7 +174,7 @@ public class FileNormalizationIT extends SharedMiniClusterBase { scanner.forEach((k, v) -> { var qual = k.getColumnQualifierData().toString(); assertTrue(qual.contains("//tables//")); - filesBeforeMerge.add(qual); + filesBeforeMerge.add(StoredTabletFile.of(qual).getMetadataPath()); }); } @@ -188,7 +188,7 @@ public class FileNormalizationIT extends SharedMiniClusterBase { scanner.forEach((k, v) -> { // should only see the default tablet assertTrue(k.getRow().toString().endsWith("<")); - filesAfterMerge.add(k.getColumnQualifierData().toString()); + filesAfterMerge.add(StoredTabletFile.of(k.getColumnQualifier()).getMetadataPath()); }); } diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java index 82a7264207..c30a0d3bbc 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java @@ -18,34 +18,56 @@ */ package org.apache.accumulo.test.functional; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.test.util.FileMetadataUtil.printAndVerifyFileMetadata; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.FastFormat; import 
org.apache.accumulo.core.util.Merge; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.TestIngest.IngestParams; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.accumulo.test.VerifyIngest.VerifyParams; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MergeIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(MergeIT.class); + SortedSet<Text> splits(String[] points) { SortedSet<Text> result = new TreeSet<>(); for (String point : points) { @@ -102,6 +124,306 @@ public class MergeIT extends AccumuloClusterHarness { } } + @Test + public void noChopMergeTest() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + final TableId tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + // First write 1000 rows to a file in the default tablet + ingest(c, 1000, 1, tableName); + c.tableOperations().flush(tableName, null, null, true); + + log.debug("Metadata after Ingest"); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 1); + + // Add splits so we end up with 4 tablets + final SortedSet<Text> splits = new TreeSet<>(); + for (int i = 250; i <= 750; i += 250) { + splits.add(new Text("row_" + String.format("%010d", i))); + } + c.tableOperations().addSplits(tableName, splits); + + log.debug("Metadata after Split"); + verify(c, 1000, 1, tableName); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 4); + + // Go through and delete two blocks of rows, 101 - 200 + // and also 301 - 400 so we can test that the data doesn't come + // back on merge + try (BatchWriter bw = c.createBatchWriter(tableName)) { + byte[] COL_PREFIX = "col_".getBytes(UTF_8); + Text colq 
= new Text(FastFormat.toZeroPaddedString(0, 7, 10, COL_PREFIX)); + + for (int i = 101; i <= 200; i++) { + Mutation m = new Mutation(new Text("row_" + String.format("%010d", i))); + m.putDelete(new Text("colf"), colq); + bw.addMutation(m); + } + for (int i = 301; i <= 400; i++) { + Mutation m = new Mutation(new Text("row_" + String.format("%010d", i))); + m.putDelete(new Text("colf"), colq); + bw.addMutation(m); + } + } + + c.tableOperations().flush(tableName, null, null, true); + + // compact the first 2 tablets so the new files with the deletes are gone + // so we can test that the data does not come back when the 3rd tablet is + // merged back with the other tablets as it still contains the original file + c.tableOperations().compact(tableName, new CompactionConfig().setStartRow(null) + .setEndRow(List.copyOf(splits).get(1)).setWait(true)); + log.debug("Metadata after deleting rows 101 - 200 and 301 - 400"); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 4); + + // Merge and print results + c.tableOperations().merge(tableName, null, null); + log.debug("Metadata after Merge"); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 4); + + // Verify that the deleted rows can't be read after merge + verify(c, 100, 1, tableName); + verifyNoRows(c, 100, 101, tableName); + verify(c, 100, 201, tableName); + verifyNoRows(c, 100, 301, tableName); + verify(c, 600, 401, tableName); + } + } + + @Test + public void noChopMergeDeleteAcrossTablets() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + // disable compactions + c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999"); + final TableId tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + // First write 1000 rows to a file in the default tablet + ingest(c, 1000, 1, tableName); + 
c.tableOperations().flush(tableName, null, null, true); + + log.debug("Metadata after Ingest"); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 1); + + // Add splits so we end up with 10 tablets + final SortedSet<Text> splits = new TreeSet<>(); + for (int i = 100; i <= 900; i += 100) { + splits.add(new Text("row_" + String.format("%010d", i))); + } + c.tableOperations().addSplits(tableName, splits); + + log.debug("Metadata after Split"); + verify(c, 1000, 1, tableName); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 10); + + // Go through and delete three blocks of rows + // 151 - 250, 451 - 550, 751 - 850 + try (BatchWriter bw = c.createBatchWriter(tableName)) { + byte[] COL_PREFIX = "col_".getBytes(UTF_8); + Text colq = new Text(FastFormat.toZeroPaddedString(0, 7, 10, COL_PREFIX)); + + for (int j = 0; j <= 2; j++) { + for (int i = 151; i <= 250; i++) { + Mutation m = new Mutation(new Text("row_" + String.format("%010d", i + (j * 300)))); + m.putDelete(new Text("colf"), colq); + bw.addMutation(m); + } + } + } + + c.tableOperations().flush(tableName, null, null, true); + + log.debug("Metadata after deleting rows 151 - 250, 451 - 550, 751 - 850"); + // compact some of the tablets with deletes so we can test that the data does not come back + c.tableOperations().compact(tableName, + new CompactionConfig().setStartRow(new Text("row_" + String.format("%010d", 150))) + .setEndRow(new Text("row_" + String.format("%010d", 250))).setWait(true)); + c.tableOperations().compact(tableName, + new CompactionConfig().setStartRow(new Text("row_" + String.format("%010d", 750))) + .setEndRow(new Text("row_" + String.format("%010d", 850))).setWait(true)); + // Should be 16 files (10 for the original splits plus 2 extra files per deletion range across + // tablets) + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 12); + + c.tableOperations().merge(tableName, null, null); + log.debug("Metadata after Merge"); + 
printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 12); + + // Verify that the deleted rows can't be read after merge + verify(c, 150, 1, tableName); + verifyNoRows(c, 100, 151, tableName); + verify(c, 200, 251, tableName); + verifyNoRows(c, 100, 451, tableName); + verify(c, 200, 551, tableName); + verifyNoRows(c, 100, 751, tableName); + verify(c, 150, 851, tableName); + + // Compact and verify we clean up all the files and only 1 left + // Verify only 700 entries + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + log.debug("Metadata after compact"); + // Should just be 1 file with infinite range + Map<StoredTabletFile,DataFileValue> files = + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 1); + assertEquals(new Range(), files.keySet().stream().findFirst().orElseThrow().getRange()); + assertEquals(700, files.values().stream().findFirst().orElseThrow().getNumEntries()); + } + } + + // Multiple splits/deletes/merges to show ranges work and carry forward + // Testing that we can split -> delete, merge, split -> delete, merge + // with deletions across boundaries + @Test + public void noChopMergeDeleteAcrossTabletsMultiple() throws Exception { + // Run initial test to populate table and merge which adds ranges to files + noChopMergeDeleteAcrossTablets(); + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + final TableId tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + log.debug("Metadata after initial test run"); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, -1); + + // Add splits so we end up with 10 tablets + final SortedSet<Text> splits = new TreeSet<>(); + for (int i = 100; i <= 900; i += 100) { + splits.add(new Text("row_" + String.format("%010d", i))); + } + c.tableOperations().addSplits(tableName, splits); + + log.debug("Metadata after Split for second time"); + // 
Verify that the deleted rows can't be read after merge + verify(c, 150, 1, tableName); + verifyNoRows(c, 100, 151, tableName); + verify(c, 200, 251, tableName); + verifyNoRows(c, 100, 451, tableName); + verify(c, 200, 551, tableName); + verifyNoRows(c, 100, 751, tableName); + verify(c, 150, 851, tableName); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, -1); + + c.tableOperations().flush(tableName, null, null, true); + + // Go through and also delete 651 - 700 + try (BatchWriter bw = c.createBatchWriter(tableName)) { + byte[] COL_PREFIX = "col_".getBytes(UTF_8); + Text colq = new Text(FastFormat.toZeroPaddedString(0, 7, 10, COL_PREFIX)); + + for (int i = 651; i <= 700; i++) { + Mutation m = new Mutation(new Text("row_" + String.format("%010d", i))); + m.putDelete(new Text("colf"), colq); + bw.addMutation(m); + } + } + + c.tableOperations().flush(tableName, null, null, true); + + log.debug("Metadata after deleting rows 151 - 250, 451 - 550, 651 - 700, 751 - 850"); + // compact some of the tablets with deletes so we can test that the data does not come back + c.tableOperations().compact(tableName, + new CompactionConfig().setStartRow(new Text("row_" + String.format("%010d", 150))) + .setEndRow(new Text("row_" + String.format("%010d", 700))).setWait(true)); + + // Re-merge a second time after deleting more rows + c.tableOperations().merge(tableName, null, null); + log.debug("Metadata after second Merge"); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, -1); + + // Verify that the deleted rows can't be read after merge + verify(c, 150, 1, tableName); + verifyNoRows(c, 100, 151, tableName); + verify(c, 200, 251, tableName); + verifyNoRows(c, 100, 451, tableName); + verify(c, 100, 551, tableName); + verifyNoRows(c, 50, 651, tableName); + verify(c, 50, 701, tableName); + verifyNoRows(c, 100, 751, tableName); + verify(c, 150, 851, tableName); + } + } + + // Tests that after we merge and fence files, we can split and then + // 
merge a second time the same table which shows splits/merges work + // for files that already have ranges + @Test + public void noChopMergeTestMultipleMerges() throws Exception { + // Do initial merge which will fence off files + noChopMergeTest(); + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + final TableId tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + log.debug("Metadata after initial no chop merge test"); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, 4); + + // Add splits so we end up with 4 tablets + final SortedSet<Text> splits = new TreeSet<>(); + for (int i = 250; i <= 750; i += 250) { + splits.add(new Text("row_" + String.format("%010d", i))); + } + c.tableOperations().addSplits(tableName, splits); + + log.debug("Metadata after Split"); + // Verify after splitting for the second time + verify(c, 100, 1, tableName); + verifyNoRows(c, 100, 101, tableName); + verify(c, 100, 201, tableName); + verifyNoRows(c, 100, 301, tableName); + verify(c, 600, 401, tableName); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, -1); + + // Re-Merge and print results. 
This tests merging with files + // that already have a range + c.tableOperations().merge(tableName, null, null); + log.debug("Metadata after Merge"); + printAndVerifyFileMetadata(getCluster().getServerContext(), tableId, -1); + + // Verify that the deleted rows can't be read after merge + verify(c, 100, 1, tableName); + verifyNoRows(c, 100, 101, tableName); + verify(c, 100, 201, tableName); + verifyNoRows(c, 100, 301, tableName); + verify(c, 600, 401, tableName); + } + } + + public static void ingest(AccumuloClient accumuloClient, int rows, int offset, String tableName) + throws Exception { + IngestParams params = new IngestParams(accumuloClient.properties(), tableName, rows); + params.cols = 1; + params.dataSize = 10; + params.startRow = offset; + params.columnFamily = "colf"; + params.createTable = true; + TestIngest.ingest(accumuloClient, params); + } + + private static void verify(AccumuloClient accumuloClient, int rows, int offset, String tableName) + throws Exception { + VerifyParams params = new VerifyParams(accumuloClient.properties(), tableName, rows); + params.rows = rows; + params.dataSize = 10; + params.startRow = offset; + params.columnFamily = "colf"; + params.cols = 1; + VerifyIngest.verifyIngest(accumuloClient, params); + } + + private static void verifyNoRows(AccumuloClient accumuloClient, int rows, int offset, + String tableName) throws Exception { + try { + verify(accumuloClient, rows, offset, tableName); + fail("Should have failed"); + } catch (AccumuloException e) { + assertTrue(e.getMessage().contains("Did not read expected number of rows. 
Saw 0")); + } + } + private String[] toStrings(Collection<Text> listSplits) { String[] result = new String[listSplits.size()]; int i = 0; @@ -166,7 +488,7 @@ public class MergeIT extends AccumuloClusterHarness { private void runMergeTest(AccumuloClient client, String table, String[] splits, String[] expectedSplits, String[] inserts, String start, String end) throws Exception { - System.out.println( + log.debug( "Running merge test " + table + " " + Arrays.asList(splits) + " " + start + " " + end); SortedSet<Text> splitSet = splits(splits); @@ -187,9 +509,19 @@ public class MergeIT extends AccumuloClusterHarness { } } + log.debug("Before Merge"); + client.tableOperations().flush(table, null, null, true); + printAndVerifyFileMetadata(getCluster().getServerContext(), + TableId.of(client.tableOperations().tableIdMap().get(table))); + client.tableOperations().merge(table, start == null ? null : new Text(start), end == null ? null : new Text(end)); + client.tableOperations().flush(table, null, null, true); + log.debug("After Merge"); + printAndVerifyFileMetadata(getCluster().getServerContext(), + TableId.of(client.tableOperations().tableIdMap().get(table))); + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { HashSet<String> observed = new HashSet<>(); diff --git a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java index 1c4541ff17..d050df00e0 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java @@ -190,14 +190,16 @@ public class MergeStateIT extends ConfigurableMacBase { metaDataStateStore .setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer, null))); - // onos... 
there's a new tablet online - stats = scan(state, metaDataStateStore); - assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(accumuloClient, state)); - - // chop it - m = TabletColumnFamily.createPrevRowMutation(tablet); - ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk")); - update(accumuloClient, m); + // No more chop compactions for merges + // + // // onos... there's a new tablet online + // stats = scan(state, metaDataStateStore); + // assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(accumuloClient, state)); + // + // // chop it + // m = TabletColumnFamily.createPrevRowMutation(tablet); + // ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk")); + // update(accumuloClient, m); stats = scan(state, metaDataStateStore); assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(accumuloClient, state)); diff --git a/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java b/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java new file mode 100644 index 0000000000..f84d5138b8 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/util/FileMetadataUtil.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.server.ServerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileMetadataUtil { + + private static final Logger log = LoggerFactory.getLogger(FileMetadataUtil.class); + + public static Map<StoredTabletFile,DataFileValue> + printAndVerifyFileMetadata(final ServerContext ctx, TableId tableId) { + return printAndVerifyFileMetadata(ctx, tableId, -1); + } + + public static Map<StoredTabletFile,DataFileValue> + printAndVerifyFileMetadata(final ServerContext ctx, TableId tableId, int expectedFiles) { + final Map<StoredTabletFile,DataFileValue> files = new HashMap<>(); + + try (TabletsMetadata tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId) + .fetch(ColumnType.FILES, ColumnType.PREV_ROW).build()) { + + // Read each file referenced by that table + int i = 0; + for (TabletMetadata tabletMetadata : tabletsMetadata) { + for (Entry<StoredTabletFile,DataFileValue> fileEntry : tabletMetadata.getFilesMap() + .entrySet()) { + StoredTabletFile file = fileEntry.getKey(); + DataFileValue dfv = fileEntry.getValue(); + files.put(file, dfv); + log.debug("Extent: " + tabletMetadata.getExtent() + "; File Name: " + file.getFileName() + + "; Range: " + file.getRange() + "; Entries: " + dfv.getNumEntries() + ", Size: " + + dfv.getSize()); + i++; + } 
+ } + + log.debug(""); + if (expectedFiles >= 0) { + assertEquals(expectedFiles, i); + } + } + + return files; + } +}