This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch no-chop-merge
in repository https://gitbox.apache.org/repos/asf/accumulo.git

The following commit(s) were added to refs/heads/no-chop-merge by this push:
     new c44fca29f1  Add file metadata ITs (#3614)
c44fca29f1 is described below

commit c44fca29f1cbc356375dd42f40db164d9fba2080
Author: Christopher L. Shannon <christopher.l.shan...@gmail.com>
AuthorDate: Fri Sep 8 12:14:34 2023 -0400

    Add file metadata ITs (#3614)

    This commit adds integration tests to verify stored ranges work correctly
    for file metadata
---
 .../accumulo/test/functional/FileMetadataIT.java | 478 +++++++++++++++++++++
 1 file changed, 478 insertions(+)

diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FileMetadataIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FileMetadataIT.java
new file mode 100644
index 0000000000..c642dbd052
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FileMetadataIT.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.apache.accumulo.test.util.FileMetadataUtil.printAndVerifyFileMetadata;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.time.Duration;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestIngest.IngestParams;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.accumulo.test.VerifyIngest.VerifyParams;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.MoreCollectors;
+
+@Tag(MINI_CLUSTER_ONLY)
+public class FileMetadataIT extends AccumuloClusterHarness {
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(6);
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+  }
+
+  // private static final Logger log = LoggerFactory.getLogger(FileMetadataIT.class);
+  static final int COLS = 1;
+  static final String COLF = "colf";
+
+  public static void ingest(AccumuloClient accumuloClient, int rows, int cols, int width,
+      int offset, String tableName) throws Exception {
+    IngestParams params = new IngestParams(accumuloClient.properties(), tableName, rows);
+    params.cols = cols;
+    params.dataSize = width;
+    params.startRow = offset;
+    params.columnFamily = COLF;
+    params.createTable = true;
+    TestIngest.ingest(accumuloClient, params);
+  }
+
+  private static void verify(AccumuloClient accumuloClient, int rows, int cols, int width,
+      int offset, String tableName) throws Exception {
+    VerifyParams params = new VerifyParams(accumuloClient.properties(), tableName, rows);
+    params.rows = rows;
+    params.dataSize = width;
+    params.startRow = offset;
+    params.columnFamily = COLF;
+    params.cols = cols;
+    VerifyIngest.verifyIngest(accumuloClient, params);
+  }
+
+  public static Text t(String s) {
+    return new Text(s);
+  }
+
+  public static Mutation m(String row, String cf, String cq, String value) {
+    Mutation m = new Mutation(t(row));
+    m.put(t(cf), t(cq), new Value(value));
+    return m;
+  }
+
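+  // Each test below takes a table offline, rewrites its file metadata so that the
+  // entries reference only sub-ranges ("fences") of the underlying rfiles, brings
+  // the table back online, and verifies that scans and compactions honor the
+  // stored ranges.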
+  @Test
+  public void contiguousRangeTest() throws Exception {
+    ServerContext ctx = getCluster().getServerContext();
+
+    try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) {
+      // Need permission to write to metadata
+      accumuloClient.securityOperations().grantTablePermission(accumuloClient.whoami(),
+          MetadataTable.NAME, TablePermission.WRITE);
+
+      final int rows = 10000;
+      final String tableName = getUniqueNames(1)[0];
+      accumuloClient.tableOperations().create(tableName);
+      final TableId tableId =
+          TableId.of(accumuloClient.tableOperations().tableIdMap().get(tableName));
+
+      // Ingest 10000 rows starting with row 1, then flush and verify the data
+      ingest(accumuloClient, rows, COLS, 10, 1, tableName);
+      accumuloClient.tableOperations().flush(tableName, null, null, true);
+      verify(accumuloClient, rows, COLS, 10, 1, tableName);
+
+      // Bring the table offline so we can modify file metadata
+      accumuloClient.tableOperations().offline(tableName, true);
+
+      try (TabletsMetadata tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId)
+          .fetch(ColumnType.FILES, ColumnType.PREV_ROW).build()) {
+
+        // Read each file (should only be 1), and split into 4 ranges
+        for (TabletMetadata tabletMetadata : tabletsMetadata) {
+          final KeyExtent ke = tabletMetadata.getExtent();
+
+          // Create a mutator to replace the existing file metadata entries
+          TabletMutator mutator = ctx.getAmple().mutateTablet(ke);
+
+          // Read each file referenced by the tablet (should be just 1)
+          for (Entry<StoredTabletFile,DataFileValue> fileEntry : tabletMetadata.getFilesMap()
+              .entrySet()) {
+            StoredTabletFile file = fileEntry.getKey();
+            DataFileValue value = fileEntry.getValue();
+
+            // Delete the existing file metadata entry, which has an infinite range
+            mutator.deleteFile(file);
+
+            // Add 4 contiguous ranges
+            final int ranges = 4;
+            for (int i = 1; i <= ranges; i++) {
+              int rowsPerRange = rows / ranges;
+              int endRow = i * rowsPerRange;
+
+              mutator.putFile(
+                  StoredTabletFile.of(file.getPath(),
+                      new Range(new Text("row_" + String.format("%010d", endRow - rowsPerRange)),
+                          false, new Text("row_" + String.format("%010d", endRow)), true)),
+                  new DataFileValue(value.getSize() / ranges, value.getNumEntries() / ranges));
+            }
+            mutator.mutate();
+          }
+        }
+      }
+
+      accumuloClient.tableOperations().online(tableName, true);
+      verify(accumuloClient, rows, COLS, 10, 1, tableName);
+      // Should have 4 files
+      printAndVerifyFileMetadata(getServerContext(), tableId, 4);
+
+      // Compact and verify the correct rows are still valid
+      accumuloClient.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+      verify(accumuloClient, rows, COLS, 10, 1, tableName);
+      printAndVerifyFileMetadata(getServerContext(), tableId, 1);
+    }
+  }
+
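+  // With rows = 10000 and ranges = 4, each fenced range covers 2500 rows. Skipping
+  // the second range leaves rows 2501 - 5000 with no file entry covering them, so
+  // those rows should not be visible to scans even though they still exist in the
+  // rfile.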
+  @Test
+  public void fencedRangeTest() throws Exception {
+    ServerContext ctx = getCluster().getServerContext();
+
+    try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) {
+      // Need permission to write to metadata
+      accumuloClient.securityOperations().grantTablePermission(accumuloClient.whoami(),
+          MetadataTable.NAME, TablePermission.WRITE);
+
+      final int rows = 10000;
+      final int ranges = 4;
+      int rowsPerRange = rows / ranges;
+
+      final String tableName = getUniqueNames(1)[0];
+      accumuloClient.tableOperations().create(tableName);
+      final TableId tableId =
+          TableId.of(accumuloClient.tableOperations().tableIdMap().get(tableName));
+
+      // Ingest 10000 rows starting with row 1, then flush and verify the data
+      ingest(accumuloClient, rows, COLS, 10, 1, tableName);
+      accumuloClient.tableOperations().flush(tableName, null, null, true);
+      verify(accumuloClient, rows, COLS, 10, 1, tableName);
+
+      // Bring the table offline so we can modify file metadata
+      accumuloClient.tableOperations().offline(tableName, true);
+
+      try (TabletsMetadata tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId)
+          .fetch(ColumnType.FILES, ColumnType.PREV_ROW).build()) {
+
+        // Read each file, should just be 1.
+        // Split into 4 ranges and skip the second so only 3/4 of the file should be readable
+        for (TabletMetadata tabletMetadata : tabletsMetadata) {
+          var fileEntry = tabletMetadata.getFilesMap().entrySet().stream()
+              .collect(MoreCollectors.onlyElement());
+          StoredTabletFile file = fileEntry.getKey();
+          DataFileValue value = fileEntry.getValue();
+          final KeyExtent ke = tabletMetadata.getExtent();
+
+          // Delete the existing file metadata entry, which has an infinite range
+          TabletMutator mutator = ctx.getAmple().mutateTablet(ke);
+          mutator.deleteFile(file);
+
+          // Add 3 of the 4 ranges
+          for (int i = 1; i <= ranges; i++) {
+            // Skip the second range
+            if (i == 2) {
+              continue;
+            }
+            int endRow = i * rowsPerRange;
+            mutator.putFile(
+                StoredTabletFile.of(file.getPath(),
+                    new Range(new Text("row_" + String.format("%010d", endRow - rowsPerRange)),
+                        false, new Text("row_" + String.format("%010d", endRow)), true)),
+                new DataFileValue(value.getSize() / ranges, value.getNumEntries() / ranges));
+          }
+
+          mutator.mutate();
+        }
+      }
+
+      // Bring the table back online now that the metadata has been updated
+      accumuloClient.tableOperations().online(tableName, true);
+
+      // Verify rows 1 - 2500 are readable
+      verify(accumuloClient, rowsPerRange, COLS, 10, 1, tableName);
+      // Rows 2501 - 5000 fall in the skipped range and should not be visible.
+      // Try to read 2500 rows and verify none are visible; an exception with 0 rows
+      // read should be thrown
+      verifyNoRows(accumuloClient, rowsPerRange, COLS, 10, rowsPerRange + 1, tableName);
+      // Verify rows 5001 - 10000 are readable
+      verify(accumuloClient, rowsPerRange * 2, COLS, 10, (rowsPerRange * 2) + 1, tableName);
+
+      // Should have 3 rfiles in metadata
+      printAndVerifyFileMetadata(getServerContext(), tableId, 3);
+      // Compact and verify the correct rows are still valid
+      accumuloClient.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+      // Verify rows 1 - 2500 are readable
+      verify(accumuloClient, rowsPerRange, COLS, 10, 1, tableName);
+      // Rows 2501 - 5000 fall in the skipped range and should not be visible
+      verifyNoRows(accumuloClient, rowsPerRange, COLS, 10, rowsPerRange + 1, tableName);
+      // Verify rows 5001 - 10000 are readable
+      verify(accumuloClient, rowsPerRange * 2, COLS, 10, (rowsPerRange * 2) + 1, tableName);
+      // Should just have 1 file after compaction
+      printAndVerifyFileMetadata(getServerContext(), tableId, 1);
+    }
+  }
+
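+  // Unlike the tests above, this table is pre-split into 10 tablets before ingest,
+  // so flushing produces 1 file per tablet. Splitting each file's metadata entry
+  // into two fenced halves should yield 20 entries, and compacting each tablet
+  // should collapse them back down to 10.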
+  @Test
+  public void splitsRangeTest() throws Exception {
+    ServerContext ctx = getCluster().getServerContext();
+
+    try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) {
+      // Need permission to write to metadata
+      accumuloClient.securityOperations().grantTablePermission(accumuloClient.whoami(),
+          MetadataTable.NAME, TablePermission.WRITE);
+
+      final int rows = 100000;
+      final String tableName = getUniqueNames(1)[0];
+      accumuloClient.tableOperations().create(tableName);
+      final TableId tableId =
+          TableId.of(accumuloClient.tableOperations().tableIdMap().get(tableName));
+
+      // Divide the table into 10 tablets with end rows of 10000, 20000, etc.
+      final SortedSet<Text> splits = new TreeSet<>();
+      for (int i = 1; i <= 10; i++) {
+        splits.add(new Text("row_" + String.format("%010d", i * 10000)));
+      }
+
+      // Ingest 100000 rows starting with row 1, then flush and verify the data
+      accumuloClient.tableOperations().addSplits(tableName, splits);
+      ingest(accumuloClient, rows, COLS, 10, 1, tableName);
+      accumuloClient.tableOperations().flush(tableName, null, null, true);
+      verify(accumuloClient, rows, COLS, 10, 1, tableName);
+
+      // Bring the table offline so we can modify file metadata
+      accumuloClient.tableOperations().offline(tableName, true);
+
+      try (TabletsMetadata tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId)
+          .fetch(ColumnType.FILES, ColumnType.PREV_ROW).build()) {
+
+        // Read each of the 10 files and split each one into 2 ranges
+        for (TabletMetadata tabletMetadata : tabletsMetadata) {
+          final KeyExtent ke = tabletMetadata.getExtent();
+
+          // Create a mutator to replace the existing file metadata entries
+          TabletMutator mutator = ctx.getAmple().mutateTablet(ke);
+
+          // Read each file referenced by the table, should be 1 per tablet
+          for (Entry<StoredTabletFile,DataFileValue> fileEntry : tabletMetadata.getFilesMap()
+              .entrySet()) {
+            StoredTabletFile file = fileEntry.getKey();
+            DataFileValue value = fileEntry.getValue();
+            final int endRow = Integer.parseInt(ke.endRow().toString().replace("row_", ""));
+
+            mutator.deleteFile(file);
+            mutator.putFile(
+                StoredTabletFile.of(file.getPath(),
+                    new Range(new Text("row_" + String.format("%010d", endRow - 10000)), false,
+                        new Text("row_" + String.format("%010d", endRow - 5000)), true)),
+                new DataFileValue(value.getSize() / 2, value.getNumEntries() / 2));
+            mutator.putFile(
+                StoredTabletFile.of(file.getPath(),
+                    new Range(new Text("row_" + String.format("%010d", endRow - 5000)), false,
+                        new Text("row_" + String.format("%010d", endRow)), true)),
+                new DataFileValue(value.getSize() / 2, value.getNumEntries() / 2));
+
+            mutator.mutate();
+          }
+        }
+      }
+
+      accumuloClient.tableOperations().online(tableName, true);
+      verify(accumuloClient, rows, COLS, 10, 1, tableName);
+
+      printAndVerifyFileMetadata(getServerContext(), tableId, 20);
+      // Compact and verify the correct rows are still valid
+      accumuloClient.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+      verify(accumuloClient, rows, COLS, 10, 1, tableName);
+      printAndVerifyFileMetadata(getServerContext(), tableId, 10);
+    }
+  }
+
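+  // This test layers table splits on top of pre-existing fenced entries. A file
+  // whose stored range spans several of the new tablets is referenced by each
+  // overlapping tablet, so the number of file entries grows with the overlap.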
+  @Test
+  public void splitsWithExistingRangesTest() throws Exception {
+    ServerContext ctx = getCluster().getServerContext();
+
+    try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) {
+      // Need permission to write to metadata
+      accumuloClient.securityOperations().grantTablePermission(accumuloClient.whoami(),
+          MetadataTable.NAME, TablePermission.WRITE);
+
+      final int rows = 100000;
+      final int ranges = 4;
+      int rowsPerRange = rows / ranges;
+
+      final String tableName = getUniqueNames(1)[0];
+      accumuloClient.tableOperations().create(tableName);
+      final TableId tableId =
+          TableId.of(accumuloClient.tableOperations().tableIdMap().get(tableName));
+
+      final SortedSet<Text> splits = new TreeSet<>();
+
+      // Ingest 100000 rows starting with row 1, then flush and verify the data
+      ingest(accumuloClient, rows, COLS, 10, 1, tableName);
+      accumuloClient.tableOperations().flush(tableName, null, null, true);
+      verify(accumuloClient, rows, COLS, 10, 1, tableName);
+
+      // Bring the table offline so we can modify file metadata
+      accumuloClient.tableOperations().offline(tableName, true);
+
+      try (TabletsMetadata tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId)
+          .fetch(ColumnType.FILES, ColumnType.PREV_ROW).build()) {
+
+        // Read each file, should just be 1.
+        // Split into 4 ranges and skip the second so only 3/4 of the file should be readable
+        for (TabletMetadata tabletMetadata : tabletsMetadata) {
+          var fileEntry = tabletMetadata.getFilesMap().entrySet().stream()
+              .collect(MoreCollectors.onlyElement());
+          StoredTabletFile file = fileEntry.getKey();
+          DataFileValue value = fileEntry.getValue();
+          final KeyExtent ke = tabletMetadata.getExtent();
+
+          // Delete the existing file metadata entry, which has an infinite range
+          TabletMutator mutator = ctx.getAmple().mutateTablet(ke);
+          mutator.deleteFile(file);
+
+          // Add 3 of the 4 ranges
+          for (int i = 1; i <= ranges; i++) {
+            // Skip the second range
+            if (i == 2) {
+              continue;
+            }
+            int endRow = i * rowsPerRange;
+            mutator.putFile(
+                StoredTabletFile.of(file.getPath(),
+                    new Range(new Text("row_" + String.format("%010d", endRow - rowsPerRange)),
+                        false, new Text("row_" + String.format("%010d", endRow)), true)),
+                new DataFileValue(value.getSize() / ranges, value.getNumEntries() / ranges));
+          }
+
+          mutator.mutate();
+        }
+      }
+      accumuloClient.tableOperations().online(tableName, true);
+      printAndVerifyFileMetadata(getServerContext(), tableId, 3);
+      // Verify rows 1 - 25000 are readable
+      verify(accumuloClient, rowsPerRange, COLS, 10, 1, tableName);
+      // Rows 25001 - 50000 fall in the skipped range and should not be visible.
+      // Try to read 25000 rows and verify none are visible; an exception with 0 rows
+      // read should be thrown
+      verifyNoRows(accumuloClient, rowsPerRange, COLS, 10, rowsPerRange + 1, tableName);
+      // Verify rows 50001 - 100000 are readable
+      verify(accumuloClient, rowsPerRange * 2, COLS, 10, (rowsPerRange * 2) + 1, tableName);
+
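+      // Up to this point the layout matches fencedRangeTest, only with 25000-row
+      // ranges: the fenced entries cover 1 - 25000, 50001 - 75000, and
+      // 75001 - 100000. The splits added next overlay tablet boundaries on these
+      // three entries.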
+      // Divide the table into 10 tablets with end rows of 10000, 20000, etc.
+      splits.clear();
+      for (int i = 1; i <= 10; i++) {
+        splits.add(new Text("row_" + String.format("%010d", i * 10000)));
+      }
+
+      // Add the splits and flush, then verify the existing data is still readable
+      accumuloClient.tableOperations().addSplits(tableName, splits);
+      accumuloClient.tableOperations().flush(tableName, null, null, true);
+      // Verify rows 1 - 25000 are readable
+      verify(accumuloClient, rowsPerRange, COLS, 10, 1, tableName);
+      // Rows 25001 - 50000 fall in the skipped range and should not be visible
+      verifyNoRows(accumuloClient, rowsPerRange, COLS, 10, rowsPerRange + 1, tableName);
+      // Verify rows 50001 - 100000 are readable
+      verify(accumuloClient, rowsPerRange * 2, COLS, 10, (rowsPerRange * 2) + 1, tableName);
+
+      // There are 3 existing ranges of 25000 rows each, and with splits every 10000
+      // rows the ranges overlap the new tablets, so there should be 9 total file
+      // entries for the existing data
+      // range: 1 - 25000; splits: 10000, 20000, 30000
+      // range: 50001 - 75000; splits: 60000, 70000, 80000
+      // range: 75001 - 100000; splits: 80000, 90000, 100000
+      printAndVerifyFileMetadata(getServerContext(), tableId, 9);
+
+      // Compact and verify the correct rows are still valid
+      accumuloClient.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+      // Verify rows 1 - 25000 are readable
+      verify(accumuloClient, rowsPerRange, COLS, 10, 1, tableName);
+      // Rows 25001 - 50000 fall in the skipped range and should not be visible
+      verifyNoRows(accumuloClient, rowsPerRange, COLS, 10, rowsPerRange + 1, tableName);
+      // Verify rows 50001 - 100000 are readable
+      verify(accumuloClient, rowsPerRange * 2, COLS, 10, (rowsPerRange * 2) + 1, tableName);
+
+      // After compaction there should be 8 files because data was only ingested into
+      // 1 - 25000 and 50001 - 100000. 25001 - 50000 was skipped, so the tablets
+      // ending at 40000 and 50000 have no data or files.
+      printAndVerifyFileMetadata(getServerContext(), tableId, 8);
+    }
+  }
+
+  // In the future we should probably enhance the ingest verify code to be able to
+  // better verify ranges, but for now we can at least verify no rows are read by
+  // checking the exception
+  private static void verifyNoRows(AccumuloClient accumuloClient, int rows, int cols, int width,
+      int offset, String tableName) throws Exception {
+    try {
+      verify(accumuloClient, rows, cols, width, offset, tableName);
+      fail("Should have failed");
+    } catch (AccumuloException e) {
+      assertTrue(e.getMessage().contains("Did not read expected number of rows. Saw 0"));
+    }
+  }
+}