This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 4fef06761c2da98299414809f691ed85d937b4ed Merge: e6799c9cf7 1aa81d81b0 Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Wed Dec 13 17:58:00 2023 +0000 Merge branch 'main' into elasticity .../core/client/MutationsRejectedException.java | 11 ++-- .../core/metadata/schema/TabletMutatorBase.java | 2 +- .../accumulo/core/tabletserver/log/LogEntry.java | 61 ++++++++++------------ .../core/metadata/schema/TabletMetadataTest.java | 11 ++-- .../core/tabletserver/log/LogEntryTest.java | 11 ++-- .../server/constraints/MetadataConstraints.java | 2 +- .../org/apache/accumulo/server/fs/VolumeUtil.java | 10 ++-- .../accumulo/server/util/ListVolumesUsed.java | 2 +- .../constraints/MetadataConstraintsTest.java | 56 +++++--------------- .../apache/accumulo/server/fs/VolumeUtilTest.java | 6 +-- .../server/manager/state/TabletManagementTest.java | 7 +-- .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 2 +- .../accumulo/manager/recovery/RecoveryManager.java | 2 +- .../org/apache/accumulo/tserver/TabletServer.java | 2 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 2 +- .../test/MissingWalHeaderCompletesRecoveryIT.java | 6 +-- .../functional/TabletManagementIteratorIT.java | 4 +- 17 files changed, 77 insertions(+), 120 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java index ea13271f6f,4f7f9c230c..2792397ab7 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java @@@ -179,10 -173,10 +179,10 @@@ public abstract class TabletMutatorBase } @Override - public Ample.TabletMutator putWal(LogEntry logEntry) { + public T putWal(LogEntry logEntry) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); - mutation.put(LogColumnFamily.NAME, logEntry.getColumnQualifier(), logEntry.getValue()); + logEntry.addToMutation(mutation); - return this; + return getThis(); } 
@Override diff --cc server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index 190f7e0e3b,617bf20e86..9fad119948 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@@ -416,17 -374,9 +416,17 @@@ public class MetadataConstraints implem case 7: return "Lock not held in zookeeper by writer"; case 8: - return "Bulk load transaction no longer running"; + return "Bulk load mutation contains either inconsistent files or multiple fateTX ids"; case 9: + return "Malformed operation id"; + case 10: + return "Malformed hosting goal"; + case 11: + return "Malformed file selection value"; + case 12: return "Invalid data file metadata format"; + case 13: + return "Invalid compacted column"; } return null; } diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index 2bb653cb70,29f470ebe6..5c3ac18584 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@@ -130,59 -128,37 +130,59 @@@ public class VolumeUtil } } - /** - * This method does two things. First, it switches any volumes a tablet is using that are - * configured in instance.volumes.replacements. Second, if a tablet dir is no longer configured - * for use it chooses a new tablet directory. 
- */ - public static TabletFiles updateTabletVolumes(ServerContext context, ServiceLock zooLock, - KeyExtent extent, TabletFiles tabletFiles) { - List<Pair<Path,Path>> replacements = context.getVolumeReplacements(); + public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> replacements, + final TabletMetadata tm) { if (replacements.isEmpty()) { - return tabletFiles; + return false; } - log.trace("Using volume replacements: {}", replacements); - List<LogEntry> logsToRemove = new ArrayList<>(); - List<LogEntry> logsToAdd = new ArrayList<>(); + MutableBoolean needsReplacement = new MutableBoolean(false); + + Consumer<LogEntry> consumer = le -> needsReplacement.setTrue(); + + volumeReplacementEvaluation(replacements, tm, consumer, consumer, + f -> needsReplacement.setTrue(), (f, dfv) -> needsReplacement.setTrue()); + + return needsReplacement.booleanValue(); + } - List<StoredTabletFile> filesToRemove = new ArrayList<>(); - SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new TreeMap<>(); + public static class VolumeReplacements { + public final TabletMetadata tabletMeta; + public final List<LogEntry> logsToRemove = new ArrayList<>(); + public final List<LogEntry> logsToAdd = new ArrayList<>(); + public final List<StoredTabletFile> filesToRemove = new ArrayList<>(); + public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new HashMap<>(); - TabletFiles ret = new TabletFiles(); + public VolumeReplacements(TabletMetadata tabletMeta) { + this.tabletMeta = tabletMeta; + } + } - for (LogEntry logEntry : tabletFiles.logEntries) { + public static VolumeReplacements + computeVolumeReplacements(final List<Pair<Path,Path>> replacements, final TabletMetadata tm) { + var vr = new VolumeReplacements(tm); + volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, vr.logsToAdd::add, + vr.filesToRemove::add, vr.filesToAdd::put); + return vr; + } + + public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> replacements, 
+ final TabletMetadata tm, final Consumer<LogEntry> logsToRemove, + final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> filesToRemove, + final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) { + if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && tm.getLogs().isEmpty())) { + return; + } + + log.trace("Using volume replacements: {}", replacements); + for (LogEntry logEntry : tm.getLogs()) { + log.trace("Evaluating walog {} for replacement.", logEntry); LogEntry switchedLogEntry = switchVolumes(logEntry, replacements); if (switchedLogEntry != null) { - logsToRemove.add(logEntry); - logsToAdd.add(switchedLogEntry); - ret.logEntries.add(switchedLogEntry); - log.debug("Replacing volume {} : {} -> {}", extent, logEntry.getLogReference(), + logsToRemove.accept(logEntry); + logsToAdd.accept(switchedLogEntry); - log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getFilePath(), - switchedLogEntry.getFilePath()); ++ log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getLogReference(), + switchedLogEntry.getLogReference()); - } else { - ret.logEntries.add(logEntry); } } diff --cc server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java index f1223ee71b,0000000000..fafa79f696 mode 100644,000000..100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java @@@ -1,191 -1,0 +1,188 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.manager.state; + +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; - import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class TabletManagementTest { + + private SortedMap<Key,Value> toRowMap(Mutation mutation) { + SortedMap<Key,Value> rowMap = new TreeMap<>(); + mutation.getUpdates().forEach(cu -> { + Key k = new Key(mutation.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(), + cu.getTimestamp()); + Value v = new Value(cu.getValue()); + rowMap.put(k, v); + }); + return rowMap; + } + + private SortedMap<Key,Value> createMetadataEntryKV(KeyExtent extent) { + + Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + + DIRECTORY_COLUMN.put(mutation, new Value("t-0001757")); + FLUSH_COLUMN.put(mutation, new Value("6")); + TIME_COLUMN.put(mutation, new Value("M123456789")); + + StoredTabletFile bf1 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/bf1")).insert(); + StoredTabletFile bf2 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/bf2")).insert(); + mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf1.getMetadata()) + .put(FateTxId.formatTid(56)); + mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf2.getMetadata()) + .put(FateTxId.formatTid(59)); + + mutation.at().family(ClonedColumnFamily.NAME).qualifier("").put("OK"); + + DataFileValue 
dfv1 = new DataFileValue(555, 23); + StoredTabletFile tf1 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/df1.rf")).insert(); + StoredTabletFile tf2 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/df2.rf")).insert(); + mutation.at().family(DataFileColumnFamily.NAME).qualifier(tf1.getMetadata()).put(dfv1.encode()); + DataFileValue dfv2 = new DataFileValue(234, 13); + mutation.at().family(DataFileColumnFamily.NAME).qualifier(tf2.getMetadata()).put(dfv2.encode()); + + mutation.at().family(CurrentLocationColumnFamily.NAME).qualifier("s001").put("server1:8555"); + + mutation.at().family(LastLocationColumnFamily.NAME).qualifier("s000").put("server2:8555"); + + LogEntry le1 = new LogEntry("localhost:8020/" + UUID.randomUUID()); - mutation.at().family(LogColumnFamily.NAME).qualifier(le1.getColumnQualifier()) - .put(le1.getValue()); ++ le1.addToMutation(mutation); + LogEntry le2 = new LogEntry("localhost:8020/" + UUID.randomUUID()); - mutation.at().family(LogColumnFamily.NAME).qualifier(le2.getColumnQualifier()) - .put(le2.getValue()); ++ le2.addToMutation(mutation); + + StoredTabletFile sf1 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert(); + StoredTabletFile sf2 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf2.rf")).insert(); + mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf1.getMetadata()).put(""); + mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf2.getMetadata()).put(""); + + return toRowMap(mutation); + + } + + @Test + public void testEncodeDecodeWithReasons() throws Exception { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + + final Set<ManagementAction> actions = + Set.of(ManagementAction.NEEDS_LOCATION_UPDATE, ManagementAction.NEEDS_SPLITTING); + + final SortedMap<Key,Value> entries = createMetadataEntryKV(extent); + + TabletManagement.addActions(entries, actions); + Key key = entries.firstKey(); 
+ Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()), + new ArrayList<>(entries.values())); + + // Remove the REASONS column from the entries map for the comparison check + // below + entries.remove(new Key(key.getRow().toString(), "REASONS", "")); + + TabletManagement tmi = new TabletManagement(key, val, true); + assertEquals(entries, tmi.getTabletMetadata().getKeyValues()); + assertEquals(actions, tmi.getActions()); + } + + @Test + public void testEncodeDecodeWithErrors() throws Exception { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + + final SortedMap<Key,Value> entries = createMetadataEntryKV(extent); + + TabletManagement.addError(entries, new UnsupportedOperationException("Not supported.")); + Key key = entries.firstKey(); + Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()), + new ArrayList<>(entries.values())); + + // Remove the ERROR column from the entries map for the comparison check + // below + entries.remove(new Key(key.getRow().toString(), "ERROR", "")); + + TabletManagement tmi = new TabletManagement(key, val, true); + assertEquals(entries, tmi.getTabletMetadata().getKeyValues()); + assertEquals("Not supported.", tmi.getErrorMessage()); + } + + @Test + public void testBinary() throws Exception { + // test end row with non ascii data + Text endRow = new Text(new byte[] {'m', (byte) 0xff}); + KeyExtent extent = new KeyExtent(TableId.of("5"), endRow, new Text("da")); + + final Set<ManagementAction> actions = + Set.of(ManagementAction.NEEDS_LOCATION_UPDATE, ManagementAction.NEEDS_SPLITTING); + + final SortedMap<Key,Value> entries = createMetadataEntryKV(extent); + + TabletManagement.addActions(entries, actions); + Key key = entries.firstKey(); + Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()), + new ArrayList<>(entries.values())); + + assertTrue(entries.keySet().stream().allMatch(k -> k.getRow().equals(extent.toMetaRow()))); + + // Remove the 
REASONS column from the entries map for the comparison check + // below + entries.remove(new Key(key.getRow(), new Text("REASONS"), new Text(""))); + + TabletManagement tmi = new TabletManagement(key, val, true); + assertEquals(entries, tmi.getTabletMetadata().getKeyValues()); + assertEquals(actions, tmi.getActions()); + + } +} diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 493b64afb4,807ba9f690..3c533c2691 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@@ -294,15 -286,16 +294,15 @@@ public class GarbageCollectWriteAheadLo } // Tablet is being recovered and has WAL references, remove all the WALs for the dead server // that made the WALs. - for (Collection<String> wals : state.walogs) { - for (String wal : wals) { - UUID walUUID = path2uuid(new Path(wal)); - TServerInstance dead = result.get(walUUID); - // There's a reference to a log file, so skip that server's logs - Set<UUID> idsToIgnore = candidates.remove(dead); - if (idsToIgnore != null) { - result.keySet().removeAll(idsToIgnore); - recoveryLogs.keySet().removeAll(idsToIgnore); - } + for (LogEntry wals : tabletMetadata.getLogs()) { - String wal = wals.getFilePath(); ++ String wal = wals.getLogReference(); + UUID walUUID = path2uuid(new Path(wal)); + TServerInstance dead = result.get(walUUID); + // There's a reference to a log file, so skip that server's logs + Set<UUID> idsToIgnore = candidates.remove(dead); + if (idsToIgnore != null) { + result.keySet().removeAll(idsToIgnore); + recoveryLogs.keySet().removeAll(idsToIgnore); } } } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index 9cb41a90c1,86bc06930c..3c8d7922f2 --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@@ -158,70 -156,72 +158,70 @@@ public class RecoveryManager } } - public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) - throws IOException { + public boolean recoverLogs(KeyExtent extent, Collection<LogEntry> walogs) throws IOException { boolean recoveryNeeded = false; - for (Collection<String> logs : walogs) { - for (String walog : logs) { - - Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), FileType.WAL, - manager.getContext().getVolumeReplacements()); - if (switchedWalog != null) { - // replaces the volume used for sorting, but do not change entry in metadata table. When - // the tablet loads it will change the metadata table entry. If - // the tablet has the same replacement config, then it will find the sorted log. - log.info("Volume replaced {} -> {}", walog, switchedWalog); - walog = switchedWalog.toString(); - } + for (LogEntry entry : walogs) { - String walog = entry.getFilePath(); ++ String walog = entry.getLogReference(); + + Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), FileType.WAL, + manager.getContext().getVolumeReplacements()); + if (switchedWalog != null) { + // replaces the volume used for sorting, but do not change entry in metadata table. When + // the tablet loads it will change the metadata table entry. If + // the tablet has the same replacement config, then it will find the sorted log. 
+ log.info("Volume replaced {} -> {}", walog, switchedWalog); + walog = switchedWalog.toString(); + } - String[] parts = walog.split("/"); - String sortId = parts[parts.length - 1]; - String filename = new Path(walog).toString(); - String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString(); + String[] parts = walog.split("/"); + String sortId = parts[parts.length - 1]; + String filename = new Path(walog).toString(); + String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString(); + + boolean sortQueued; + synchronized (this) { + sortQueued = sortsQueued.contains(sortId); + } - boolean sortQueued; + if (sortQueued + && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) + == null) { synchronized (this) { - sortQueued = sortsQueued.contains(sortId); + sortsQueued.remove(sortId); } + } - if (sortQueued - && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) - == null) { - synchronized (this) { - sortsQueued.remove(sortId); - } + if (exists(SortedLogState.getFinishedMarkerPath(dest))) { + synchronized (this) { + closeTasksQueued.remove(sortId); + recoveryDelay.remove(sortId); + sortsQueued.remove(sortId); } + continue; + } - if (exists(SortedLogState.getFinishedMarkerPath(dest))) { - synchronized (this) { - closeTasksQueued.remove(sortId); - recoveryDelay.remove(sortId); - sortsQueued.remove(sortId); + recoveryNeeded = true; + synchronized (this) { + if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { + AccumuloConfiguration aconf = manager.getConfiguration(); + LogCloser closer = Property.createInstanceFromPropertyName(aconf, + Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser()); + Long delay = recoveryDelay.get(sortId); + if (delay == null) { + delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY); + } else { + delay = Math.min(2 * delay, 1000 * 60 * 5L); } - continue; - } - recoveryNeeded = true; - synchronized (this) { - if 
(!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { - AccumuloConfiguration aconf = manager.getConfiguration(); - LogCloser closer = Property.createInstanceFromPropertyName(aconf, - Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser()); - Long delay = recoveryDelay.get(sortId); - if (delay == null) { - delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY); - } else { - delay = Math.min(2 * delay, 1000 * 60 * 5L); - } - - log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename, - (delay / 1000), extent); - - ScheduledFuture<?> future = executor.schedule( - new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); - closeTasksQueued.add(sortId); - recoveryDelay.put(sortId, delay); - } + log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename, + (delay / 1000), extent); + + ScheduledFuture<?> future = executor.schedule( + new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); + closeTasksQueued.add(sortId); + recoveryDelay.put(sortId, delay); } } } diff --cc test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 7785aa58ca,0000000000..ae8b24ad9c mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@@ -1,486 -1,0 +1,484 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import 
org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.manager.thrift.ManagerState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; - import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.server.manager.LiveTServerSet; +import 
org.apache.accumulo.server.manager.state.TabletManagementIterator; +import org.apache.accumulo.server.manager.state.TabletManagementParameters; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +/** + * Test to ensure that the {@link TabletManagementIterator} properly skips over tablet information + * in the metadata table when there is no work to be done on the tablet (see ACCUMULO-3580) + */ +public class TabletManagementIteratorIT extends AccumuloClusterHarness { + private final static Logger log = LoggerFactory.getLogger(TabletManagementIteratorIT.class); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(3); + } + + @Test + public void test() throws AccumuloException, AccumuloSecurityException, TableExistsException, + TableNotFoundException, IOException { + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + String[] tables = getUniqueNames(8); + final String t1 = tables[0]; + final String t2 = tables[1]; + final String t3 = tables[2]; + final String t4 = tables[3]; + final String metaCopy1 = tables[4]; + final String metaCopy2 = tables[5]; + final String metaCopy3 = tables[6]; + final String metaCopy4 = tables[7]; + + // create some metadata + createTable(client, t1, true); + createTable(client, t2, false); + createTable(client, t3, true); + createTable(client, t4, true); + + // Scan table t3 which will cause it's tablets + // to be hosted. Then, remove the location. 
+ Scanner s = client.createScanner(t3); + s.setRange(new Range()); + @SuppressWarnings("unused") + var unused = Iterables.size(s); // consume all the data + + // examine a clone of the metadata table, so we can manipulate it + copyTable(client, MetadataTable.NAME, metaCopy1); + + TabletManagementParameters tabletMgmtParams = createParameters(client); + int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams); + while (tabletsInFlux > 0) { + log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1); + UtilWaitThread.sleep(500); + copyTable(client, MetadataTable.NAME, metaCopy1); + tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams); + } + assertEquals(0, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), + "No tables should need attention"); + + // The metadata table stabilized and metaCopy1 contains a copy suitable for testing. Before + // metaCopy1 is modified, copy it for subsequent test. + copyTable(client, metaCopy1, metaCopy2); + copyTable(client, metaCopy1, metaCopy3); + copyTable(client, metaCopy1, metaCopy4); + + // t1 is unassigned, setting to always will generate a change to host tablets + setTabletHostingGoal(client, metaCopy1, t1, TabletHostingGoal.ALWAYS.name()); + // t3 is hosted, setting to never will generate a change to unhost tablets + setTabletHostingGoal(client, metaCopy1, t3, TabletHostingGoal.NEVER.name()); + tabletMgmtParams = createParameters(client); + assertEquals(4, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), + "Should have four tablets with hosting goal changes"); + + // test the assigned case (no location) + removeLocation(client, metaCopy1, t3); + assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), + "Should have two tablets without a loc"); + + // Test setting the operation id on one of the tablets in table t1. Table t1 has two tablets + // w/o a location. 
Only one should need attention because of the operation id. + setOperationId(client, metaCopy1, t1, new Text("some split"), TabletOperationType.SPLITTING); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), + "Should have tablets needing attention because of operation id"); + + // test the cases where the assignment is to a dead tserver + reassignLocation(client, metaCopy2, t3); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, tabletMgmtParams), + "Only 1 of 2 tablets in table t1 should be returned"); + + // Remove location and set merge operation id on both tablets + // These tablets should not need attention as they have no WALs + setTabletHostingGoal(client, metaCopy4, t4, TabletHostingGoal.ALWAYS.name()); + removeLocation(client, metaCopy4, t4); + assertEquals(2, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Tablets have no location and a hosting goal of always, so they should need attention"); + + // Test MERGING and SPLITTING do not need attention with no location or wals + setOperationId(client, metaCopy4, t4, null, TabletOperationType.MERGING); + assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have no tablets needing attention for merge as they have no location"); + setOperationId(client, metaCopy4, t4, null, TabletOperationType.SPLITTING); + assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have no tablets needing attention for merge as they have no location"); + + // Create a log entry for one of the tablets, this tablet will now need attention + // for both MERGING and SPLITTING + setOperationId(client, metaCopy4, t4, null, TabletOperationType.MERGING); + createLogEntry(client, metaCopy4, t4); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have a tablet needing attention because of wals"); + // Switch op to SPLITTING which should also need attention 
like MERGING + setOperationId(client, metaCopy4, t4, null, TabletOperationType.SPLITTING); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have a tablet needing attention because of wals"); + + // Switch op to delete, no tablets should need attention even with WALs + setOperationId(client, metaCopy4, t4, null, TabletOperationType.DELETING); + assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have no tablets needing attention for delete"); + + // test the bad tablet location state case (inconsistent metadata) + tabletMgmtParams = createParameters(client); + addDuplicateLocation(client, metaCopy3, t3); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, tabletMgmtParams), + "Should have 1 tablet that needs a metadata repair"); + + // test the volume replacements case. Need to insert some files into + // the metadata for t4, then run the TabletManagementIterator with + // volume replacements + addFiles(client, metaCopy4, t4); + List<Pair<Path,Path>> replacements = new ArrayList<>(); + replacements.add(new Pair<Path,Path>(new Path("file:/vol1/accumulo/inst_id"), + new Path("file:/vol2/accumulo/inst_id"))); + tabletMgmtParams = createParameters(client, replacements); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have one tablet that needs a volume replacement"); + + // clean up + dropTables(client, t1, t2, t3, t4, metaCopy1, metaCopy2, metaCopy3, metaCopy4); + } + } + + private void setTabletHostingGoal(AccumuloClient client, String table, String tableNameToModify, + String state) throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + scanner.setRange(new KeyExtent(tableIdToModify, null, null).toMetaRange()); + for (Entry<Key,Value> 
entry : scanner) { + Mutation m = new Mutation(entry.getKey().getRow()); + m.put(HostingColumnFamily.GOAL_COLUMN.getColumnFamily(), + HostingColumnFamily.GOAL_COLUMN.getColumnQualifier(), entry.getKey().getTimestamp() + 1, + new Value(state)); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + } + } + } + + private void addDuplicateLocation(AccumuloClient client, String table, String tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow()); + m.put(CurrentLocationColumnFamily.NAME, new Text("1234567"), new Value("fake:9005")); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + } + + private void addFiles(AccumuloClient client, String table, String tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow()); + m.put(DataFileColumnFamily.NAME, + new Text(StoredTabletFile + .serialize("file:/vol1/accumulo/inst_id/tables/2a/default_tablet/F0000072.rf")), + new Value(new DataFileValue(0, 0, 0).encode())); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + try { + client.createScanner(table).iterator() + .forEachRemaining(e -> System.out.println(e.getKey() + "-> " + e.getValue())); + } catch (TableNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (AccumuloSecurityException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (AccumuloException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + private void reassignLocation(AccumuloClient client, String table, String 
tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + scanner.setRange(new KeyExtent(tableIdToModify, null, null).toMetaRange()); + scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); + Entry<Key,Value> entry = scanner.iterator().next(); + Mutation m = new Mutation(entry.getKey().getRow()); + m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier(), + entry.getKey().getTimestamp()); + m.put(entry.getKey().getColumnFamily(), new Text("1234567"), + entry.getKey().getTimestamp() + 1, new Value("fake:9005")); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + } + } + + // Sets an operation type on all tablets up to the end row + private void setOperationId(AccumuloClient client, String table, String tableNameToModify, + Text end, TabletOperationType opType) throws TableNotFoundException { + var opid = TabletOperationId.from(opType, 42L); + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + try (TabletsMetadata tabletsMetadata = + getServerContext().getAmple().readTablets().forTable(tableIdToModify) + .overlapping(null, end != null ? 
TabletsSection.encodeRow(tableIdToModify, end) : null) + .fetch(ColumnType.PREV_ROW).build()) { + for (TabletMetadata tabletMetadata : tabletsMetadata) { + Mutation m = new Mutation(tabletMetadata.getExtent().toMetaRow()); + MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, + new Value(opid.canonical())); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + } + } + } + + private void removeLocation(AccumuloClient client, String table, String tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + BatchDeleter deleter = client.createBatchDeleter(table, Authorizations.EMPTY, 1); + deleter + .setRanges(Collections.singleton(new KeyExtent(tableIdToModify, null, null).toMetaRange())); + deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME); + deleter.delete(); + deleter.close(); + } + + private int findTabletsNeedingAttention(AccumuloClient client, String table, + TabletManagementParameters tabletMgmtParams) throws TableNotFoundException, IOException { + int results = 0; + List<KeyExtent> resultList = new ArrayList<>(); + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + TabletManagementIterator.configureScanner(scanner, tabletMgmtParams); + scanner.updateScanIteratorOption("tabletChange", "debug", "1"); + for (Entry<Key,Value> e : scanner) { + if (e != null) { + TabletManagement mti = TabletManagementIterator.decode(e); + results++; + log.debug("Found tablets that changed state: {}", mti.getTabletMetadata().getExtent()); + log.debug("metadata: {}", mti.getTabletMetadata()); + resultList.add(mti.getTabletMetadata().getExtent()); + } + } + } + log.debug("Tablets in flux: {}", resultList); + return results; + } + + private void createTable(AccumuloClient client, String t, boolean 
online) + throws AccumuloSecurityException, AccumuloException, TableNotFoundException, + TableExistsException { + SortedSet<Text> partitionKeys = new TreeSet<>(); + partitionKeys.add(new Text("some split")); + NewTableConfiguration ntc = new NewTableConfiguration().withSplits(partitionKeys); + client.tableOperations().create(t, ntc); + client.tableOperations().online(t); + if (!online) { + client.tableOperations().offline(t, true); + } + } + + /** + * Create a copy of the source table by first gathering all the rows of the source in a list of + * mutations. Then create the copy of the table and apply the mutations to the copy. + */ + private void copyTable(AccumuloClient client, String source, String copy) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + TableExistsException { + try { + dropTables(client, copy); + } catch (TableNotFoundException ex) { + // ignored + } + + log.info("Gathering rows to copy {} ", source); + List<Mutation> mutations = new ArrayList<>(); + + try (Scanner scanner = client.createScanner(source, Authorizations.EMPTY)) { + RowIterator rows = new RowIterator(new IsolatedScanner(scanner)); + + while (rows.hasNext()) { + Iterator<Entry<Key,Value>> row = rows.next(); + Mutation m = null; + + while (row.hasNext()) { + Entry<Key,Value> entry = row.next(); + Key k = entry.getKey(); + if (m == null) { + m = new Mutation(k.getRow()); + } + + m.put(k.getColumnFamily(), k.getColumnQualifier(), k.getColumnVisibilityParsed(), + k.getTimestamp(), entry.getValue()); + } + + mutations.add(m); + } + } + + // metadata should be stable with only 8 rows (2 for each table) + log.debug("Gathered {} rows to create copy {}", mutations.size(), copy); + assertEquals(8, mutations.size(), "Metadata should have 8 rows (2 for each table)"); + client.tableOperations().create(copy); + + try (BatchWriter writer = client.createBatchWriter(copy)) { + for (Mutation m : mutations) { + writer.addMutation(m); + } + } + + log.info("Finished
creating copy " + copy); + } + + private void dropTables(AccumuloClient client, String... tables) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + for (String t : tables) { + client.tableOperations().delete(t); + } + } + + // Creates a log entry on the "some split" extent, this could be modified easily to support + // other extents + private void createLogEntry(AccumuloClient client, String table, String tableNameToModify) + throws MutationsRejectedException, TableNotFoundException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + KeyExtent extent = new KeyExtent(tableIdToModify, new Text("some split"), null); + Mutation m = new Mutation(extent.toMetaRow()); + String fileName = "file:/accumulo/wal/localhost+9997/" + UUID.randomUUID().toString(); + LogEntry logEntry = new LogEntry(fileName); - m.at().family(LogColumnFamily.NAME).qualifier(logEntry.getColumnQualifier()) - .put(logEntry.getValue()); ++ logEntry.addToMutation(m); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + } + + private static TabletManagementParameters createParameters(AccumuloClient client) { + return createParameters(client, List.of()); + } + + private static TabletManagementParameters createParameters(AccumuloClient client, + List<Pair<Path,Path>> replacements) { + var context = (ClientContext) client; + Set<TableId> onlineTables = Sets.filter(context.getTableIdToNameMap().keySet(), + tableId -> context.getTableState(tableId) == TableState.ONLINE); + + HashSet<TServerInstance> tservers = new HashSet<>(); + for (String tserver : context.instanceOperations().getTabletServers()) { + try { + var zPath = ServiceLock.path(ZooUtil.getRoot(context.instanceOperations().getInstanceId()) + + Constants.ZTSERVERS + "/" + tserver); + long sessionId = ServiceLock.getSessionId(context.getZooCache(), zPath); + tservers.add(new TServerInstance(tserver, sessionId)); + } catch (Exception 
e) { + throw new RuntimeException(e); + } + } + + return new TabletManagementParameters(ManagerState.NORMAL, + Map.of( + Ample.DataLevel.ROOT, true, Ample.DataLevel.USER, true, Ample.DataLevel.METADATA, true), + onlineTables, + new LiveTServerSet.LiveTServersSnapshot(tservers, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)), + Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements); + } +}