This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new dc0d4f3132 refactors upgrade to use same code for all tablet metadata (#3988) dc0d4f3132 is described below commit dc0d4f3132e7febea17b1968f79f4294f4fd6bb6 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Nov 28 13:36:38 2023 -0500 refactors upgrade to use same code for all tablet metadata (#3988) The upgrade code called different code depending on where tablet metadata was stored. This change updates the code to use the same code to process all tabletmetadata whether its stored in ZK or an accumulo table. A secondary purpose of this changes is pull all upgrade code into the Upgrader11to12 class so that when that class is eventually deleted, no lingering code is left in the code base related to that specific upgrade. RootTabletMetadataTest was deleted and some of its test were migrated to Upgrader11to12Test --- .../core/metadata/schema/RootTabletMetadata.java | 58 ++---- .../metadata/schema/RootTabletMetadataTest.java | 119 ------------- .../accumulo/manager/upgrade/Upgrader11to12.java | 55 ++++-- .../manager/upgrade/Upgrader11to12Test.java | 195 ++++++++++----------- 4 files changed, 139 insertions(+), 288 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java index 8a39e34798..19285bebdb 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java @@ -36,7 +36,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.hadoop.io.Text; @@ -59,11 +58,7 @@ public class RootTabletMetadata { || fam.equals(FutureLocationColumnFamily.STR_NAME); }; - // JSON Mapping Version 1. Released with Accumulo version 2.1.0 - private static final int VERSION_1 = 1; - // JSON Mapping Version 2. Released with Accumulo version 3,1 - private static final int VERSION_2 = 2; - private static final int VERSION = VERSION_2; + private static final int VERSION = 1; // This class is used to serialize and deserialize root tablet metadata using GSon. Any changes to // this class must consider persisted data. @@ -126,40 +121,6 @@ public class RootTabletMetadata { data = new Data(VERSION, new TreeMap<>()); } - public static RootTabletMetadata upgrade(final String json) { - Data data = GSON.get().fromJson(json, Data.class); - int currVersion = data.getVersion(); - switch (currVersion) { - case VERSION_1: - RootTabletMetadata rtm = new RootTabletMetadata(); - Mutation m = convert1To2(data); - rtm.update(m); - return rtm; - case VERSION_2: - log.debug("no metadata version conversion required for {}", currVersion); - return new RootTabletMetadata(data); - default: - throw new IllegalArgumentException("Unsupported data version: " + currVersion); - } - } - - private static Mutation convert1To2(final Data data) { - Mutation mutation = - MetadataSchema.TabletsSection.TabletColumnFamily.createPrevRowMutation(RootTable.EXTENT); - data.columnValues.forEach((colFam, colQuals) -> { - if (colFam.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.STR_NAME)) { - colQuals.forEach((colQual, value) -> { - mutation.put(colFam, StoredTabletFile.serialize(colQual), value); - }); - } else { - colQuals.forEach((colQual, value) -> { - mutation.put(colFam, colQual, value); - }); - } - }); - return mutation; - } - /** * Apply a metadata table mutation to update internal entries. */ @@ -194,18 +155,21 @@ public class RootTabletMetadata { } } - /** - * Convert this class to a {@link TabletMetadata} - */ - public TabletMetadata toTabletMetadata() { + public Stream<SimpleImmutableEntry<Key,Value>> toKeyValues() { String row = RootTable.EXTENT.toMetaRow().toString(); - // use a stream so we don't have to re-sort in a new TreeMap<Key,Value> structure - Stream<SimpleImmutableEntry<Key,Value>> entries = data.columnValues.entrySet().stream() + return data.columnValues.entrySet().stream() .flatMap(famToQualVal -> famToQualVal.getValue().entrySet().stream() .map(qualVal -> new SimpleImmutableEntry<>( new Key(row, famToQualVal.getKey(), qualVal.getKey(), 1), new Value(qualVal.getValue())))); - return TabletMetadata.convertRow(entries.iterator(), + } + + /** + * Convert this class to a {@link TabletMetadata} + */ + public TabletMetadata toTabletMetadata() { + // use a stream so we don't have to re-sort in a new TreeMap<Key,Value> structure + return TabletMetadata.convertRow(toKeyValues().iterator(), EnumSet.allOf(TabletMetadata.ColumnType.class), false); } diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadataTest.java deleted file mode 100644 index 6a0a81f93b..0000000000 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadataTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.metadata.schema; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.hadoop.fs.Path; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RootTabletMetadataTest { - private static final Logger LOG = LoggerFactory.getLogger(RootTabletMetadataTest.class); - - @Test - public void convertRoot1File() { - String root21ZkData = - "{\"version\":1,\"columnValues\":{\"file\":{\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A000000v.rf\":\"1368,61\"},\"last\":{\"100025091780006\":\"localhost:9997\"},\"loc\":{\"100025091780006\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"3\",\"lock\":\"tservers/localhost:9997/zlock#9db8961a-4ee9-400e-8e80-3353148baadd#0000000000$100025091780006\",\"time\":\"L53\"},\"~tab\":{\"~pr\":\"\\u0000\"}}}"; - - RootTabletMetadata rtm = RootTabletMetadata.upgrade(root21ZkData); - LOG.debug("converted column values: {}", rtm.toTabletMetadata().getFiles()); - - var files = rtm.toTabletMetadata().getFiles(); - LOG.info("FILES: {}", rtm.toTabletMetadata().getFilesMap()); - - assertEquals(1, files.size()); - assertTrue(files.contains(StoredTabletFile - .of(new Path("hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A000000v.rf")))); - } - - @Test - public void convertRoot2Files() { - String root212ZkData2Files = - "{\"version\":1,\"columnValues\":{\"file\":{\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/00000_00000.rf\":\"0,0\",\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F000000c.rf\":\"926,18\"},\"last\":{\"10001a84d7d0005\":\"localhost:9997\"},\"loc\":{\"10001a84d7d0005\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"2\",\"lock\":\"tservers/localhost:9997/zlock#d21adaa4-0f97-4004-9ff8-cce9dbb6687f#0000000000$10001a84d7d0005\",\"time\":\"L6\"},\"~tab\ [...] - - RootTabletMetadata rtm = RootTabletMetadata.upgrade(root212ZkData2Files); - LOG.debug("converted column values: {}", rtm.toTabletMetadata()); - - var files = rtm.toTabletMetadata().getFiles(); - LOG.info("FILES: {}", rtm.toTabletMetadata().getFilesMap()); - - assertEquals(2, files.size()); - assertTrue(files.contains(StoredTabletFile - .of(new Path("hdfs://localhost:8020/accumulo/tables/+r/root_tablet/00000_00000.rf")))); - assertTrue(files.contains(StoredTabletFile - .of(new Path("hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F000000c.rf")))); - } - - @Test - public void needsUpgradeTest() { - String root212ZkData2Files = - "{\"version\":1,\"columnValues\":{\"file\":{\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/00000_00000.rf\":\"0,0\",\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F000000c.rf\":\"926,18\"},\"last\":{\"10001a84d7d0005\":\"localhost:9997\"},\"loc\":{\"10001a84d7d0005\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"2\",\"lock\":\"tservers/localhost:9997/zlock#d21adaa4-0f97-4004-9ff8-cce9dbb6687f#0000000000$10001a84d7d0005\",\"time\":\"L6\"},\"~tab\ [...] - assertTrue(RootTabletMetadata.needsUpgrade(root212ZkData2Files)); - - String converted = - "{\"version\":2,\"columnValues\":{\"file\":{\"{\\\"path\\\":\\\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A0000013.rf\\\",\\\"startRow\\\":\\\"\\\",\\\"endRow\\\":\\\"\\\"}\":\"974,19\",\"{\\\"path\\\":\\\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F0000014.rf\\\",\\\"startRow\\\":\\\"\\\",\\\"endRow\\\":\\\"\\\"}\":\"708,8\"},\"last\":{\"100024ec6110005\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"6\",\"lock\":\"tservers/localhost:9997/ [...] - - assertFalse(RootTabletMetadata.needsUpgrade(converted)); - } - - @Test - public void ignoresConvertedTest() { - String converted = - "{\"version\":2,\"columnValues\":{\"file\":{\"{\\\"path\\\":\\\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A0000013.rf\\\",\\\"startRow\\\":\\\"\\\",\\\"endRow\\\":\\\"\\\"}\":\"974,19\",\"{\\\"path\\\":\\\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F0000014.rf\\\",\\\"startRow\\\":\\\"\\\",\\\"endRow\\\":\\\"\\\"}\":\"708,8\"},\"last\":{\"100024ec6110005\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"6\",\"lock\":\"tservers/localhost:9997/ [...] - - assertFalse(RootTabletMetadata.needsUpgrade(converted)); - - RootTabletMetadata rtm = RootTabletMetadata.upgrade(converted); - var files = rtm.toTabletMetadata().getFiles(); - assertEquals(2, files.size()); - assertTrue(files.contains(StoredTabletFile - .of(new Path("hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A0000013.rf")))); - assertTrue(files.contains(StoredTabletFile - .of(new Path("hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F0000014.rf")))); - - } - - @Test - public void invalidVersionTest() { - String valid = - "{\"version\":1,\"columnValues\":{\"file\":{\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A000000v.rf\":\"1368,61\"},\"last\":{\"100025091780006\":\"localhost:9997\"},\"loc\":{\"100025091780006\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"3\",\"lock\":\"tservers/localhost:9997/zlock#9db8961a-4ee9-400e-8e80-3353148baadd#0000000000$100025091780006\",\"time\":\"L53\"},\"~tab\":{\"~pr\":\"\\u0000\"}}}"; - // only version changed to invalid value - String invalid = - "{\"version\":-1,\"columnValues\":{\"file\":{\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A000000v.rf\":\"1368,61\"},\"last\":{\"100025091780006\":\"localhost:9997\"},\"loc\":{\"100025091780006\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"3\",\"lock\":\"tservers/localhost:9997/zlock#9db8961a-4ee9-400e-8e80-3353148baadd#0000000000$100025091780006\",\"time\":\"L53\"},\"~tab\":{\"~pr\":\"\\u0000\"}}}"; - - assertTrue(RootTabletMetadata.needsUpgrade(valid)); - - RootTabletMetadata rtm = RootTabletMetadata.upgrade(valid); - var files = rtm.toTabletMetadata().getFiles(); - assertEquals(1, files.size()); - assertTrue(files.contains(StoredTabletFile - .of(new Path("hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A000000v.rf")))); - - // valid json with files, so try conversion - assertTrue(RootTabletMetadata.needsUpgrade(invalid)); - - assertThrows(IllegalArgumentException.class, () -> RootTabletMetadata.upgrade(invalid)); - } -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java index 72127a4d4c..90a6eb4b62 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java @@ -22,8 +22,11 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET; import static org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING; +import java.util.ArrayList; import java.util.Arrays; import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IsolatedScanner; @@ -36,6 +39,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.RootTabletMetadata; @@ -58,6 +62,10 @@ public class Upgrader11to12 implements Upgrader { private static final Logger log = LoggerFactory.getLogger(Upgrader11to12.class); + @VisibleForTesting + static final Set<Text> UPGRADE_FAMILIES = Set.of(DataFileColumnFamily.NAME, + ChoppedColumnFamily.NAME, ExternalCompactionColumnFamily.NAME); + @Override public void upgradeZookeeper(@NonNull ServerContext context) { log.debug("Upgrade ZooKeeper: upgrading to data version {}", METADATA_FILE_JSON_ENCODING); @@ -69,9 +77,21 @@ public class Upgrader11to12 implements Upgrader { byte[] rootData = zrw.getData(rootBase, stat); String json = new String(rootData, UTF_8); - if (RootTabletMetadata.needsUpgrade(json)) { + + var rtm = new RootTabletMetadata(json); + + TreeMap<Key,Value> entries = new TreeMap<>(); + rtm.toKeyValues().filter(e -> UPGRADE_FAMILIES.contains(e.getKey().getColumnFamily())) + .forEach(entry -> entries.put(entry.getKey(), entry.getValue())); + ArrayList<Mutation> mutations = new ArrayList<>(); + + processReferences(mutations::add, entries.entrySet(), "root_table_metadata"); + + Preconditions.checkState(mutations.size() <= 1); + + if (!mutations.isEmpty()) { log.info("Root metadata in ZooKeeper before upgrade: {}", json); - RootTabletMetadata rtm = RootTabletMetadata.upgrade(json); + rtm.update(mutations.get(0)); zrw.overwritePersistentData(rootBase, rtm.toJson().getBytes(UTF_8), stat.getVersion()); log.info("Root metadata in ZooKeeper after upgrade: {}", rtm.toJson()); } @@ -85,30 +105,31 @@ public class Upgrader11to12 implements Upgrader { } } + interface MutationWriter { + void addMutation(Mutation m) throws MutationsRejectedException; + } + @Override public void upgradeRoot(@NonNull ServerContext context) { log.debug("Upgrade root: upgrading to data version {}", METADATA_FILE_JSON_ENCODING); var rootName = Ample.DataLevel.METADATA.metaTable(); - // not using ample to avoid StoredTabletFile because old file ref is incompatible - try (BatchWriter batchWriter = context.createBatchWriter(rootName); Scanner scanner = - new IsolatedScanner(context.createScanner(rootName, Authorizations.EMPTY))) { - processReferences(batchWriter, scanner, rootName); - } catch (TableNotFoundException ex) { - throw new IllegalStateException("Failed to find table " + rootName, ex); - } catch (MutationsRejectedException mex) { - log.warn("Failed to update reference for table: " + rootName); - log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries()); - throw new IllegalStateException("Failed to process table: " + rootName, mex); - } + upgradeTabletsMetadata(context, rootName); } @Override public void upgradeMetadata(@NonNull ServerContext context) { log.debug("Upgrade metadata: upgrading to data version {}", METADATA_FILE_JSON_ENCODING); var metaName = Ample.DataLevel.USER.metaTable(); + upgradeTabletsMetadata(context, metaName); + } + + private void upgradeTabletsMetadata(@NonNull ServerContext context, String metaName) { + // not using ample to avoid StoredTabletFile because old file ref is incompatible try (BatchWriter batchWriter = context.createBatchWriter(metaName); Scanner scanner = new IsolatedScanner(context.createScanner(metaName, Authorizations.EMPTY))) { - processReferences(batchWriter, scanner, metaName); + UPGRADE_FAMILIES.forEach(scanner::fetchColumnFamily); + scanner.setRange(MetadataSchema.TabletsSection.getRange()); + processReferences(batchWriter::addMutation, scanner, metaName); } catch (TableNotFoundException ex) { throw new IllegalStateException("Failed to find table " + metaName, ex); } catch (MutationsRejectedException mex) { @@ -118,10 +139,8 @@ public class Upgrader11to12 implements Upgrader { } } - void processReferences(BatchWriter batchWriter, Scanner scanner, String tableName) { - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); - scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); + void processReferences(MutationWriter batchWriter, Iterable<Map.Entry<Key,Value>> scanner, + String tableName) { try { Mutation update = null; for (Map.Entry<Key,Value> entry : scanner) { diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java index ee45534dab..9fedfe5813 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java @@ -19,6 +19,7 @@ package org.apache.accumulo.manager.upgrade; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.manager.upgrade.Upgrader11to12.UPGRADE_FAMILIES; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.eq; @@ -33,15 +34,16 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.Key; @@ -52,6 +54,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.RootTabletMetadata; import org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily; import org.apache.accumulo.server.ServerContext; import org.apache.hadoop.fs.Path; @@ -103,14 +106,6 @@ public class Upgrader11to12Test { @Test public void processReferencesTest() throws Exception { - BatchWriter batchWriter = mock(BatchWriter.class); - Capture<Mutation> capturedUpdate1 = newCapture(); - batchWriter.addMutation(capture(capturedUpdate1)); - expectLastCall().once(); - - Capture<Mutation> capturedUpdate2 = newCapture(); - batchWriter.addMutation(capture(capturedUpdate2)); - expectLastCall().once(); // create sample data "served" by the mocked scanner TreeMap<Key,Value> scanData = new TreeMap<>(); @@ -144,41 +139,28 @@ public class Upgrader11to12Test { Value value3 = new Value("1,2"); scanData.put(key3, value3); - Scanner scanner = mock(Scanner.class); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); - expectLastCall(); - - expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once(); - replay(batchWriter, scanner); + ArrayList<Mutation> mutations = new ArrayList<>(); Upgrader11to12 upgrader = new Upgrader11to12(); - upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"); + upgrader.processReferences(mutations::add, scanData.entrySet(), "accumulo.metadata"); + + assertEquals(2, mutations.size()); - LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint()); - var u1 = capturedUpdate1.getValue(); + var u1 = mutations.get(0); + LOG.info("c:{}", u1.prettyPrint()); // 2 file add, 2 file delete. 1 chop delete, 1 ext comp delete assertEquals(6, u1.getUpdates().size()); - LOG.info("c:{}", capturedUpdate2.getValue().prettyPrint()); - var u2 = capturedUpdate2.getValue(); + var u2 = mutations.get(1); + LOG.info("c:{}", u2.prettyPrint()); // 1 add, 1 delete assertEquals(2, u2.getUpdates().size()); assertEquals(1, u2.getUpdates().stream().filter(ColumnUpdate::isDeleted).count()); - verify(batchWriter, scanner); - } @Test public void skipConvertedFileTest() throws Exception { - BatchWriter batchWriter = mock(BatchWriter.class); - Capture<Mutation> capturedUpdate1 = newCapture(); - batchWriter.addMutation(capture(capturedUpdate1)); - expectLastCall().once(); // create sample data "served" by the mocked scanner TreeMap<Key,Value> scanData = new TreeMap<>(); Text row1 = new Text("123"); @@ -197,27 +179,18 @@ public class Upgrader11to12Test { Value value2 = new Value("321,654"); scanData.put(key2, value2); - Scanner scanner = mock(Scanner.class); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); - expectLastCall(); - - expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once(); - replay(batchWriter, scanner); + ArrayList<Mutation> mutations = new ArrayList<>(); Upgrader11to12 upgrader = new Upgrader11to12(); - upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"); + upgrader.processReferences(mutations::add, scanData.entrySet(), "accumulo.metadata"); + + assertEquals(1, mutations.size()); - LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint()); - var u1 = capturedUpdate1.getValue(); + var u1 = mutations.get(0); + LOG.info("c:{}", u1.prettyPrint()); // 1 add, 1 delete assertEquals(2, u1.getUpdates().size()); assertEquals(1, u1.getUpdates().stream().filter(ColumnUpdate::isDeleted).count()); - - verify(batchWriter, scanner); } @Test @@ -239,22 +212,13 @@ public class Upgrader11to12Test { Value value1 = new Value("123,456"); scanData.put(key1, value1); - Scanner scanner = mock(Scanner.class); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); - expectLastCall(); - - expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once(); - replay(batchWriter, scanner); + replay(batchWriter); Upgrader11to12 upgrader = new Upgrader11to12(); - assertThrows(IllegalStateException.class, - () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata")); + assertThrows(IllegalStateException.class, () -> upgrader + .processReferences(batchWriter::addMutation, scanData.entrySet(), "accumulo.metadata")); - verify(batchWriter, scanner); + verify(batchWriter); } @Test @@ -282,22 +246,13 @@ public class Upgrader11to12Test { Value value2 = new Value("321,654"); scanData.put(key2, value2); - Scanner scanner = mock(Scanner.class); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); - expectLastCall(); - - expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once(); - replay(batchWriter, scanner); + replay(batchWriter); Upgrader11to12 upgrader = new Upgrader11to12(); - assertThrows(IllegalArgumentException.class, - () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata")); + assertThrows(IllegalArgumentException.class, () -> upgrader + .processReferences(batchWriter::addMutation, scanData.entrySet(), "accumulo.metadata")); - verify(batchWriter, scanner); + verify(batchWriter); } @Test @@ -318,22 +273,13 @@ public class Upgrader11to12Test { Value value1 = new Value("123,456"); scanData.put(key1, value1); - Scanner scanner = mock(Scanner.class); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); - expectLastCall(); - - expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once(); - replay(batchWriter, scanner); + replay(batchWriter); Upgrader11to12 upgrader = new Upgrader11to12(); - assertThrows(IllegalStateException.class, - () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata")); + assertThrows(IllegalStateException.class, () -> upgrader + .processReferences(batchWriter::addMutation, scanData.entrySet(), "accumulo.metadata")); - verify(batchWriter, scanner); + verify(batchWriter); } /** @@ -342,10 +288,6 @@ public class Upgrader11to12Test { */ @Test public void verifyEmptyMutation() throws Exception { - BatchWriter batchWriter = mock(BatchWriter.class); - Capture<Mutation> capturedUpdate1 = newCapture(); - batchWriter.addMutation(capture(capturedUpdate1)); - expectLastCall().once(); // create sample data "served" by the mocked scanner TreeMap<Key,Value> scanData = new TreeMap<>(); @@ -377,27 +319,17 @@ public class Upgrader11to12Test { Value value3 = new Value("333,444"); scanData.put(key3, value3); - Scanner scanner = mock(Scanner.class); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); - expectLastCall(); - scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); - expectLastCall(); - - expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once(); - replay(batchWriter, scanner); + ArrayList<Mutation> mutations = new ArrayList<>(); Upgrader11to12 upgrader = new Upgrader11to12(); - upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"); + upgrader.processReferences(mutations::add, scanData.entrySet(), "accumulo.metadata"); - LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint()); - var u1 = capturedUpdate1.getValue(); + assertEquals(1, mutations.size()); + var u1 = mutations.get(0); + LOG.info("c:{}", u1.prettyPrint()); // 1 add, 1 delete assertEquals(2, u1.getUpdates().size()); assertEquals(1, u1.getUpdates().stream().filter(ColumnUpdate::isDeleted).count()); - - verify(batchWriter, scanner); } @Test @@ -408,7 +340,7 @@ public class Upgrader11to12Test { "{\"version\":1,\"columnValues\":{\"file\":{\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A0000030.rf\":\"856,15\",\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F000000r.rf\":\"308,2\"},\"last\":{\"100017f46240004\":\"localhost:9997\"},\"loc\":{\"100017f46240004\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"16\",\"lock\":\"tservers/localhost:9997/zlock#f6a582b9-9583-4553-b179-a7a3852c8332#0000000000$100017f46240004\",\"time\":\"L42\"},\"~tab [...] .getBytes(UTF_8); final String zKRootV2 = - "{\"version\":2,\"columnValues\":{\"file\":{\"{\\\"path\\\":\\\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A0000030.rf\\\",\\\"startRow\\\":\\\"\\\",\\\"endRow\\\":\\\"\\\"}\":\"856,15\",\"{\\\"path\\\":\\\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F000000r.rf\\\",\\\"startRow\\\":\\\"\\\",\\\"endRow\\\":\\\"\\\"}\":\"308,2\"},\"last\":{\"100017f46240004\":\"localhost:9997\"},\"loc\":{\"100017f46240004\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\" [...] + "{\"version\":1,\"columnValues\":{\"file\":{\"{\\\"path\\\":\\\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A0000030.rf\\\",\\\"startRow\\\":\\\"\\\",\\\"endRow\\\":\\\"\\\"}\":\"856,15\",\"{\\\"path\\\":\\\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F000000r.rf\\\",\\\"startRow\\\":\\\"\\\",\\\"endRow\\\":\\\"\\\"}\":\"308,2\"},\"last\":{\"100017f46240004\":\"localhost:9997\"},\"loc\":{\"100017f46240004\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\" [...] InstanceId iid = InstanceId.of(UUID.randomUUID()); Upgrader11to12 upgrader = new Upgrader11to12(); @@ -458,4 +390,59 @@ public class Upgrader11to12Test { assertFalse(upgrader.fileNeedsConversion(s31)); assertFalse(upgrader.fileNeedsConversion(s31_untrimmed)); } + + @Test + public void convertRoot1File() { + String root21ZkData = + "{\"version\":1,\"columnValues\":{\"file\":{\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A000000v.rf\":\"1368,61\"},\"last\":{\"100025091780006\":\"localhost:9997\"},\"loc\":{\"100025091780006\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"3\",\"lock\":\"tservers/localhost:9997/zlock#9db8961a-4ee9-400e-8e80-3353148baadd#0000000000$100025091780006\",\"time\":\"L53\"},\"~tab\":{\"~pr\":\"\\u0000\"}}}"; + + RootTabletMetadata rtm = new RootTabletMetadata(root21ZkData); + ArrayList<Mutation> mutations = new ArrayList<>(); + Upgrader11to12 upgrader = new Upgrader11to12(); + upgrader.processReferences(mutations::add, + rtm.toKeyValues().filter(e -> UPGRADE_FAMILIES.contains(e.getKey().getColumnFamily())) + .collect(Collectors.toList()), + "accumulo.metadata"); + assertEquals(1, mutations.size()); + var mutation = mutations.get(0); + rtm.update(mutation); + + LOG.debug("converted column values: {}", rtm.toTabletMetadata().getFiles()); + + var files = rtm.toTabletMetadata().getFiles(); + LOG.info("FILES: {}", rtm.toTabletMetadata().getFilesMap()); + + assertEquals(1, files.size()); + assertTrue(files.contains(StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/+r/root_tablet/A000000v.rf")))); + } + + @Test + public void convertRoot2Files() { + String root212ZkData2Files = + "{\"version\":1,\"columnValues\":{\"file\":{\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/00000_00000.rf\":\"0,0\",\"hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F000000c.rf\":\"926,18\"},\"last\":{\"10001a84d7d0005\":\"localhost:9997\"},\"loc\":{\"10001a84d7d0005\":\"localhost:9997\"},\"srv\":{\"dir\":\"root_tablet\",\"flush\":\"2\",\"lock\":\"tservers/localhost:9997/zlock#d21adaa4-0f97-4004-9ff8-cce9dbb6687f#0000000000$10001a84d7d0005\",\"time\":\"L6\"},\"~tab\ [...] + + RootTabletMetadata rtm = new RootTabletMetadata(root212ZkData2Files); + ArrayList<Mutation> mutations = new ArrayList<>(); + Upgrader11to12 upgrader = new Upgrader11to12(); + upgrader.processReferences(mutations::add, + rtm.toKeyValues().filter(e -> UPGRADE_FAMILIES.contains(e.getKey().getColumnFamily())) + .collect(Collectors.toList()), + "accumulo.metadata"); + assertEquals(1, mutations.size()); + var mutation = mutations.get(0); + rtm.update(mutation); + + LOG.debug("converted column values: {}", rtm.toTabletMetadata()); + + var files = rtm.toTabletMetadata().getFiles(); + LOG.info("FILES: {}", rtm.toTabletMetadata().getFilesMap()); + + assertEquals(2, files.size()); + assertTrue(files.contains(StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/+r/root_tablet/00000_00000.rf")))); + assertTrue(files.contains(StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/+r/root_tablet/F000000c.rf")))); + } + }