This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
     new 5a7e6eba0c  Use single mutation during no chop fence upgrade (#3954)
5a7e6eba0c is described below

commit 5a7e6eba0c390ddbdecef95a2dc2cad6f64a83f3
Author: EdColeman <d...@etcoleman.com>
AuthorDate: Fri Nov 24 18:41:41 2023 -0500

    Use single mutation during no chop fence upgrade (#3954)

    - Uses a single mutation per tablet row during the no chop fence upgrade
    - Adds a guard to prevent writing an empty mutation
---
 .../accumulo/manager/upgrade/Upgrader11to12.java   | 158 ++++----
 .../manager/upgrade/Upgrader11to12Test.java        | 406 ++++++++++++++-------
 2 files changed, 351 insertions(+), 213 deletions(-)
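The heart of the change is the rewritten processReferences: instead of submitting a mutation per reference (and another per delete), it accumulates every update for a tablet row into one Mutation, flushes it when the row changes, and skips empty mutations, since the BatchWriter rejects a mutation with no queued updates. A minimal sketch of that accumulate-and-flush pattern, separate from the diff below (the class and method names here are hypothetical, illustration only):

    import java.util.Arrays;
    import java.util.Map;

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.MutationsRejectedException;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.util.TextUtil;
    import org.apache.hadoop.io.Text;

    class RowBatchingSketch {
      // Accumulate every update for a tablet row into one Mutation and flush it
      // when the row changes; never submit a Mutation with no queued updates.
      static void writeRowBatches(Iterable<Map.Entry<Key,Value>> entries, BatchWriter writer)
          throws MutationsRejectedException {
        Mutation update = null;
        for (Map.Entry<Key,Value> entry : entries) {
          Text row = entry.getKey().getRow();
          if (update == null) {
            update = new Mutation(row);
          } else if (!Arrays.equals(update.getRow(), TextUtil.getBytes(row))) {
            // row boundary: send the previous row's mutation unless it stayed empty
            if (!update.getUpdates().isEmpty()) {
              writer.addMutation(update);
            }
            update = new Mutation(row);
          }
          // queue puts/deletes for this entry on `update` here (elided)
        }
        // guard: the BatchWriter rejects empty mutations, so only flush real work
        if (update != null && !update.getUpdates().isEmpty()) {
          writer.addMutation(update);
        }
      }
    }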
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
index 2d9f6b4a07..72127a4d4c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
@@ -20,27 +20,31 @@ package org.apache.accumulo.manager.upgrade;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
 import static org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING;
 
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
+import java.util.Arrays;
+import java.util.Map;
+
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
 import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
+import org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -48,6 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 public class Upgrader11to12 implements Upgrader {
 
@@ -84,111 +89,100 @@ public class Upgrader11to12 implements Upgrader {
   public void upgradeRoot(@NonNull ServerContext context) {
     log.debug("Upgrade root: upgrading to data version {}", METADATA_FILE_JSON_ENCODING);
     var rootName = Ample.DataLevel.METADATA.metaTable();
-    processReferences(context, rootName);
+    // not using ample to avoid StoredTabletFile because old file ref is incompatible
+    try (BatchWriter batchWriter = context.createBatchWriter(rootName); Scanner scanner =
+        new IsolatedScanner(context.createScanner(rootName, Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, rootName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + rootName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + rootName);
+      log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + rootName, mex);
+    }
   }
 
   @Override
   public void upgradeMetadata(@NonNull ServerContext context) {
     log.debug("Upgrade metadata: upgrading to data version {}", METADATA_FILE_JSON_ENCODING);
     var metaName = Ample.DataLevel.USER.metaTable();
-    processReferences(context, metaName);
+    try (BatchWriter batchWriter = context.createBatchWriter(metaName); Scanner scanner =
+        new IsolatedScanner(context.createScanner(metaName, Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, metaName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + metaName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + metaName);
+      log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + metaName, mex);
+    }
   }
 
-  private void processReferences(ServerContext context, String tableName) {
-    // not using ample to avoid StoredTabletFile because old file ref is incompatible
-    try (AccumuloClient c = Accumulo.newClient().from(context.getProperties()).build();
-        BatchWriter batchWriter = c.createBatchWriter(tableName); Scanner scanner =
-            new IsolatedScanner(context.createScanner(tableName, Authorizations.EMPTY))) {
+  void processReferences(BatchWriter batchWriter, Scanner scanner, String tableName) {
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    try {
+      Mutation update = null;
+      for (Map.Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+        Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+            "Expected empty visibility, saw %s ", key.getColumnVisibilityData());
+        // on new row, write current mutation and prepare a new one.
+        Text r = key.getRow();
+        if (update == null) {
+          update = new Mutation(r);
+        } else if (!Arrays.equals(update.getRow(), TextUtil.getBytes(r))) {
+          if (log.isTraceEnabled()) {
+            log.trace("table: {}, update: {}", tableName, update.prettyPrint());
+          }
+          if (!update.getUpdates().isEmpty()) {
+            batchWriter.addMutation(update);
+          }
+          update = new Mutation(r);
+        }
 
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
-      scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
-      scanner.forEach((k, v) -> {
-        var family = k.getColumnFamily();
+        var family = key.getColumnFamily();
         if (family.equals(DataFileColumnFamily.NAME)) {
-          upgradeDataFileCF(k, v, batchWriter, tableName);
+          upgradeDataFileCF(key, value, update);
         } else if (family.equals(ChoppedColumnFamily.NAME)) {
-          removeChoppedCF(k, batchWriter, tableName);
+          log.warn(
+              "Deleting chopped reference from:{}. Previous split or delete may not have completed cleanly. Ref: {}",
+              tableName, key.getRow());
+          update.at().family(ChoppedColumnFamily.STR_NAME).qualifier(ChoppedColumnFamily.STR_NAME)
+              .delete();
         } else if (family.equals(ExternalCompactionColumnFamily.NAME)) {
-          removeExternalCompactionCF(k, batchWriter, tableName);
+          log.debug(
+              "Deleting external compaction reference from:{}. Previous compaction may not have completed. Ref: {}",
+              tableName, key.getRow());
+          update.at().family(ExternalCompactionColumnFamily.NAME)
+              .qualifier(key.getColumnQualifier()).delete();
         } else {
           throw new IllegalStateException("Processing: " + tableName
               + " Received unexpected column family processing references: " + family);
         }
-      });
+      }
+      // send last mutation
+      if (update != null && !update.getUpdates().isEmpty()) {
+        log.trace("table: {}, update: {}", tableName, update.prettyPrint());
+        batchWriter.addMutation(update);
+      }
     } catch (MutationsRejectedException mex) {
       log.warn("Failed to update reference for table: " + tableName);
       log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries());
       throw new IllegalStateException("Failed to process table: " + tableName, mex);
-    } catch (Exception ex) {
-      throw new IllegalStateException("Failed to process table: " + tableName, ex);
     }
   }
 
   @VisibleForTesting
-  void upgradeDataFileCF(final Key key, final Value value, final BatchWriter batchWriter,
-      final String tableName) {
+  void upgradeDataFileCF(final Key key, final Value value, final Mutation m) {
     String file = key.getColumnQualifier().toString();
     // filter out references if they are in the correct format already.
     if (fileNeedsConversion(file)) {
       var fileJson = StoredTabletFile.of(new Path(file)).getMetadataText();
-      try {
-        Mutation update = new Mutation(key.getRow());
-        update.at().family(DataFileColumnFamily.STR_NAME).qualifier(fileJson).put(value);
-        log.trace("table: {}, adding: {}", tableName, update.prettyPrint());
-        batchWriter.addMutation(update);
-
-        Mutation delete = new Mutation(key.getRow());
-        delete.at().family(DataFileColumnFamily.STR_NAME).qualifier(file).delete();
-        log.trace("table {}: deleting: {}", tableName, delete.prettyPrint());
-        batchWriter.addMutation(delete);
-      } catch (MutationsRejectedException ex) {
-        // include constraint violation info in log - but stop upgrade
-        log.warn(
-            "Failed to update file reference for table: " + tableName + ". row: " + key.getRow());
-        log.warn("Constraint violations: {}", ex.getConstraintViolationSummaries());
Aborting upgrade", ex); - } - } - } - - @VisibleForTesting - void removeChoppedCF(final Key key, final BatchWriter batchWriter, final String tableName) { - Mutation delete = null; - try { - delete = new Mutation(key.getRow()).at().family(ChoppedColumnFamily.STR_NAME) - .qualifier(ChoppedColumnFamily.STR_NAME).delete(); - log.warn( - "Deleting chopped reference from:{}. Previous split or delete may not have completed cleanly. Ref: {}", - tableName, delete.prettyPrint()); - batchWriter.addMutation(delete); - } catch (MutationsRejectedException ex) { - log.warn("Failed to delete obsolete chopped CF reference for table: " + tableName + ". Ref: " - + delete.prettyPrint() + ". Will try to continue. Ref may need to be manually removed"); - log.warn("Constraint violations: {}", ex.getConstraintViolationSummaries()); - throw new IllegalStateException( - "Failed to delete obsolete chopped CF reference for table: " + tableName, ex); - } - } - - @VisibleForTesting - void removeExternalCompactionCF(final Key key, final BatchWriter batchWriter, - final String tableName) { - Mutation delete = null; - try { - delete = new Mutation(key.getRow()).at().family(ExternalCompactionColumnFamily.NAME) - .qualifier(key.getColumnQualifier()).delete(); - log.debug( - "Deleting external compaction reference from:{}. Previous compaction may not have completed. Ref: {}", - tableName, delete.prettyPrint()); - batchWriter.addMutation(delete); - } catch (MutationsRejectedException ex) { - log.warn("Failed to delete obsolete external compaction CF reference for table: " + tableName - + ". Ref: " + delete.prettyPrint() - + ". Will try to continue. Ref may need to be manually removed"); - log.warn("Constraint violations: {}", ex.getConstraintViolationSummaries()); - throw new IllegalStateException( - "Failed to delete obsolete external compaction CF reference for table: " + tableName, ex); + m.at().family(DataFileColumnFamily.STR_NAME).qualifier(fileJson).put(value); + m.at().family(DataFileColumnFamily.STR_NAME).qualifier(file).delete(); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java index da12f4ede2..ee45534dab 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java @@ -19,14 +19,12 @@ package org.apache.accumulo.manager.upgrade; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; -import static org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.newCapture; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; @@ -35,12 +33,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Iterator; import 
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java
index da12f4ede2..ee45534dab 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java
@@ -19,14 +19,12 @@ package org.apache.accumulo.manager.upgrade;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.newCapture;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
@@ -35,12 +33,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.Key;
@@ -48,6 +49,10 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -62,198 +67,337 @@ public class Upgrader11to12Test {
   private static final Logger LOG = LoggerFactory.getLogger(Upgrader11to12Test.class);
 
   @Test
-  void upgradeDataFileCFTest() throws Exception {
+  void upgradeDataFileCF2Test() {
     Upgrader11to12 upgrader = new Upgrader11to12();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> capturedAdd = newCapture();
-    bw.addMutation(capture(capturedAdd));
-    expectLastCall();
-
-    Capture<Mutation> capturedDelete = newCapture();
-    bw.addMutation(capture(capturedDelete));
-    expectLastCall();
-
-    replay(bw);
-
     String fileName = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
     Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
         .qualifier(new Text(fileName)).build();
     Value v = new Value("1234,5678");
 
-    upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+    Mutation upgrade = new Mutation(k.getRow());
+    upgrader.upgradeDataFileCF(k, v, upgrade);
 
-    StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
-    Mutation add = new Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
-        .qualifier(stf.getMetadataText()).put(v);
-    LOG.debug("add mutation to be expected: {}", add.prettyPrint());
+    var pending = upgrade.getUpdates();
+    assertEquals(2, pending.size());
+    // leverage sort order for "expected" values
+    // check that the converted file entry is in the mutation
+    Iterator<ColumnUpdate> m = pending.iterator();
+    var cu1 = m.next();
+    assertEquals("file", new Text(cu1.getColumnFamily()).toString());
 
-    Mutation delete = new Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(fileName)).delete();
-    LOG.debug("delete mutation to be expected: {}", delete.prettyPrint());
+    StoredTabletFile oldFileEntry = StoredTabletFile.of(new Path(fileName));
+    StoredTabletFile updateEntry = StoredTabletFile.of(new String(cu1.getColumnQualifier(), UTF_8));
 
-    assertEquals(add, capturedAdd.getValue());
-    assertEquals(delete, capturedDelete.getValue());
+    assertEquals(oldFileEntry, updateEntry);
+    assertFalse(cu1.isDeleted());
 
-    verify(bw);
-  }
+    // check that the delete of the old file entry is in the mutation
 
-  @Test
-  void upgradeDataFileCFSkipConvertedTest() {
-    Upgrader11to12 upgrader = new Upgrader11to12();
+    var cu2 = m.next();
+    assertEquals("file", new Text(cu2.getColumnFamily()).toString());
+    assertEquals(fileName, new String(cu2.getColumnQualifier(), UTF_8));
+    assertTrue(cu2.isDeleted());
+  }
 
-  }
-
-  @Test
-  void upgradeDataFileCFSkipConvertedTest() {
-    Upgrader11to12 upgrader = new Upgrader11to12();
-
-    BatchWriter bw = createMock(BatchWriter.class);
-
-    replay(bw);
+  @Test
+  public void processReferencesTest() throws Exception {
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().once();
+
+    Capture<Mutation> capturedUpdate2 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate2));
+    expectLastCall().once();
+
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
+
+    String fileName1 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
+
+    String fileName2 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+    Key key2 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("321,654");
+    scanData.put(key2, value2);
+
+    Key chop1 = Key.builder(false).row(row1).family(ChoppedColumnFamily.NAME)
+        .qualifier(ChoppedColumnFamily.NAME).build();
+    scanData.put(chop1, null);
+
+    Key extern1 = Key.builder(false).row(row1).family(ExternalCompactionColumnFamily.NAME)
+        .qualifier(ExternalCompactionColumnFamily.NAME).build();
+    scanData.put(extern1, null);
 
-    String fileName = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
-    StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
+    Text row2 = new Text("234");
 
-    Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(stf.getMetadataText()).build();
-    Value v = new Value("1234,5678");
+    String fileName3 = "hdfs://localhost:8020/accumulo/tables/13/default_tablet/C000000v.rf";
+    Key key3 =
+        Key.builder(false).row(row2).family(DataFileColumnFamily.NAME).qualifier(fileName3).build();
+    Value value3 = new Value("1,2");
+    scanData.put(key3, value3);
 
-    upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    // with file entry in correct formation, no mutations are expected.
-    verify(bw);
-  }
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-  @Test
-  void upgradeDataFileCFInvalidMutationTest() throws Exception {
     Upgrader11to12 upgrader = new Upgrader11to12();
+    upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> capturedAdd = newCapture();
-    bw.addMutation(capture(capturedAdd));
-    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
-        0, new NullPointerException()));
+    LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+    var u1 = capturedUpdate1.getValue();
+    // 2 file add, 2 file delete. 1 chop delete, 1 ext comp delete
+    assertEquals(6, u1.getUpdates().size());
 
-    replay(bw);
+    LOG.info("c:{}", capturedUpdate2.getValue().prettyPrint());
+    var u2 = capturedUpdate2.getValue();
+    // 1 add, 1 delete
+    assertEquals(2, u2.getUpdates().size());
+    assertEquals(1, u2.getUpdates().stream().filter(ColumnUpdate::isDeleted).count());
 
-    String fileName = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
-    Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(fileName)).build();
-    Value v = new Value("1234,5678");
-
-    assertThrows(IllegalStateException.class, () -> upgrader.upgradeDataFileCF(k, v, bw, "aTable"));
+    verify(batchWriter, scanner);
 
-    verify(bw);
   }
 
   @Test
-  void upgradeDataFileCFInvalidPathTest() {
-    Upgrader11to12 upgrader = new Upgrader11to12();
-
-    BatchWriter bw = createMock(BatchWriter.class);
-
-    replay(bw);
+  public void skipConvertedFileTest() throws Exception {
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().once();
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
+
+    // reference already in expected form with fence info.
+    String fileName1 =
+        "{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
+
+    String fileName2 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+    Key key2 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("321,654");
+    scanData.put(key2, value2);
+
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    String invalidPath = "badPath";
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-    Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(invalidPath)).build();
-    Value v = new Value("1234,5678");
+    Upgrader11to12 upgrader = new Upgrader11to12();
+    upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
 
-    assertThrows(IllegalArgumentException.class,
-        () -> upgrader.upgradeDataFileCF(k, v, bw, "aTable"));
+    LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+    var u1 = capturedUpdate1.getValue();
+    // 1 add, 1 delete
+    assertEquals(2, u1.getUpdates().size());
+    assertEquals(1, u1.getUpdates().stream().filter(ColumnUpdate::isDeleted).count());
 
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
   @Test
-  void removeChoppedCFTest() throws Exception {
-    Upgrader11to12 upgrader = new Upgrader11to12();
+  void failOnMutationErrorTest() throws Exception {
 
-    Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(ExternalCompactionColumnFamily.NAME).build();
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
+        0, new NullPointerException())).once();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> captured = newCapture();
-    bw.addMutation(capture(captured));
-    expectLastCall();
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
 
-    replay(bw);
+    // reference in old form; conversion will queue an update
+    String fileName1 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
 
-    upgrader.removeChoppedCF(k, bw, "aTable");
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    Mutation delete = new Mutation(k.getRow()).at().family(ChoppedColumnFamily.NAME)
-        .qualifier(ChoppedColumnFamily.NAME).delete();
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
+    Upgrader11to12 upgrader = new Upgrader11to12();
 
-    assertEquals(delete, captured.getValue());
+    assertThrows(IllegalStateException.class,
+        () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"));
 
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
   @Test
-  void removeChoppedCFContinuesTest() throws Exception {
-    Upgrader11to12 upgrader = new Upgrader11to12();
-
-    Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(ExternalCompactionColumnFamily.NAME).build();
-
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> captured = newCapture();
-    bw.addMutation(capture(captured));
-    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
-        0, new NullPointerException()));
+  void upgradeDataFileCFInvalidPathTest() throws Exception {
+
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    // expecting that the exception will be thrown before the mutation is updated.
+    expectLastCall().andThrow(new UnsupportedOperationException()).anyTimes();
+
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
+
+    String fileName1 = "bad path";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
+
+    String fileName2 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+    Key key2 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("321,654");
+    scanData.put(key2, value2);
+
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    replay(bw);
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-    assertThrows(IllegalStateException.class, () -> upgrader.removeChoppedCF(k, bw, "aTable"));
+    Upgrader11to12 upgrader = new Upgrader11to12();
+    assertThrows(IllegalArgumentException.class,
+        () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"));
 
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
   @Test
-  void removeExternalCompactionCFTest() throws Exception {
-    Upgrader11to12 upgrader = new Upgrader11to12();
+  void unexpectedColFailsTest() throws Exception {
 
-    Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(new Text("ECID:1234")).build();
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    // expecting that the exception will be thrown before the mutation is updated.
+    expectLastCall().andThrow(new UnsupportedOperationException()).anyTimes();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> captured = newCapture();
-    bw.addMutation(capture(captured));
-    expectLastCall();
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
 
-    replay(bw);
+    Key key1 = Key.builder(false).row(row1).family(LastLocationColumnFamily.NAME).qualifier("srv1")
+        .build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
 
-    upgrader.removeExternalCompactionCF(k, bw, "aTable");
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    Mutation delete = new Mutation(k.getRow()).at().family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(new Text("ECID:1234")).delete();
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-    assertEquals(delete, captured.getValue());
+    Upgrader11to12 upgrader = new Upgrader11to12();
+    assertThrows(IllegalStateException.class,
+        () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"));
 
-    for (ColumnUpdate update : captured.getValue().getUpdates()) {
-      assertEquals(ExternalCompactionColumnFamily.STR_NAME,
-          new String(update.getColumnFamily(), UTF_8));
-      assertEquals("ECID:1234", new String(update.getColumnQualifier(), UTF_8));
-      assertTrue(update.isDeleted());
-    }
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
+  /**
+   * process 3 rows, 2 should result in no mutations and batch writer addMutation should not be
+   * called for those rows
+   */
   @Test
-  void removeExternalCompactionCFContinuesTest() throws Exception {
-    Upgrader11to12 upgrader = new Upgrader11to12();
-
-    Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(new Text("ECID:1234")).build();
+  public void verifyEmptyMutation() throws Exception {
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().once();
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+
+    Text row1 = new Text("1");
+
+    String fileName1 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/1111000v.rf";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("111,222");
+    scanData.put(key1, value1);
+
+    Text row2 = new Text("a");
+
+    // reference already in expected form with fence info.
+    String fileName2 =
+        "{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+    Key key2 =
+        Key.builder(false).row(row2).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("222,333");
+    scanData.put(key2, value2);
+
+    Text row3 = new Text("b");
+
+    // reference already in expected form with fence info.
+    String fileName3 =
+        "{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/BBBB000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+    Key key3 =
+        Key.builder(false).row(row3).family(DataFileColumnFamily.NAME).qualifier(fileName3).build();
+    Value value3 = new Value("333,444");
+    scanData.put(key3, value3);
+
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> captured = newCapture();
-    bw.addMutation(capture(captured));
-    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
-        0, new NullPointerException()));
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-    replay(bw);
+    Upgrader11to12 upgrader = new Upgrader11to12();
+    upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
 
-    assertThrows(IllegalStateException.class,
-        () -> upgrader.removeExternalCompactionCF(k, bw, "aTable"));
+    LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+    var u1 = capturedUpdate1.getValue();
+    // 1 add, 1 delete
+    assertEquals(2, u1.getUpdates().size());
+    assertEquals(1, u1.getUpdates().stream().filter(ColumnUpdate::isDeleted).count());
 
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
   @Test