This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit f88461430580f39d4868e2f530fececd1e7b7e95
Merge: 5edcccc277 f9897862dd
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Tue Dec 19 07:18:50 2023 -0500

    Merge branch 'main' into elasticity

 core/pom.xml | 4 +-
 .../accumulo/core/conf/ClientConfigGenerate.java | 8 +-
 .../accumulo/core/conf/ConfigurationDocGen.java | 4 +-
 .../apache/accumulo/core/logging/TabletLogger.java | 3 +-
 .../accumulo/core/metadata/schema/Ample.java | 2 -
 .../metadata/schema/TabletMetadataBuilder.java | 5 -
 .../core/metadata/schema/TabletMutatorBase.java | 10 +-
 .../accumulo/core/tabletserver/log/LogEntry.java | 137 +++++++++++++-------
 .../core/metadata/schema/TabletMetadataTest.java | 11 +-
 .../core/tabletserver/log/LogEntryTest.java | 139 ++++++++++-----------
 .../org/apache/accumulo/server/fs/VolumeUtil.java | 21 +---
 .../manager/state/AbstractTabletStateStore.java | 2 +-
 .../accumulo/server/util/ListVolumesUsed.java | 2 +-
 .../apache/accumulo/server/fs/VolumeUtilTest.java | 12 +-
 .../server/manager/state/TabletManagementTest.java | 4 +-
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 2 +-
 .../accumulo/manager/recovery/RecoveryManager.java | 2 +-
 .../accumulo/tserver/TabletClientHandler.java | 8 +-
 .../org/apache/accumulo/tserver/TabletServer.java | 17 +-
 .../org/apache/accumulo/tserver/log/DfsLogger.java | 91 +++++---------
 .../accumulo/tserver/log/TabletServerLogger.java | 33 +++--
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 39 +++---
 .../accumulo/tserver/WalRemovalOrderTest.java | 43 ++++---
 .../shell/commands/ListTabletsCommandTest.java | 4 +-
 .../test/MissingWalHeaderCompletesRecoveryIT.java | 4 +-
 .../test/functional/AmpleConditionalWriterIT.java | 12 +-
 .../functional/TabletManagementIteratorIT.java | 2 +-
 27 files changed, 294 insertions(+), 327 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
index da10ce2676,44832db7f1..0bdf7e2055
--- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
@@@ -23,7 -23,7 +23,8 @@@ import static java.util.stream.Collecto
  import java.util.Collection;
  import java.util.List;
  import java.util.Optional;
 +import java.util.SortedSet;
+ import java.util.UUID;
  import java.util.concurrent.TimeUnit;

  import org.apache.accumulo.core.client.admin.CompactionConfig;
@@@ -161,9 -160,9 +162,9 @@@ public class TabletLogger
     fileLog.debug("Imported {} {} ", extent, file);
   }

-  public static void recovering(KeyExtent extent, List<LogEntry> logEntries) {
+  public static void recovering(KeyExtent extent, Collection<LogEntry> logEntries) {
     if (recoveryLog.isDebugEnabled()) {
-      List<String> logIds = logEntries.stream().map(LogEntry::getUniqueID).collect(toList());
+      List<UUID> logIds = logEntries.stream().map(LogEntry::getUniqueID).collect(toList());
       recoveryLog.debug("For {} recovering data from walogs: {}", extent, logIds);
     }
   }

diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index d8bb5d62e9,d551b342af..55abba98b1
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@@ -344,81 -265,50 +344,79 @@@ public interface Ample

   /**
    * Interface for changing a tablets persistent data.
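The Ample hunk here replaces TabletMutator's concrete return types with the generic TabletUpdates<T>, so Ample.TabletMutator and the new TabletMetadataBuilder can share one set of fluent put/delete methods while each chain keeps its own concrete type. A minimal sketch of that self-typed pattern, using hypothetical names rather than the real Accumulo classes:

    // Sketch only: the self-typed generic pattern behind TabletUpdates<T>.
    // Updates, QueueingMutator and InMemoryBuilder are illustration names, not Accumulo classes.
    interface Updates<T> {
      T putNote(String note); // every update method returns the caller's own type T
    }

    class QueueingMutator implements Updates<QueueingMutator> {
      @Override
      public QueueingMutator putNote(String note) {
        // queue the change for a later persist
        return this; // chaining keeps the QueueingMutator type
      }

      void mutate() {
        // persist everything queued so far
      }
    }

    class InMemoryBuilder implements Updates<InMemoryBuilder> {
      @Override
      public InMemoryBuilder putNote(String note) {
        // record the value on an object being built in memory
        return this; // same call, but chaining keeps the InMemoryBuilder type
      }
    }

In the hunks below, TabletMutator extends TabletUpdates<TabletMutator> for the persisted path, and TabletMetadataBuilder implements TabletUpdates<TabletMetadataBuilder> to build in-memory TabletMetadata objects.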
*/ - interface TabletMutator { - TabletMutator putPrevEndRow(Text per); + interface TabletUpdates<T> { + T putPrevEndRow(Text per); + + T putFile(ReferencedTabletFile path, DataFileValue dfv); + + T putFile(StoredTabletFile path, DataFileValue dfv); + + T deleteFile(StoredTabletFile path); + + T putScan(StoredTabletFile path); + + T deleteScan(StoredTabletFile path); - TabletMutator putFile(ReferencedTabletFile path, DataFileValue dfv); + T putFlushId(long flushId); - TabletMutator putFile(StoredTabletFile path, DataFileValue dfv); + T putLocation(Location location); - TabletMutator deleteFile(StoredTabletFile path); + T deleteLocation(Location location); - TabletMutator putScan(StoredTabletFile path); + T putZooLock(String zookeeperRoot, ServiceLock zooLock); - TabletMutator deleteScan(StoredTabletFile path); + T putDirName(String dirName); - TabletMutator putCompactionId(long compactionId); + T putWal(LogEntry logEntry); - T deleteWal(String wal); - - TabletMutator putFlushId(long flushId); + T deleteWal(LogEntry logEntry); - TabletMutator putLocation(Location location); + T putTime(MetadataTime time); - TabletMutator deleteLocation(Location location); + T putBulkFile(ReferencedTabletFile bulkref, long tid); - TabletMutator putZooLock(ServiceLock zooLock); + T deleteBulkFile(StoredTabletFile bulkref); - TabletMutator putDirName(String dirName); + T putSuspension(TServerInstance tserver, long suspensionTime); - TabletMutator putWal(LogEntry logEntry); + T deleteSuspension(); - TabletMutator deleteWal(LogEntry wal); + T putExternalCompaction(ExternalCompactionId ecid, ExternalCompactionMetadata ecMeta); - TabletMutator putTime(MetadataTime time); + T deleteExternalCompaction(ExternalCompactionId ecid); - TabletMutator putBulkFile(ReferencedTabletFile bulkref, long tid); + T putCompacted(long fateTxid); - TabletMutator deleteBulkFile(StoredTabletFile bulkref); + T deleteCompacted(long fateTxid); - TabletMutator putSuspension(TServerInstance tserver, long suspensionTime); + T putHostingGoal(TabletHostingGoal goal); - TabletMutator deleteSuspension(); + T setHostingRequested(); - TabletMutator putExternalCompaction(ExternalCompactionId ecid, - ExternalCompactionMetadata ecMeta); + T deleteHostingRequested(); - TabletMutator deleteExternalCompaction(ExternalCompactionId ecid); + T putOperation(TabletOperationId opId); + + T deleteOperation(); + + T putSelectedFiles(SelectedFiles selectedFiles); + + T deleteSelectedFiles(); + /** + * Deletes all the columns in the keys. + * + * @throws IllegalArgumentException if rows in keys do not match tablet row or column visibility + * is not empty + */ + T deleteAll(Set<Key> keys); + + T setMerged(); + + T deleteMerged(); + } + + interface TabletMutator extends TabletUpdates<TabletMutator> { /** * This method persist (or queues for persisting) previous put and deletes against this object. * Unless this method is called, previous calls will never be persisted. The purpose of this diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java index f2bd30f15e,0000000000..7d102ea7ef mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java @@@ -1,303 -1,0 +1,298 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.metadata.schema; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_GOAL; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.hadoop.io.Text; + +public class TabletMetadataBuilder implements Ample.TabletUpdates<TabletMetadataBuilder> { + + public static class InternalBuilder extends TabletMutatorBase<InternalBuilder> { + protected InternalBuilder(KeyExtent extent) { + super(extent); + } + + @Override + public Mutation getMutation() { + return super.getMutation(); + } + } + + private final InternalBuilder internalBuilder; + EnumSet<TabletMetadata.ColumnType> fetched; + + protected TabletMetadataBuilder(KeyExtent extent) { + internalBuilder = new 
InternalBuilder(extent); + fetched = EnumSet.noneOf(TabletMetadata.ColumnType.class); + putPrevEndRow(extent.prevEndRow()); + } + + @Override + public TabletMetadataBuilder putPrevEndRow(Text per) { + fetched.add(PREV_ROW); + internalBuilder.putPrevEndRow(per); + return this; + } + + @Override + public TabletMetadataBuilder putFile(ReferencedTabletFile path, DataFileValue dfv) { + fetched.add(FILES); + internalBuilder.putFile(path, dfv); + return this; + } + + @Override + public TabletMetadataBuilder putFile(StoredTabletFile path, DataFileValue dfv) { + fetched.add(FILES); + internalBuilder.putFile(path, dfv); + return this; + } + + @Override + public TabletMetadataBuilder deleteFile(StoredTabletFile path) { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putScan(StoredTabletFile path) { + fetched.add(SCANS); + internalBuilder.putScan(path); + return this; + } + + @Override + public TabletMetadataBuilder deleteScan(StoredTabletFile path) { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putFlushId(long flushId) { + fetched.add(FLUSH_ID); + internalBuilder.putFlushId(flushId); + return this; + } + + @Override + public TabletMetadataBuilder putLocation(TabletMetadata.Location location) { + fetched.add(LOCATION); + internalBuilder.putLocation(location); + return this; + } + + @Override + public TabletMetadataBuilder deleteLocation(TabletMetadata.Location location) { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putZooLock(String zookeeperRoot, ServiceLock zooLock) { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putDirName(String dirName) { + fetched.add(DIR); + internalBuilder.putDirName(dirName); + return this; + } + + @Override + public TabletMetadataBuilder putWal(LogEntry logEntry) { + fetched.add(LOGS); + internalBuilder.putWal(logEntry); + return this; + } + - @Override - public TabletMetadataBuilder deleteWal(String wal) { - throw new UnsupportedOperationException(); - } - + @Override + public TabletMetadataBuilder deleteWal(LogEntry logEntry) { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putTime(MetadataTime time) { + fetched.add(TIME); + internalBuilder.putTime(time); + return this; + } + + @Override + public TabletMetadataBuilder putBulkFile(ReferencedTabletFile bulkref, long tid) { + fetched.add(LOADED); + internalBuilder.putBulkFile(bulkref, tid); + return this; + } + + @Override + public TabletMetadataBuilder deleteBulkFile(StoredTabletFile bulkref) { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putSuspension(TServerInstance tserver, long suspensionTime) { + fetched.add(SUSPEND); + internalBuilder.putSuspension(tserver, suspensionTime); + return this; + } + + @Override + public TabletMetadataBuilder deleteSuspension() { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putExternalCompaction(ExternalCompactionId ecid, + ExternalCompactionMetadata ecMeta) { + fetched.add(ECOMP); + internalBuilder.putExternalCompaction(ecid, ecMeta); + return this; + } + + @Override + public TabletMetadataBuilder deleteExternalCompaction(ExternalCompactionId ecid) { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putCompacted(long fateTxId) { + fetched.add(COMPACTED); + internalBuilder.putCompacted(fateTxId); + return this; + } + 
+ @Override + public TabletMetadataBuilder deleteCompacted(long fateTxId) { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putHostingGoal(TabletHostingGoal goal) { + fetched.add(HOSTING_GOAL); + internalBuilder.putHostingGoal(goal); + return this; + } + + @Override + public TabletMetadataBuilder setHostingRequested() { + fetched.add(HOSTING_REQUESTED); + internalBuilder.setHostingRequested(); + return this; + } + + @Override + public TabletMetadataBuilder deleteHostingRequested() { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putOperation(TabletOperationId opId) { + fetched.add(OPID); + internalBuilder.putOperation(opId); + return this; + } + + @Override + public TabletMetadataBuilder deleteOperation() { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder putSelectedFiles(SelectedFiles selectedFiles) { + fetched.add(SELECTED); + internalBuilder.putSelectedFiles(selectedFiles); + return this; + } + + @Override + public TabletMetadataBuilder deleteSelectedFiles() { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder deleteAll(Set<Key> keys) { + throw new UnsupportedOperationException(); + } + + @Override + public TabletMetadataBuilder setMerged() { + fetched.add(MERGED); + internalBuilder.setMerged(); + return this; + } + + @Override + public TabletMetadataBuilder deleteMerged() { + throw new UnsupportedOperationException(); + } + + /** + * @param extraFetched Anything that was put on the builder will automatically be added to the + * fetched set. However, for the case where something was not put and it needs to be + * fetched it can be passed here. For example to simulate a tablet w/o a location it, no + * location will be put and LOCATION would be passed in via this argument. + */ + public TabletMetadata build(TabletMetadata.ColumnType... 
extraFetched) { + var mutation = internalBuilder.getMutation(); + + SortedMap<Key,Value> rowMap = new TreeMap<>(); + mutation.getUpdates().forEach(cu -> { + Key k = new Key(mutation.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(), + cu.getTimestamp()); + Value v = new Value(cu.getValue()); + rowMap.put(k, v); + }); + + fetched.addAll(Arrays.asList(extraFetched)); + + return TabletMetadata.convertRow(rowMap.entrySet().iterator(), fetched, true, false); + } + +} diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java index 2792397ab7,036c9ac342..a0b83c00a8 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java @@@ -40,10 -37,7 +40,9 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; - import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; @@@ -186,17 -179,10 +185,10 @@@ public abstract class TabletMutatorBase } @Override - public Ample.TabletMutator deleteWal(LogEntry logEntry) { + public T deleteWal(LogEntry logEntry) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); - mutation.putDelete(LogColumnFamily.NAME, logEntry.getColumnQualifier()); - return getThis(); - } - - @Override - public T deleteWal(String wal) { - Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); - mutation.putDelete(LogColumnFamily.STR_NAME, wal); + logEntry.deleteFromMutation(mutation); - return this; + return getThis(); } @Override diff --cc core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java index 73a9ecf979,dea6460eaf..3343d750fc --- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java +++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java @@@ -106,24 -147,33 +147,32 @@@ public final class LogEntry @Override public int hashCode() { - return Objects.hash(filePath); + return Objects.hash(path); } - public static LogEntry fromMetaWalEntry(Entry<Key,Value> entry) { - String qualifier = entry.getKey().getColumnQualifier().toString(); - String[] parts = qualifier.split("/", 2); - Preconditions.checkArgument(parts.length == 2 && parts[0].equals("-"), - "Malformed write-ahead log %s", qualifier); - return new LogEntry(parts[1]); + /** + * Get the Text that should be used as the column qualifier to store this as a metadata entry. 
+ */ - @VisibleForTesting - Text getColumnQualifier() { ++ public Text getColumnQualifier() { + return new Text("-/" + getPath()); } - public String getUniqueID() { - String[] parts = filePath.split("/"); - return parts[parts.length - 1]; + /** + * Put a delete marker in the provided mutation for this LogEntry. + * + * @param mutation the mutation to update + */ + public void deleteFromMutation(Mutation mutation) { + mutation.putDelete(LogColumnFamily.NAME, getColumnQualifier()); } - public Text getColumnQualifier() { - return new Text("-/" + filePath); + /** + * Put this LogEntry into the provided mutation. + * + * @param mutation the mutation to update + */ + public void addToMutation(Mutation mutation) { + mutation.put(LogColumnFamily.NAME, getColumnQualifier(), new Value()); } } diff --cc core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index 2cbc25f57c,5287d689ba..a5f47e6a20 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@@ -318,97 -293,4 +317,97 @@@ public class TabletMetadataTest }); return rowMap; } + + @Test + public void testBuilder() { + TServerInstance ser1 = new TServerInstance(HostAndPort.fromParts("server1", 8555), "s001"); + + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + + StoredTabletFile sf1 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert(); + DataFileValue dfv1 = new DataFileValue(89, 67); + + StoredTabletFile sf2 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf2.rf")).insert(); + DataFileValue dfv2 = new DataFileValue(890, 670); + + ReferencedTabletFile rf1 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/imp1.rf")); + ReferencedTabletFile rf2 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/imp2.rf")); + + StoredTabletFile sf3 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf3.rf")).insert(); + StoredTabletFile sf4 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf4.rf")).insert(); + + TabletMetadata tm = TabletMetadata.builder(extent).putHostingGoal(TabletHostingGoal.NEVER) + .putLocation(Location.future(ser1)).putFile(sf1, dfv1).putFile(sf2, dfv2) + .putBulkFile(rf1, 25).putBulkFile(rf2, 35).putFlushId(27).putDirName("dir1").putScan(sf3) + .putScan(sf4).putCompacted(17).putCompacted(23).build(ECOMP, HOSTING_REQUESTED, MERGED); + + assertEquals(extent, tm.getExtent()); + assertEquals(TabletHostingGoal.NEVER, tm.getHostingGoal()); + assertEquals(Location.future(ser1), tm.getLocation()); + assertEquals(27L, tm.getFlushId().orElse(-1)); + assertEquals(Map.of(sf1, dfv1, sf2, dfv2), tm.getFilesMap()); + assertEquals(Map.of(rf1.insert(), 25L, rf2.insert(), 35L), tm.getLoaded()); + assertEquals("dir1", tm.getDirName()); + assertEquals(Set.of(sf3, sf4), Set.copyOf(tm.getScans())); + assertEquals(Set.of(), tm.getExternalCompactions().keySet()); + assertEquals(Set.of(17L, 23L), tm.getCompacted()); + assertFalse(tm.getHostingRequested()); + assertFalse(tm.hasMerged()); + assertThrows(IllegalStateException.class, tm::getOperationId); + assertThrows(IllegalStateException.class, tm::getSuspend); + assertThrows(IllegalStateException.class, tm::getTime); + + TabletOperationId opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, 55); + TabletMetadata tm2 = TabletMetadata.builder(extent).putOperation(opid1).build(LOCATION); 
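Pulling together the LogEntry changes above (the fromPath factory used by the tests, the UUID-valued getUniqueID seen in TabletLogger, and the new mutation helpers), a small usage sketch; the metadata row and WAL path below are made-up values:

    import java.util.UUID;

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.tabletserver.log.LogEntry;

    public class LogEntrySketch {
      public static void main(String[] args) {
        // Made-up WAL path; the tests above use the host+port form "localhost+8020/<uuid>".
        LogEntry entry = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());

        UUID id = entry.getUniqueID();          // the unique id is now a UUID, not a String
        System.out.println(id + " -> " + entry.getColumnQualifier()); // qualifier is "-/<path>"

        Mutation add = new Mutation("1;row");   // made-up metadata row
        entry.addToMutation(add);               // queues the WAL column for this entry

        Mutation del = new Mutation("1;row");
        entry.deleteFromMutation(del);          // queues a delete of the same column
      }
    }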
+ + assertEquals(extent, tm2.getExtent()); + assertEquals(opid1, tm2.getOperationId()); + assertNull(tm2.getLocation()); + assertThrows(IllegalStateException.class, tm2::getFiles); + assertThrows(IllegalStateException.class, tm2::getHostingGoal); + assertThrows(IllegalStateException.class, tm2::getFlushId); + assertThrows(IllegalStateException.class, tm2::getFiles); + assertThrows(IllegalStateException.class, tm2::getLogs); + assertThrows(IllegalStateException.class, tm2::getLoaded); + assertThrows(IllegalStateException.class, tm2::getDirName); + assertThrows(IllegalStateException.class, tm2::getScans); + assertThrows(IllegalStateException.class, tm2::getExternalCompactions); + assertThrows(IllegalStateException.class, tm2::getHostingRequested); + assertThrows(IllegalStateException.class, tm2::getSelectedFiles); + assertThrows(IllegalStateException.class, tm2::getCompacted); + + var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); + ExternalCompactionMetadata ecm = new ExternalCompactionMetadata(Set.of(sf1, sf2), rf1, "cid1", + CompactionKind.USER, (short) 3, CompactionExecutorIdImpl.externalId("Q1"), true, 99L); + - LogEntry le1 = new LogEntry("localhost:8020/" + UUID.randomUUID()); - LogEntry le2 = new LogEntry("localhost:8020/" + UUID.randomUUID()); ++ LogEntry le1 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); ++ LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); + + SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, 159L); + + TabletMetadata tm3 = TabletMetadata.builder(extent).putExternalCompaction(ecid1, ecm) + .putSuspension(ser1, 45L).putTime(new MetadataTime(479, TimeType.LOGICAL)).putWal(le1) + .putWal(le2).setHostingRequested().putSelectedFiles(selFiles).setMerged().build(); + + assertEquals(Set.of(ecid1), tm3.getExternalCompactions().keySet()); + assertEquals(Set.of(sf1, sf2), tm3.getExternalCompactions().get(ecid1).getJobFiles()); + assertEquals(ser1.getHostAndPort(), tm3.getSuspend().server); + assertEquals(45L, tm3.getSuspend().suspensionTime); + assertEquals(new MetadataTime(479, TimeType.LOGICAL), tm3.getTime()); + assertTrue(tm3.getHostingRequested()); + assertEquals(Stream.of(le1, le2).map(LogEntry::toString).collect(toSet()), + tm3.getLogs().stream().map(LogEntry::toString).collect(toSet())); + assertEquals(Set.of(sf1, sf4), tm3.getSelectedFiles().getFiles()); + assertEquals(159L, tm3.getSelectedFiles().getFateTxId()); + assertFalse(tm3.getSelectedFiles().initiallySelectedAll()); + assertEquals(selFiles.getMetadataValue(), tm3.getSelectedFiles().getMetadataValue()); + assertTrue(tm3.hasMerged()); + } + } diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index 140f23cae5,19841ac7a3..b219a73ffa --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@@ -130,59 -117,37 +119,59 @@@ public class VolumeUtil } } - /** - * This method does two things. First, it switches any volumes a tablet is using that are - * configured in instance.volumes.replacements. Second, if a tablet dir is no longer configured - * for use it chooses a new tablet directory. 
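The VolumeUtil hunk below replaces the removed updateTabletVolumes with a callback-driven volumeReplacementEvaluation. A sketch of a caller that only counts what would change, assuming the static signature exactly as shown in that hunk:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.accumulo.core.metadata.schema.TabletMetadata;
    import org.apache.accumulo.core.util.Pair;
    import org.apache.accumulo.server.fs.VolumeUtil;
    import org.apache.hadoop.fs.Path;

    public class VolumeReplacementSketch {
      // Counts how many WAL and data file references on one tablet would be touched by the
      // configured volume replacements; needsVolumeReplacement() does the same with a flag.
      static int countPendingReplacements(List<Pair<Path,Path>> replacements, TabletMetadata tm) {
        AtomicInteger changes = new AtomicInteger();
        VolumeUtil.volumeReplacementEvaluation(replacements, tm,
            logToRemove -> changes.incrementAndGet(),   // WAL entry whose volume would switch
            logToAdd -> changes.incrementAndGet(),      // replacement WAL entry to add
            fileToRemove -> changes.incrementAndGet(),  // data file reference to rewrite
            (fileToAdd, dfv) -> changes.incrementAndGet());
        return changes.get();
      }
    }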
- */ - public static TabletFiles updateTabletVolumes(ServerContext context, ServiceLock zooLock, - KeyExtent extent, TabletFiles tabletFiles) { - List<Pair<Path,Path>> replacements = context.getVolumeReplacements(); + public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> replacements, + final TabletMetadata tm) { if (replacements.isEmpty()) { - return tabletFiles; + return false; } - log.trace("Using volume replacements: {}", replacements); - List<LogEntry> logsToRemove = new ArrayList<>(); - List<LogEntry> logsToAdd = new ArrayList<>(); + MutableBoolean needsReplacement = new MutableBoolean(false); + + Consumer<LogEntry> consumer = le -> needsReplacement.setTrue(); + + volumeReplacementEvaluation(replacements, tm, consumer, consumer, + f -> needsReplacement.setTrue(), (f, dfv) -> needsReplacement.setTrue()); + + return needsReplacement.booleanValue(); + } - List<StoredTabletFile> filesToRemove = new ArrayList<>(); - SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new TreeMap<>(); + public static class VolumeReplacements { + public final TabletMetadata tabletMeta; + public final List<LogEntry> logsToRemove = new ArrayList<>(); + public final List<LogEntry> logsToAdd = new ArrayList<>(); + public final List<StoredTabletFile> filesToRemove = new ArrayList<>(); + public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new HashMap<>(); - TabletFiles ret = new TabletFiles(); + public VolumeReplacements(TabletMetadata tabletMeta) { + this.tabletMeta = tabletMeta; + } + } - for (LogEntry logEntry : tabletFiles.logEntries) { + public static VolumeReplacements + computeVolumeReplacements(final List<Pair<Path,Path>> replacements, final TabletMetadata tm) { + var vr = new VolumeReplacements(tm); + volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, vr.logsToAdd::add, + vr.filesToRemove::add, vr.filesToAdd::put); + return vr; + } + + public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> replacements, + final TabletMetadata tm, final Consumer<LogEntry> logsToRemove, + final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> filesToRemove, + final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) { + if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && tm.getLogs().isEmpty())) { + return; + } + + log.trace("Using volume replacements: {}", replacements); + for (LogEntry logEntry : tm.getLogs()) { + log.trace("Evaluating walog {} for replacement.", logEntry); LogEntry switchedLogEntry = switchVolumes(logEntry, replacements); if (switchedLogEntry != null) { - logsToRemove.add(logEntry); - logsToAdd.add(switchedLogEntry); - ret.logEntries.add(switchedLogEntry); - log.debug("Replacing volume {} : {} -> {}", extent, logEntry.getPath(), + logsToRemove.accept(logEntry); + logsToAdd.accept(switchedLogEntry); - log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getFilePath(), - switchedLogEntry.getFilePath()); ++ log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getPath(), + switchedLogEntry.getPath()); - } else { - ret.logEntries.add(logEntry); } } diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java index 526e636f32,0000000000..7d2a09f686 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java @@@ -1,181 -1,0 +1,181 @@@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.manager.state; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; + +public abstract class AbstractTabletStateStore implements TabletStateStore { + + private final Ample ample; + + protected AbstractTabletStateStore(ClientContext context) { + this.ample = context.getAmple(); + } + + @Override + public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException { + try (var tabletsMutator = ample.conditionallyMutateTablets()) { + for (Assignment assignment : assignments) { + var conditionalMutator = tabletsMutator.mutateTablet(assignment.tablet) + .requireLocation(TabletMetadata.Location.future(assignment.server)) + .putLocation(TabletMetadata.Location.current(assignment.server)) + .deleteLocation(TabletMetadata.Location.future(assignment.server)).deleteSuspension(); + + updateLastLocation(conditionalMutator, assignment.server, assignment.lastLocation); + + conditionalMutator.submit(tabletMetadata -> { + Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet)); + return tabletMetadata.getLocation() != null && tabletMetadata.getLocation() + .equals(TabletMetadata.Location.current(assignment.server)); + }); + } + + if (tabletsMutator.process().values().stream() + .anyMatch(result -> result.getStatus() != Status.ACCEPTED)) { + throw new DistributedStoreException( + "failed to set tablet location, conditional mutation failed"); + } + } catch (RuntimeException ex) { + throw new DistributedStoreException(ex); + } + } + + @Override + public void setFutureLocations(Collection<Assignment> assignments) + throws DistributedStoreException { + try (var tabletsMutator = ample.conditionallyMutateTablets()) { + for (Assignment assignment : assignments) { + tabletsMutator.mutateTablet(assignment.tablet).requireAbsentOperation() + .requireAbsentLocation().deleteSuspension() + .putLocation(TabletMetadata.Location.future(assignment.server)) + .submit(tabletMetadata -> { + Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet)); + 
return tabletMetadata.getLocation() != null && tabletMetadata.getLocation() + .equals(TabletMetadata.Location.future(assignment.server)); + }); + } + + var results = tabletsMutator.process(); + + if (results.values().stream().anyMatch(result -> result.getStatus() != Status.ACCEPTED)) { + throw new DistributedStoreException( + "failed to set tablet location, conditional mutation failed. "); + } + + } catch (RuntimeException ex) { + throw new DistributedStoreException(ex); + } + } + + @Override + public void unassign(Collection<TabletMetadata> tablets, + Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException { + unassign(tablets, logsForDeadServers, -1); + } + + @Override + public void suspend(Collection<TabletMetadata> tablets, + Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) + throws DistributedStoreException { + unassign(tablets, logsForDeadServers, suspensionTimestamp); + } + + protected abstract void processSuspension(Ample.ConditionalTabletMutator tabletMutator, + TabletMetadata tm, long suspensionTimestamp); + + private void unassign(Collection<TabletMetadata> tablets, + Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) + throws DistributedStoreException { + try (var tabletsMutator = ample.conditionallyMutateTablets()) { + for (TabletMetadata tm : tablets) { + if (tm.getLocation() == null) { + continue; + } + + var tabletMutator = + tabletsMutator.mutateTablet(tm.getExtent()).requireLocation(tm.getLocation()); + + if (tm.hasCurrent()) { + + updateLastLocation(tabletMutator, tm.getLocation().getServerInstance(), tm.getLast()); + tabletMutator.deleteLocation(tm.getLocation()); + if (logsForDeadServers != null) { + List<Path> logs = logsForDeadServers.get(tm.getLocation().getServerInstance()); + if (logs != null) { + for (Path log : logs) { - LogEntry entry = new LogEntry(log.toString()); ++ LogEntry entry = LogEntry.fromPath(log.toString()); + tabletMutator.putWal(entry); + } + } + } + } + + if (tm.getLocation() != null && tm.getLocation().getType() != null + && tm.getLocation().getType().equals(LocationType.FUTURE)) { + tabletMutator.deleteLocation(tm.getLocation()); + } + + processSuspension(tabletMutator, tm, suspensionTimestamp); + + tabletMutator.submit(tabletMetadata -> tabletMetadata.getLocation() == null); + } + + Map<KeyExtent,Ample.ConditionalResult> results = tabletsMutator.process(); + + if (results.values().stream() + .anyMatch(conditionalResult -> conditionalResult.getStatus() != Status.ACCEPTED)) { + throw new DistributedStoreException("Some unassignments did not satisfy conditions."); + } + + } catch (RuntimeException ex) { + throw new DistributedStoreException(ex); + } + } + + protected static void updateLastLocation(Ample.TabletUpdates<?> tabletMutator, + TServerInstance location, Location lastLocation) { + Preconditions.checkArgument( + lastLocation == null || lastLocation.getType() == TabletMetadata.LocationType.LAST); + Location newLocation = Location.last(location); + if (lastLocation != null) { + if (!lastLocation.equals(newLocation)) { + tabletMutator.deleteLocation(lastLocation); + tabletMutator.putLocation(newLocation); + } + } else { + tabletMutator.putLocation(newLocation); + } + } + +} diff --cc server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java index fafa79f696,0000000000..caa9c926f8 mode 100644,000000..100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java +++ 
b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java @@@ -1,188 -1,0 +1,188 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.manager.state; + +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class TabletManagementTest { + + private SortedMap<Key,Value> toRowMap(Mutation mutation) { + SortedMap<Key,Value> rowMap = new TreeMap<>(); + mutation.getUpdates().forEach(cu -> { + Key k = new Key(mutation.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(), + cu.getTimestamp()); + Value v = new Value(cu.getValue()); + rowMap.put(k, v); + }); + return 
rowMap; + } + + private SortedMap<Key,Value> createMetadataEntryKV(KeyExtent extent) { + + Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + + DIRECTORY_COLUMN.put(mutation, new Value("t-0001757")); + FLUSH_COLUMN.put(mutation, new Value("6")); + TIME_COLUMN.put(mutation, new Value("M123456789")); + + StoredTabletFile bf1 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/bf1")).insert(); + StoredTabletFile bf2 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/bf2")).insert(); + mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf1.getMetadata()) + .put(FateTxId.formatTid(56)); + mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf2.getMetadata()) + .put(FateTxId.formatTid(59)); + + mutation.at().family(ClonedColumnFamily.NAME).qualifier("").put("OK"); + + DataFileValue dfv1 = new DataFileValue(555, 23); + StoredTabletFile tf1 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/df1.rf")).insert(); + StoredTabletFile tf2 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/df2.rf")).insert(); + mutation.at().family(DataFileColumnFamily.NAME).qualifier(tf1.getMetadata()).put(dfv1.encode()); + DataFileValue dfv2 = new DataFileValue(234, 13); + mutation.at().family(DataFileColumnFamily.NAME).qualifier(tf2.getMetadata()).put(dfv2.encode()); + + mutation.at().family(CurrentLocationColumnFamily.NAME).qualifier("s001").put("server1:8555"); + + mutation.at().family(LastLocationColumnFamily.NAME).qualifier("s000").put("server2:8555"); + - LogEntry le1 = new LogEntry("localhost:8020/" + UUID.randomUUID()); ++ LogEntry le1 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); + le1.addToMutation(mutation); - LogEntry le2 = new LogEntry("localhost:8020/" + UUID.randomUUID()); ++ LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); + le2.addToMutation(mutation); + + StoredTabletFile sf1 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert(); + StoredTabletFile sf2 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf2.rf")).insert(); + mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf1.getMetadata()).put(""); + mutation.at().family(ScanFileColumnFamily.NAME).qualifier(sf2.getMetadata()).put(""); + + return toRowMap(mutation); + + } + + @Test + public void testEncodeDecodeWithReasons() throws Exception { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + + final Set<ManagementAction> actions = + Set.of(ManagementAction.NEEDS_LOCATION_UPDATE, ManagementAction.NEEDS_SPLITTING); + + final SortedMap<Key,Value> entries = createMetadataEntryKV(extent); + + TabletManagement.addActions(entries, actions); + Key key = entries.firstKey(); + Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()), + new ArrayList<>(entries.values())); + + // Remove the REASONS column from the entries map for the comparison check + // below + entries.remove(new Key(key.getRow().toString(), "REASONS", "")); + + TabletManagement tmi = new TabletManagement(key, val, true); + assertEquals(entries, tmi.getTabletMetadata().getKeyValues()); + assertEquals(actions, tmi.getActions()); + } + + @Test + public void testEncodeDecodeWithErrors() throws Exception { + KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + + final SortedMap<Key,Value> entries = createMetadataEntryKV(extent); + + TabletManagement.addError(entries, new UnsupportedOperationException("Not 
supported.")); + Key key = entries.firstKey(); + Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()), + new ArrayList<>(entries.values())); + + // Remove the ERROR column from the entries map for the comparison check + // below + entries.remove(new Key(key.getRow().toString(), "ERROR", "")); + + TabletManagement tmi = new TabletManagement(key, val, true); + assertEquals(entries, tmi.getTabletMetadata().getKeyValues()); + assertEquals("Not supported.", tmi.getErrorMessage()); + } + + @Test + public void testBinary() throws Exception { + // test end row with non ascii data + Text endRow = new Text(new byte[] {'m', (byte) 0xff}); + KeyExtent extent = new KeyExtent(TableId.of("5"), endRow, new Text("da")); + + final Set<ManagementAction> actions = + Set.of(ManagementAction.NEEDS_LOCATION_UPDATE, ManagementAction.NEEDS_SPLITTING); + + final SortedMap<Key,Value> entries = createMetadataEntryKV(extent); + + TabletManagement.addActions(entries, actions); + Key key = entries.firstKey(); + Value val = WholeRowIterator.encodeRow(new ArrayList<>(entries.keySet()), + new ArrayList<>(entries.values())); + + assertTrue(entries.keySet().stream().allMatch(k -> k.getRow().equals(extent.toMetaRow()))); + + // Remove the REASONS column from the entries map for the comparison check + // below + entries.remove(new Key(key.getRow(), new Text("REASONS"), new Text(""))); + + TabletManagement tmi = new TabletManagement(key, val, true); + assertEquals(entries, tmi.getTabletMetadata().getKeyValues()); + assertEquals(actions, tmi.getActions()); + + } +} diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 493b64afb4,807ba9f690..4b4d35918b --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@@ -294,15 -286,16 +294,15 @@@ public class GarbageCollectWriteAheadLo } // Tablet is being recovered and has WAL references, remove all the WALs for the dead server // that made the WALs. 
- for (Collection<String> wals : state.walogs) { - for (String wal : wals) { - UUID walUUID = path2uuid(new Path(wal)); - TServerInstance dead = result.get(walUUID); - // There's a reference to a log file, so skip that server's logs - Set<UUID> idsToIgnore = candidates.remove(dead); - if (idsToIgnore != null) { - result.keySet().removeAll(idsToIgnore); - recoveryLogs.keySet().removeAll(idsToIgnore); - } + for (LogEntry wals : tabletMetadata.getLogs()) { - String wal = wals.getFilePath(); ++ String wal = wals.getPath(); + UUID walUUID = path2uuid(new Path(wal)); + TServerInstance dead = result.get(walUUID); + // There's a reference to a log file, so skip that server's logs + Set<UUID> idsToIgnore = candidates.remove(dead); + if (idsToIgnore != null) { + result.keySet().removeAll(idsToIgnore); + recoveryLogs.keySet().removeAll(idsToIgnore); } } } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index 9cb41a90c1,86bc06930c..7c66d84a2f --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@@ -158,70 -156,72 +158,70 @@@ public class RecoveryManager } } - public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) - throws IOException { + public boolean recoverLogs(KeyExtent extent, Collection<LogEntry> walogs) throws IOException { boolean recoveryNeeded = false; - for (Collection<String> logs : walogs) { - for (String walog : logs) { - - Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), FileType.WAL, - manager.getContext().getVolumeReplacements()); - if (switchedWalog != null) { - // replaces the volume used for sorting, but do not change entry in metadata table. When - // the tablet loads it will change the metadata table entry. If - // the tablet has the same replacement config, then it will find the sorted log. - log.info("Volume replaced {} -> {}", walog, switchedWalog); - walog = switchedWalog.toString(); - } + for (LogEntry entry : walogs) { - String walog = entry.getFilePath(); ++ String walog = entry.getPath(); + + Path switchedWalog = VolumeUtil.switchVolume(new Path(walog), FileType.WAL, + manager.getContext().getVolumeReplacements()); + if (switchedWalog != null) { + // replaces the volume used for sorting, but do not change entry in metadata table. When + // the tablet loads it will change the metadata table entry. If + // the tablet has the same replacement config, then it will find the sorted log. 
+ log.info("Volume replaced {} -> {}", walog, switchedWalog); + walog = switchedWalog.toString(); + } - String[] parts = walog.split("/"); - String sortId = parts[parts.length - 1]; - String filename = new Path(walog).toString(); - String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString(); + String[] parts = walog.split("/"); + String sortId = parts[parts.length - 1]; + String filename = new Path(walog).toString(); + String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString(); + + boolean sortQueued; + synchronized (this) { + sortQueued = sortsQueued.contains(sortId); + } - boolean sortQueued; + if (sortQueued + && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) + == null) { synchronized (this) { - sortQueued = sortsQueued.contains(sortId); + sortsQueued.remove(sortId); } + } - if (sortQueued - && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) - == null) { - synchronized (this) { - sortsQueued.remove(sortId); - } + if (exists(SortedLogState.getFinishedMarkerPath(dest))) { + synchronized (this) { + closeTasksQueued.remove(sortId); + recoveryDelay.remove(sortId); + sortsQueued.remove(sortId); } + continue; + } - if (exists(SortedLogState.getFinishedMarkerPath(dest))) { - synchronized (this) { - closeTasksQueued.remove(sortId); - recoveryDelay.remove(sortId); - sortsQueued.remove(sortId); + recoveryNeeded = true; + synchronized (this) { + if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { + AccumuloConfiguration aconf = manager.getConfiguration(); + LogCloser closer = Property.createInstanceFromPropertyName(aconf, + Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser()); + Long delay = recoveryDelay.get(sortId); + if (delay == null) { + delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY); + } else { + delay = Math.min(2 * delay, 1000 * 60 * 5L); } - continue; - } - recoveryNeeded = true; - synchronized (this) { - if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { - AccumuloConfiguration aconf = manager.getConfiguration(); - LogCloser closer = Property.createInstanceFromPropertyName(aconf, - Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser()); - Long delay = recoveryDelay.get(sortId); - if (delay == null) { - delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY); - } else { - delay = Math.min(2 * delay, 1000 * 60 * 5L); - } - - log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename, - (delay / 1000), extent); - - ScheduledFuture<?> future = executor.schedule( - new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); - closeTasksQueued.add(sortId); - recoveryDelay.put(sortId, delay); - } + log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename, + (delay / 1000), extent); + + ScheduledFuture<?> future = executor.schedule( + new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); + closeTasksQueued.add(sortId); + recoveryDelay.put(sortId, delay); } } } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index a1619d1697,eb283b6b46..b8cc36b6c9 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@@ -82,10 
-88,15 +82,11 @@@ import org.apache.accumulo.core.summary import org.apache.accumulo.core.summary.SummaryCollection; import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.tablet.thrift.TabletManagementClientService; -import org.apache.accumulo.core.tabletingest.thrift.DataFileInfo; import org.apache.accumulo.core.tabletingest.thrift.TDurability; import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService; + import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; -import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; -import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; -import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 842d881ecd,9d6043ef3e..1f2870cc62 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@@ -1294,72 -2091,19 +1293,72 @@@ public class Tablet extends TabletBase * Update tablet file data from flush. Returns a StoredTabletFile if there are data entries. */ public Optional<StoredTabletFile> updateTabletDataFile(long maxCommittedTime, - ReferencedTabletFile newDatafile, DataFileValue dfv, Set<String> unusedWalLogs, long flushId, - MinorCompactionReason mincReason) { + ReferencedTabletFile newDatafile, DataFileValue dfv, Set<LogEntry> unusedWalLogs, - long flushId) { - synchronized (timeLock) { - if (maxCommittedTime > persistedTime) { - persistedTime = maxCommittedTime; - } ++ long flushId, MinorCompactionReason mincReason) { + + Preconditions.checkState(refreshLock.isHeldByCurrentThread()); + + // Read these once in case of buggy race conditions will get consistent logging. If all other + // code is locking properly these should not change during this method. + var lastTabletMetadata = getMetadata(); + var expectedTime = lastTabletMetadata.getTime(); + + // Expect time to only move forward from what was recently seen in metadata table. + Preconditions.checkArgument(maxCommittedTime >= expectedTime.getTime()); - return ManagerMetadataUtil.updateTabletDataFile(getTabletServer().getContext(), extent, - newDatafile, dfv, tabletTime.getMetadataTime(persistedTime), - tabletServer.getTabletSession(), tabletServer.getLock(), unusedWalLogs, lastLocation, - flushId); + // The tablet time is used to determine if the write succeeded, in order to do this the tablet + // time needs to be different from what is currently stored in the metadata table. + while (maxCommittedTime == expectedTime.getTime()) { + var nextTime = tabletTime.getAndUpdateTime(); + Preconditions.checkState(nextTime >= maxCommittedTime); + if (nextTime > maxCommittedTime) { + maxCommittedTime++; + } } + try (var tabletsMutator = getContext().getAmple().conditionallyMutateTablets()) { + + var expectedLocation = mincReason == MinorCompactionReason.RECOVERY + ? 
Location.future(tabletServer.getTabletSession()) + : Location.current(tabletServer.getTabletSession()); + + var tablet = tabletsMutator.mutateTablet(extent).requireLocation(expectedLocation) + .requireSame(lastTabletMetadata, ColumnType.TIME); + + Optional<StoredTabletFile> newFile = Optional.empty(); + + // if entries are present, write to path to metadata table + if (dfv.getNumEntries() > 0) { + tablet.putFile(newDatafile, dfv); + newFile = Optional.of(newDatafile.insert()); + } + + var newTime = tabletTime.getMetadataTime(maxCommittedTime); + tablet.putTime(newTime); + + tablet.putFlushId(flushId); + + unusedWalLogs.forEach(tablet::deleteWal); + + tablet.putZooLock(getContext().getZooKeeperRoot(), tabletServer.getLock()); + + // When trying to determine if write was successful, check if the time was updated. Can not + // check if the new file exists because of two reasons. First, it could be compacted away + // between the write and check. Second, some flushes do not produce a file. + tablet.submit(tabletMetadata -> tabletMetadata.getTime().equals(newTime)); + + var result = tabletsMutator.process().get(extent); + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + + log.error("Metadata for failed tablet file update : {}", result.readMetadata()); + + // Include the things that could have caused the write to fail. + throw new IllegalStateException("Unable to write minor compaction. " + extent + " " + + expectedLocation + " " + expectedTime); + } + + return newFile; + } } @Override @@@ -1416,106 -2192,12 +1415,106 @@@ return timer.getTabletStats(); } - private static String createTabletDirectoryName(ServerContext context, Text endRow) { - if (endRow == null) { - return ServerColumnFamily.DEFAULT_TABLET_DIR_NAME; - } else { - UniqueNameAllocator namer = context.getUniqueNameAllocator(); - return Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName(); + public boolean isOnDemand() { + // TODO a change in the hosting goal could refresh online tablets + return getMetadata().getHostingGoal() == TabletHostingGoal.ONDEMAND; + } + + // The purpose of this lock is to prevent race conditions between concurrent refresh RPC calls and + // between minor compactions and refresh calls. 
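The refreshLock introduced below coordinates refresh RPCs with minor compactions, and bringMinorCompactionOnline further down enforces a strict order: the refresh lock is taken outside the WAL log lock, and never while the log lock is already held. A stripped-down sketch of that ordering, with lock names local to the sketch:

    import java.util.concurrent.locks.ReentrantLock;

    public class LockOrderSketch {
      private final ReentrantLock refreshLock = new ReentrantLock();
      private final ReentrantLock logLock = new ReentrantLock();

      void bringMincOnline() {
        // Mirrors the ordering below: never enter with the log lock already held.
        if (logLock.isHeldByCurrentThread()) {
          throw new IllegalStateException("log lock must not be held before the refresh lock");
        }
        refreshLock.lock();
        try {
          logLock.lock();
          try {
            // write the new file to the metadata table, then clear unused WAL references
          } finally {
            logLock.unlock();
          }
          // still holding refreshLock: a concurrent refresh cannot expose the new file
          // while the in-memory map that produced it is still serving scans
        } finally {
          refreshLock.unlock();
        }
      }
    }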
+ private final ReentrantLock refreshLock = new ReentrantLock(); + + void bringMinorCompactionOnline(ReferencedTabletFile tmpDatafile, + ReferencedTabletFile newDatafile, DataFileValue dfv, CommitSession commitSession, + long flushId, MinorCompactionReason mincReason) { + Optional<StoredTabletFile> newFile; + // rename before putting in metadata table, so files in metadata table should + // always exist + boolean attemptedRename = false; + VolumeManager vm = getTabletServer().getContext().getVolumeManager(); + do { + try { + if (dfv.getNumEntries() == 0) { + log.debug("No data entries so delete temporary file {}", tmpDatafile); + vm.deleteRecursively(tmpDatafile.getPath()); + } else { + if (!attemptedRename && vm.exists(newDatafile.getPath())) { + log.warn("Target data file already exist {}", newDatafile); + throw new RuntimeException("File unexpectedly exists " + newDatafile.getPath()); + } + // the following checks for spurious rename failures that succeeded but gave an IoE + if (attemptedRename && vm.exists(newDatafile.getPath()) + && !vm.exists(tmpDatafile.getPath())) { + // seems like previous rename succeeded, so break + break; + } + attemptedRename = true; + ScanfileManager.rename(vm, tmpDatafile.getPath(), newDatafile.getPath()); + } + break; + } catch (IOException ioe) { + log.warn("Tablet " + getExtent() + " failed to rename " + newDatafile + + " after MinC, will retry in 60 secs...", ioe); + sleepUninterruptibly(1, TimeUnit.MINUTES); + } + } while (true); + + // The refresh lock must be held for the metadata write that adds the new file to the tablet. + // This prevents a concurrent refresh operation from pulling in the new tablet file before the + // in memory map reference related to the file is deactivated. Scans should use one of the in + // memory map or the new file, never both. + Preconditions.checkState(!getLogLock().isHeldByCurrentThread()); + refreshLock.lock(); + try { + // Can not hold tablet lock while acquiring the log lock. The following check is there to + // prevent deadlock. + getLogLock().lock(); + // do not place any code here between lock and try + try { + // The following call pairs with tablet.finishClearingUnusedLogs() later in this block. If + // moving where the following method is called, examine it and finishClearingUnusedLogs() + // before moving. - Set<String> unusedWalLogs = beginClearingUnusedLogs(); ++ Set<LogEntry> unusedWalLogs = beginClearingUnusedLogs(); + // the order of writing to metadata and walog is important in the face of machine/process + // failures need to write to metadata before writing to walog, when things are done in the + // reverse order data could be lost... the minor compaction start event should be written + // before the following metadata write is made + + newFile = updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, dfv, + unusedWalLogs, flushId, mincReason); + + finishClearingUnusedLogs(); + } finally { + getLogLock().unlock(); + } + + // Without the refresh lock, if a refresh happened here it could make the new file written to + // the metadata table above available for scans while the in memory map from which the file + // was produced is still available for scans + + do { + try { + // the purpose of making this update use the new commit session, instead of the old one + // passed in, is because the new one will reference the logs used by current memory... 
+ getTabletServer().minorCompactionFinished(getTabletMemory().getCommitSession(), + commitSession.getWALogSeq() + 2); + break; + } catch (IOException e) { + log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e); + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + } while (true); + + refreshMetadata(RefreshPurpose.MINC_COMPLETION); + } finally { + refreshLock.unlock(); + } + TabletLogger.flushed(getExtent(), newFile); + + long splitSize = getTableConfiguration().getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + if (dfv.getSize() > splitSize) { + log.debug(String.format("Minor Compaction wrote out file larger than split threshold." + + " split threshold = %,d file size = %,d", splitSize, dfv.getSize())); } } diff --cc shell/src/test/java/org/apache/accumulo/shell/commands/ListTabletsCommandTest.java index 14856bb9e7,583819b96b..0730c80245 --- a/shell/src/test/java/org/apache/accumulo/shell/commands/ListTabletsCommandTest.java +++ b/shell/src/test/java/org/apache/accumulo/shell/commands/ListTabletsCommandTest.java @@@ -128,50 -113,6 +128,50 @@@ public class ListTabletsCommandTest public void mockTest() throws Exception { ListTabletsCommand cmd = new TestListTabletsCommand(); + TableId tableId = TableId.of("123"); + + TServerInstance ser1 = new TServerInstance(HostAndPort.fromParts("server1", 8555), "s001"); + TServerInstance ser2 = new TServerInstance(HostAndPort.fromParts("server2", 2354), "s002"); + + StoredTabletFile sf11 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-dir1/sf11.rf")).insert(); + DataFileValue dfv11 = new DataFileValue(5643, 89); + + StoredTabletFile sf12 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-dir1/sf12.rf")).insert(); + DataFileValue dfv12 = new DataFileValue(379963, 1027); + + StoredTabletFile sf21 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-dir2/sf21.rf")).insert(); + DataFileValue dfv21 = new DataFileValue(5323, 142); + + StoredTabletFile sf31 = + new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-dir3/sf31.rf")).insert(); + DataFileValue dfv31 = new DataFileValue(95832L, 231); + + KeyExtent extent = new KeyExtent(tableId, new Text("d"), null); + - LogEntry le1 = new LogEntry("localhost:8020/" + UUID.randomUUID()); - LogEntry le2 = new LogEntry("localhost:8020/" + UUID.randomUUID()); ++ LogEntry le1 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); ++ LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); + + TabletMetadata tm1 = TabletMetadata.builder(extent).putHostingGoal(TabletHostingGoal.ONDEMAND) + .putLocation(TabletMetadata.Location.current(ser1)).putFile(sf11, dfv11) + .putFile(sf12, dfv12).putWal(le1).putDirName("t-dir1").build(); + + extent = new KeyExtent(tableId, new Text("k"), new Text("e")); + TabletMetadata tm2 = TabletMetadata.builder(extent).putHostingGoal(TabletHostingGoal.ALWAYS) + .putLocation(TabletMetadata.Location.current(ser2)).putFile(sf21, dfv21) + .putDirName("t-dir2").build(LOGS); + + extent = new KeyExtent(tableId, null, new Text("l")); + TabletMetadata tm3 = TabletMetadata.builder(extent).putHostingGoal(TabletHostingGoal.NEVER) + .putFile(sf31, dfv31).putWal(le1).putWal(le2).putDirName("t-dir3").build(LOCATION); + + TabletInformationImpl[] tabletInformation = new TabletInformationImpl[3]; + tabletInformation[0] = new TabletInformationImpl(tm1, "HOSTED"); + tabletInformation[1] = new TabletInformationImpl(tm2, "HOSTED"); + tabletInformation[2] = new TabletInformationImpl(tm3, "UNASSIGNED"); + AccumuloClient client 
= EasyMock.createMock(AccumuloClient.class); ClientContext context = EasyMock.createMock(ClientContext.class); TableOperations tableOps = EasyMock.createMock(TableOperations.class); diff --cc test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 0633b934f8,0000000000..ebc86f97c8 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@@ -1,967 -1,0 +1,967 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static com.google.common.collect.MoreCollectors.onlyElement; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; +import static org.apache.accumulo.core.util.LazySingletons.GSON; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import 
org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TimeType; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.core.metadata.schema.TabletMetadataBuilder; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl; +import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Sets; + +public class AmpleConditionalWriterIT extends AccumuloClusterHarness { + + // ELASTICITY_TODO ensure that all conditional updates are tested + + private TableId tid; + private KeyExtent e1; + private KeyExtent e2; + private KeyExtent e3; + private KeyExtent e4; + + @BeforeEach + public void setupTable() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + + SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new Text("f"), new Text("j"))); + c.tableOperations().create(tableName, + new NewTableConfiguration().withSplits(splits).createOffline()); + + c.securityOperations().grantTablePermission("root", MetadataTable.NAME, + TablePermission.WRITE); + + tid = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + e1 = new KeyExtent(tid, new Text("c"), null); + e2 = new KeyExtent(tid, new Text("f"), new Text("c")); + e3 = new KeyExtent(tid, new Text("j"), new Text("f")); + e4 = new KeyExtent(tid, null, new Text("j")); + } + } + + @Test + public void testLocations() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + var ts1 = new TServerInstance("localhost:9997", 5000L); + var ts2 = new TServerInstance("localhost:9997", 6000L); + + var context = cluster.getServerContext(); + + assertNull(context.getAmple().readTablet(e1).getLocation()); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + 
ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts1)).submit(tm -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Location.future(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts2)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Location.future(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1)) + .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Location.current(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1)) + .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Location.current(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts2)) + .putLocation(Location.current(ts2)).deleteLocation(Location.future(ts2)) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Location.current(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1)) + .deleteLocation(Location.current(ts1)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertNull(context.getAmple().readTablet(e1).getLocation()); + } + } + + @Test + public void testFiles() throws Exception { + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var ts1 = new TServerInstance("localhost:9997", 5000L); + var ts2 = new TServerInstance("localhost:9997", 6000L); + + var context = cluster.getServerContext(); + + var stf1 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); + var stf2 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")); + var stf3 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")); + var stf4 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf")); + var dfv = new DataFileValue(100, 100); + + System.out.println(context.getAmple().readTablet(e1).getLocation()); + + // simulate a compaction where the tablet location is not set + var ctmi = new ConditionalTabletsMutatorImpl(context); + + var tm1 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) + .build(); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES) + .putFile(stf4, new DataFileValue(0, 0)).submit(tm 
-> false); + var results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Set.of(), context.getAmple().readTablet(e1).getFiles()); + + var tm2 = TabletMetadata.builder(e1).putLocation(Location.current(ts1)).build(); + // simulate minor compacts where the tablet location is not set + for (StoredTabletFile file : List.of(stf1, stf2, stf3)) { + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, LOCATION) + .putFile(file, new DataFileValue(0, 0)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + } + + assertEquals(Set.of(), context.getAmple().readTablet(e1).getFiles()); + + // set the location + var tm3 = TabletMetadata.builder(e1).build(LOCATION); + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm3, LOCATION) + .putLocation(Location.current(ts1)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + var tm4 = TabletMetadata.builder(e1).putLocation(Location.current(ts2)).build(); + // simulate minor compacts where the tablet location is wrong + for (StoredTabletFile file : List.of(stf1, stf2, stf3)) { + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm4, LOCATION) + .putFile(file, new DataFileValue(0, 0)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + } + + assertEquals(Set.of(), context.getAmple().readTablet(e1).getFiles()); + + // simulate minor compacts where the tablet location is set + for (StoredTabletFile file : List.of(stf1, stf2, stf3)) { + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, LOCATION) + .putFile(file, new DataFileValue(0, 0)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + } + + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + + // simulate a compaction and test a subset and superset of files + for (var tabletMeta : List.of( + TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).build(), + TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) + .putFile(stf4, dfv).build())) { + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta, FILES) + .putFile(stf4, new DataFileValue(0, 0)).deleteFile(stf1).deleteFile(stf2) + .deleteFile(stf3).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + } + + // simulate a compaction + ctmi = new ConditionalTabletsMutatorImpl(context); + var tm5 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) + .build(); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES) + .putFile(stf4, new DataFileValue(0, 0)).deleteFile(stf1).deleteFile(stf2).deleteFile(stf3) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf4), context.getAmple().readTablet(e1).getFiles()); + + // simulate a bulk import + var stf5 = StoredTabletFile + .of(new 
Path("hdfs://localhost:8020/accumulo/tables/2a/b-0000009/I0000074.rf")); + ctmi = new ConditionalTabletsMutatorImpl(context); + var tm6 = TabletMetadata.builder(e1).build(LOADED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED) + .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5.getTabletFile(), 9L) + .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf4, stf5), context.getAmple().readTablet(e1).getFiles()); + + // simulate a compaction + var stf6 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/A0000075.rf")); + ctmi = new ConditionalTabletsMutatorImpl(context); + var tm7 = TabletMetadata.builder(e1).putFile(stf4, dfv).putFile(stf5, dfv).build(); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm7, FILES) + .putFile(stf6, new DataFileValue(0, 0)).deleteFile(stf4).deleteFile(stf5) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf6), context.getAmple().readTablet(e1).getFiles()); + + // simulate trying to re bulk import file after a compaction + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED) + .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5.getTabletFile(), 9L) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf6), context.getAmple().readTablet(e1).getFiles()); + } + } + + @Test + public void testWALs() { + var context = cluster.getServerContext(); + + // Test adding a WAL to a tablet and verifying its presence + String walFilePath = - java.nio.file.Path.of("tserver:8080", UUID.randomUUID().toString()).toString(); - LogEntry originalLogEntry = new LogEntry(walFilePath); ++ java.nio.file.Path.of("tserver+8080", UUID.randomUUID().toString()).toString(); ++ LogEntry originalLogEntry = LogEntry.fromPath(walFilePath); + ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context); + // create a tablet metadata with no write ahead logs + var tmEmptySet = TabletMetadata.builder(e1).build(LOGS); + // tablet should not have any logs to start with so requireSame with the empty logs should pass + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmEmptySet, LOGS) + .putWal(originalLogEntry).submit(tm -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + Set<LogEntry> expectedLogs = new HashSet<>(); + expectedLogs.add(originalLogEntry); + assertEquals(expectedLogs, new HashSet<>(context.getAmple().readTablet(e1).getLogs()), + "The original LogEntry should be present."); + + // Test adding another WAL and verifying the update + String walFilePath2 = - java.nio.file.Path.of("tserver:8080", UUID.randomUUID().toString()).toString(); - LogEntry newLogEntry = new LogEntry(walFilePath2); ++ java.nio.file.Path.of("tserver+8080", UUID.randomUUID().toString()).toString(); ++ LogEntry newLogEntry = LogEntry.fromPath(walFilePath2); + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().putWal(newLogEntry).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + // Verify that both the original and new WALs are present + expectedLogs.add(newLogEntry); + 
HashSet<LogEntry> actualLogs = new HashSet<>(context.getAmple().readTablet(e1).getLogs()); + assertEquals(expectedLogs, actualLogs, "Both original and new LogEntry should be present."); + + String walFilePath3 = - java.nio.file.Path.of("tserver:8080", UUID.randomUUID().toString()).toString(); - LogEntry otherLogEntry = new LogEntry(walFilePath3); ++ java.nio.file.Path.of("tserver+8080", UUID.randomUUID().toString()).toString(); ++ LogEntry otherLogEntry = LogEntry.fromPath(walFilePath3); + + // create a powerset to ensure all possible subsets fail when using requireSame except the + // expected current state + Set<LogEntry> allLogs = Set.of(originalLogEntry, newLogEntry, otherLogEntry); + Set<Set<LogEntry>> allSubsets = Sets.powerSet(allLogs); + + for (Set<LogEntry> subset : allSubsets) { + // Skip the subset that matches the current state of the tablet + if (subset.equals(expectedLogs)) { + continue; + } + + final TabletMetadataBuilder builder = TabletMetadata.builder(e1); + subset.forEach(builder::putWal); + TabletMetadata tmSubset = builder.build(LOGS); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmSubset, LOGS) + .deleteWal(originalLogEntry).submit(t -> false); + results = ctmi.process(); + + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + // ensure the operation did not go through + actualLogs = new HashSet<>(context.getAmple().readTablet(e1).getLogs()); + assertEquals(expectedLogs, actualLogs, "Both original and new LogEntry should be present."); + } + + // Test that requiring the current WALs gets accepted when making an update (deleting a WAL in + // this example) + TabletMetadata tm2 = + TabletMetadata.builder(e1).putWal(originalLogEntry).putWal(newLogEntry).build(LOGS); + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, LOGS) + .deleteWal(originalLogEntry).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus(), + "Requiring the current WALs should result in acceptance when making an update."); + + // Verify that the update went through as expected + assertEquals(List.of(newLogEntry), context.getAmple().readTablet(e1).getLogs(), + "Only the new LogEntry should remain after deleting the original."); + } + + @Test + public void testSelectedFiles() throws Exception { + var context = cluster.getServerContext(); + + var stf1 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); + var stf2 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")); + var stf3 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")); + var stf4 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf")); + var dfv = new DataFileValue(100, 100); + + System.out.println(context.getAmple().readTablet(e1).getLocation()); + + // simulate a compaction where the tablet location is not set + var ctmi = new ConditionalTabletsMutatorImpl(context); + var tm1 = TabletMetadata.builder(e1).build(FILES, SELECTED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES).putFile(stf1, dfv) + .putFile(stf2, dfv).putFile(stf3, dfv).submit(tm -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf1, stf2, stf3), 
context.getAmple().readTablet(e1).getFiles()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES, SELECTED) + .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 2L)) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertNull(context.getAmple().readTablet(e1).getSelectedFiles()); + + var tm2 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) + .build(SELECTED); + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, FILES, SELECTED) + .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 2L)) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Set.of(stf1, stf2, stf3), + context.getAmple().readTablet(e1).getSelectedFiles().getFiles()); + + // a list of selected files objects that are not the same as the current tablet and expected to + // fail + var expectedToFail = new ArrayList<SelectedFiles>(); + + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2), true, 2L)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3, stf4), true, 2L)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), false, 2L)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 3L)); + + for (var selectedFiles : expectedToFail) { + var tm3 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) + .putSelectedFiles(selectedFiles).build(); + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm3, FILES, SELECTED) + .deleteSelectedFiles().submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertEquals(Set.of(stf1, stf2, stf3), + context.getAmple().readTablet(e1).getSelectedFiles().getFiles()); + } + + var tm5 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) + .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 2L)).build(); + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES, SELECTED) + .deleteSelectedFiles().submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + assertNull(context.getAmple().readTablet(e1).getSelectedFiles()); + } + + /** + * Verifies that if selected files have been manually changed in the metadata, the files will be + * re-ordered before being read + */ + @Test + public void testSelectedFilesReordering() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + var context = cluster.getServerContext(); + + String pathPrefix = "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/"; + StoredTabletFile stf1 = StoredTabletFile.of(new Path(pathPrefix + "F0000070.rf")); + StoredTabletFile stf2 = StoredTabletFile.of(new Path(pathPrefix + "F0000071.rf")); + StoredTabletFile stf3 = 
StoredTabletFile.of(new Path(pathPrefix + "F0000072.rf")); + + final Set<StoredTabletFile> storedTabletFiles = Set.of(stf1, stf2, stf3); + final boolean initiallySelectedAll = true; + final long fateTxId = 2L; + final SelectedFiles selectedFiles = + new SelectedFiles(storedTabletFiles, initiallySelectedAll, fateTxId); + + ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context); + + // write the SelectedFiles to the keyextent + ctmi.mutateTablet(e1).requireAbsentOperation().putSelectedFiles(selectedFiles) + .submit(tm -> false); + + // verify we can read the selected files + Status mutationStatus = ctmi.process().get(e1).getStatus(); + assertEquals(Status.ACCEPTED, mutationStatus, "Failed to put selected files to tablet"); + assertEquals(selectedFiles, context.getAmple().readTablet(e1).getSelectedFiles(), + "Selected files should match those that were written"); + + final Text row = e1.toMetaRow(); + final Text selectedColumnFamily = SELECTED_COLUMN.getColumnFamily(); + final Text selectedColumnQualifier = SELECTED_COLUMN.getColumnQualifier(); + + Supplier<String> selectedMetadataValue = () -> { + try (Scanner scanner = client.createScanner(MetadataTable.NAME)) { + scanner.fetchColumn(selectedColumnFamily, selectedColumnQualifier); + scanner.setRange(new Range(row)); + + return scanner.stream().map(Map.Entry::getValue).map(Value::get) + .map(entry -> new String(entry, UTF_8)).collect(onlyElement()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + String actualMetadataValue = selectedMetadataValue.get(); + final String expectedMetadata = selectedFiles.getMetadataValue(); + assertEquals(expectedMetadata, actualMetadataValue, + "Value should be equal to metadata of SelectedFiles object that was written"); + + // get the string value of the paths + List<String> filesPathList = storedTabletFiles.stream().map(StoredTabletFile::toString) + .sorted().collect(Collectors.toList()); + + // verify we have the format of the json correct + String newJson = createSelectedFilesJson(fateTxId, initiallySelectedAll, filesPathList); + assertEquals(actualMetadataValue, newJson, + "Test json should be identical to actual metadata at this point"); + + // reverse the order of the files and create a new json + Collections.reverse(filesPathList); + newJson = createSelectedFilesJson(fateTxId, initiallySelectedAll, filesPathList); + assertNotEquals(actualMetadataValue, newJson, + "Test json should have reverse file order of actual metadata"); + + // write the json with reverse file order + try (BatchWriter bw = client.createBatchWriter(MetadataTable.NAME)) { + Mutation mutation = new Mutation(row); + mutation.put(selectedColumnFamily, selectedColumnQualifier, + new Value(newJson.getBytes(UTF_8))); + bw.addMutation(mutation); + } + + // verify the metadata has been changed + actualMetadataValue = selectedMetadataValue.get(); + assertEquals(newJson, actualMetadataValue, + "Value should be equal to the new json we manually wrote above"); + + TabletMetadata tm1 = + TabletMetadata.builder(e1).putSelectedFiles(selectedFiles).build(SELECTED); + ctmi = new ConditionalTabletsMutatorImpl(context); + StoredTabletFile stf4 = StoredTabletFile.of(new Path(pathPrefix + "F0000073.rf")); + // submit a mutation with the condition that the selected files match what was originally + // written + DataFileValue dfv = new DataFileValue(100, 100); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, SELECTED).putFile(stf4, dfv) + .submit(tm -> false); + + mutationStatus = 
ctmi.process().get(e1).getStatus(); + + // with a SortedFilesIterator attached to the Condition for SELECTED, the metadata should be + // re-ordered and once again match what was originally written meaning the conditional + // mutation should have been accepted and the file added should be present + assertEquals(Status.ACCEPTED, mutationStatus); + assertEquals(Set.of(stf4), context.getAmple().readTablet(e1).getFiles(), + "Expected to see the file that was added by the mutation"); + } + } + + /** + * Creates a json suitable to create a SelectedFiles object from. The given parameters will be + * inserted into the returned json String. Example of returned json String: + * + * <pre> + * { + * "txid": "FATE[123456]", + * "selAll": true, + * "files": ["/path/to/file1.rf", "/path/to/file2.rf"] + * } + * </pre> + */ + public static String createSelectedFilesJson(Long txid, boolean selAll, + Collection<String> paths) { + String filesJsonArray = GSON.get().toJson(paths); + String formattedTxid = FateTxId.formatTid(Long.parseLong(Long.toString(txid), 16)); + return ("{'txid':'" + formattedTxid + "','selAll':" + selAll + ",'files':" + filesJsonArray + + "}").replace('\'', '\"'); + } + + @Test + public void testMultipleExtents() { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var ts1 = new TServerInstance("localhost:9997", 5000L); + var ts2 = new TServerInstance("localhost:9997", 6000L); + + var context = cluster.getServerContext(); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts1)).submit(tm -> false); + ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts2)).submit(tm -> false); + var results = ctmi.process(); + + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + + assertEquals(Location.future(ts1), context.getAmple().readTablet(e1).getLocation()); + assertEquals(Location.future(ts2), context.getAmple().readTablet(e2).getLocation()); + assertNull(context.getAmple().readTablet(e3).getLocation()); + assertNull(context.getAmple().readTablet(e4).getLocation()); + + assertEquals(Set.of(e1, e2), results.keySet()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts2)).submit(tm -> false); + ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts1)).submit(tm -> false); + ctmi.mutateTablet(e3).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts1)).submit(tm -> false); + ctmi.mutateTablet(e4).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts2)).submit(tm -> false); + results = ctmi.process(); + + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Status.REJECTED, results.get(e2).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e3).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e4).getStatus()); + + assertEquals(Location.future(ts1), context.getAmple().readTablet(e1).getLocation()); + assertEquals(Location.future(ts2), context.getAmple().readTablet(e2).getLocation()); + assertEquals(Location.future(ts1), context.getAmple().readTablet(e3).getLocation()); + assertEquals(Location.future(ts2), context.getAmple().readTablet(e4).getLocation()); + + assertEquals(Set.of(e1, e2, 
e3, e4), results.keySet()); + + } + } + + @Test + public void testOperations() { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var context = cluster.getServerContext(); + + var opid1 = TabletOperationId.from("SPLITTING:FATE[1234]"); + var opid2 = TabletOperationId.from("MERGING:FATE[5678]"); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm -> false); + ctmi.mutateTablet(e2).requireAbsentOperation().putOperation(opid2).submit(tm -> false); + ctmi.mutateTablet(e3).requireOperation(opid1).deleteOperation().submit(tm -> false); + var results = ctmi.process(); + + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + assertEquals(Status.REJECTED, results.get(e3).getStatus()); + assertEquals(TabletOperationType.SPLITTING, + context.getAmple().readTablet(e1).getOperationId().getType()); + assertEquals(opid1, context.getAmple().readTablet(e1).getOperationId()); + assertEquals(TabletOperationType.MERGING, + context.getAmple().readTablet(e2).getOperationId().getType()); + assertEquals(opid2, context.getAmple().readTablet(e2).getOperationId()); + assertNull(context.getAmple().readTablet(e3).getOperationId()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireOperation(opid2).deleteOperation().submit(tm -> false); + ctmi.mutateTablet(e2).requireOperation(opid1).deleteOperation().submit(tm -> false); + results = ctmi.process(); + + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Status.REJECTED, results.get(e2).getStatus()); + assertEquals(TabletOperationType.SPLITTING, + context.getAmple().readTablet(e1).getOperationId().getType()); + assertEquals(TabletOperationType.MERGING, + context.getAmple().readTablet(e2).getOperationId().getType()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireOperation(opid1).deleteOperation().submit(tm -> false); + ctmi.mutateTablet(e2).requireOperation(opid2).deleteOperation().submit(tm -> false); + results = ctmi.process(); + + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + assertNull(context.getAmple().readTablet(e1).getOperationId()); + assertNull(context.getAmple().readTablet(e2).getOperationId()); + } + } + + @Test + public void testCompacted() { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var context = cluster.getServerContext(); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + + var tabletMeta1 = TabletMetadata.builder(e1).build(COMPACTED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .putCompacted(55L).submit(tabletMetadata -> tabletMetadata.getCompacted().contains(55L)); + var tabletMeta2 = TabletMetadata.builder(e2).putCompacted(45L).build(COMPACTED); + ctmi.mutateTablet(e2).requireAbsentOperation().requireSame(tabletMeta2, COMPACTED) + .putCompacted(56L).submit(tabletMetadata -> tabletMetadata.getCompacted().contains(56L)); + + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Status.REJECTED, results.get(e2).getStatus()); + + tabletMeta1 = context.getAmple().readTablet(e1); + assertEquals(Set.of(55L), tabletMeta1.getCompacted()); + assertEquals(Set.of(), context.getAmple().readTablet(e2).getCompacted()); + + ctmi = new 
ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .putCompacted(65L).putCompacted(75L).submit(tabletMetadata -> false); + + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + tabletMeta1 = context.getAmple().readTablet(e1); + assertEquals(Set.of(55L, 65L, 75L), tabletMeta1.getCompacted()); + + // test require same with a superset + ctmi = new ConditionalTabletsMutatorImpl(context); + tabletMeta1 = TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).putCompacted(75L) + .putCompacted(45L).build(COMPACTED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L) + .submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Set.of(55L, 65L, 75L), context.getAmple().readTablet(e1).getCompacted()); + + // test require same with a subset + ctmi = new ConditionalTabletsMutatorImpl(context); + tabletMeta1 = TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).build(COMPACTED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L) + .submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Set.of(55L, 65L, 75L), context.getAmple().readTablet(e1).getCompacted()); + + // now use the exact set the tablet has + ctmi = new ConditionalTabletsMutatorImpl(context); + tabletMeta1 = TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).putCompacted(75L) + .build(COMPACTED); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) + .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L) + .submit(tabletMetadata -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Set.of(), context.getAmple().readTablet(e1).getCompacted()); + } + } + + @Test + public void testRootTabletUpdate() { + var context = cluster.getServerContext(); + + var rootMeta = context.getAmple().readTablet(RootTable.EXTENT); + var loc = rootMeta.getLocation(); + + assertEquals(LocationType.CURRENT, loc.getType()); + assertNull(rootMeta.getOperationId()); + + TabletOperationId opid = TabletOperationId.from(TabletOperationType.MERGING, 7); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation() + .putOperation(opid).submit(tm -> false); + var results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus()); + assertNull(context.getAmple().readTablet(RootTable.EXTENT).getOperationId()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation() + .requireLocation(Location.future(loc.getServerInstance())).putOperation(opid) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus()); + assertNull(context.getAmple().readTablet(RootTable.EXTENT).getOperationId()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation() + .requireLocation(Location.current(loc.getServerInstance())).putOperation(opid) + .submit(tm -> false); + results = ctmi.process(); + 
assertEquals(Status.ACCEPTED, results.get(RootTable.EXTENT).getStatus()); + assertEquals(opid.canonical(), + context.getAmple().readTablet(RootTable.EXTENT).getOperationId().canonical()); + } + + @Test + public void testTime() { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var context = cluster.getServerContext(); + + for (var time : List.of(new MetadataTime(100, TimeType.LOGICAL), + new MetadataTime(100, TimeType.MILLIS), new MetadataTime(0, TimeType.LOGICAL))) { + var ctmi = new ConditionalTabletsMutatorImpl(context); + var tabletMeta1 = TabletMetadata.builder(e1).putTime(time).build(); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME) + .putTime(new MetadataTime(101, TimeType.LOGICAL)).submit(tabletMetadata -> false); + var results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(new MetadataTime(0, TimeType.MILLIS), + context.getAmple().readTablet(e1).getTime()); + } + + for (int i = 0; i < 10; i++) { + var ctmi = new ConditionalTabletsMutatorImpl(context); + var tabletMeta1 = + TabletMetadata.builder(e1).putTime(new MetadataTime(i, TimeType.MILLIS)).build(); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME) + .putTime(new MetadataTime(i + 1, TimeType.MILLIS)).submit(tabletMetadata -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(new MetadataTime(i + 1, TimeType.MILLIS), + context.getAmple().readTablet(e1).getTime()); + } + } + } + + @Test + public void testFlushId() { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var context = cluster.getServerContext(); + + assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty()); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + + var tabletMeta1 = TabletMetadata.builder(e1).putFlushId(42L).build(); + assertTrue(tabletMeta1.getFlushId().isPresent()); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, FLUSH_ID) + .putFlushId(43L).submit(tabletMetadata -> tabletMetadata.getFlushId().orElse(-1) == 43L); + var results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertTrue(context.getAmple().readTablet(e1).getFlushId().isEmpty()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + var tabletMeta2 = TabletMetadata.builder(e1).build(FLUSH_ID); + assertFalse(tabletMeta2.getFlushId().isPresent()); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta2, FLUSH_ID) + .putFlushId(43L).submit(tabletMetadata -> tabletMetadata.getFlushId().orElse(-1) == 43L); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(43L, context.getAmple().readTablet(e1).getFlushId().getAsLong()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + var tabletMeta3 = TabletMetadata.builder(e1).putFlushId(43L).build(); + assertTrue(tabletMeta1.getFlushId().isPresent()); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, FLUSH_ID) + .putFlushId(44L).submit(tabletMetadata -> tabletMetadata.getFlushId().orElse(-1) == 44L); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(44L, context.getAmple().readTablet(e1).getFlushId().getAsLong()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta3, FLUSH_ID) + 
.putFlushId(45L).submit(tabletMetadata -> tabletMetadata.getFlushId().orElse(-1) == 45L); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(44L, context.getAmple().readTablet(e1).getFlushId().getAsLong()); + } + } + + @Test + public void testAsyncMutator() throws Exception { + var table = getUniqueNames(2)[1]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + // The AsyncConditionalTabletsMutatorImpl processes batches of conditional mutations. Run + // tests where more than the batch size is processed an ensure this handled correctly. + + TreeSet<Text> splits = + IntStream.range(1, (int) (AsyncConditionalTabletsMutatorImpl.BATCH_SIZE * 2.5)) + .mapToObj(i -> new Text(String.format("%06d", i))) + .collect(Collectors.toCollection(TreeSet::new)); + + assertTrue(splits.size() > AsyncConditionalTabletsMutatorImpl.BATCH_SIZE); + + c.tableOperations().create(table, new NewTableConfiguration().withSplits(splits)); + var tableId = TableId.of(c.tableOperations().tableIdMap().get(table)); + + var ample = cluster.getServerContext().getAmple(); + + AtomicLong accepted = new AtomicLong(0); + AtomicLong total = new AtomicLong(0); + Consumer<Ample.ConditionalResult> resultsConsumer = result -> { + if (result.getStatus() == Status.ACCEPTED) { + accepted.incrementAndGet(); + } + total.incrementAndGet(); + }; + + // run a test where a subset of tablets are modified, all modifications should be accepted + var opid1 = TabletOperationId.from(TabletOperationType.MERGING, 50); + + int expected = 0; + try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build(); + var mutator = ample.conditionallyMutateTablets(resultsConsumer)) { + for (var tablet : tablets) { + if (tablet.getEndRow() != null + && Integer.parseInt(tablet.getEndRow().toString()) % 2 == 0) { + mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid1) + .submit(tm -> opid1.equals(tm.getOperationId())); + expected++; + } + } + } + + assertTrue(expected > 0); + assertEquals(expected, accepted.get()); + assertEquals(total.get(), accepted.get()); + + // run test where some will be accepted and some will be rejected and ensure the counts come + // out as expected. + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, 51); + + accepted.set(0); + total.set(0); + + try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build(); + var mutator = ample.conditionallyMutateTablets(resultsConsumer)) { + for (var tablet : tablets) { + mutator.mutateTablet(tablet.getExtent()).requireAbsentOperation().putOperation(opid2) + .submit(tm -> opid2.equals(tm.getOperationId())); + } + } + + var numTablets = splits.size() + 1; + assertEquals(numTablets - expected, accepted.get()); + assertEquals(numTablets, total.get()); + } + } + +} diff --cc test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index ae8b24ad9c,0000000000..5ca097da33 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@@ -1,484 -1,0 +1,484 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.manager.thrift.ManagerState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import 
org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.server.manager.LiveTServerSet; +import org.apache.accumulo.server.manager.state.TabletManagementIterator; +import org.apache.accumulo.server.manager.state.TabletManagementParameters; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +/** + * Test to ensure that the {@link TabletManagementIterator} properly skips over tablet information + * in the metadata table when there is no work to be done on the tablet (see ACCUMULO-3580) + */ +public class TabletManagementIteratorIT extends AccumuloClusterHarness { + private final static Logger log = LoggerFactory.getLogger(TabletManagementIteratorIT.class); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(3); + } + + @Test + public void test() throws AccumuloException, AccumuloSecurityException, TableExistsException, + TableNotFoundException, IOException { + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + String[] tables = getUniqueNames(8); + final String t1 = tables[0]; + final String t2 = tables[1]; + final String t3 = tables[2]; + final String t4 = tables[3]; + final String metaCopy1 = tables[4]; + final String metaCopy2 = tables[5]; + final String metaCopy3 = tables[6]; + final String metaCopy4 = tables[7]; + + // create some metadata + createTable(client, t1, true); + createTable(client, t2, false); + createTable(client, t3, true); + createTable(client, t4, true); + + // Scan table t3 which will cause it's tablets + // to be hosted. Then, remove the location. + Scanner s = client.createScanner(t3); + s.setRange(new Range()); + @SuppressWarnings("unused") + var unused = Iterables.size(s); // consume all the data + + // examine a clone of the metadata table, so we can manipulate it + copyTable(client, MetadataTable.NAME, metaCopy1); + + TabletManagementParameters tabletMgmtParams = createParameters(client); + int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams); + while (tabletsInFlux > 0) { + log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1); + UtilWaitThread.sleep(500); + copyTable(client, MetadataTable.NAME, metaCopy1); + tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams); + } + assertEquals(0, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), + "No tables should need attention"); + + // The metadata table stabilized and metaCopy1 contains a copy suitable for testing. Before + // metaCopy1 is modified, copy it for subsequent test. 
+      copyTable(client, metaCopy1, metaCopy2);
+      copyTable(client, metaCopy1, metaCopy3);
+      copyTable(client, metaCopy1, metaCopy4);
+
+      // t1 is unassigned, setting to always will generate a change to host tablets
+      setTabletHostingGoal(client, metaCopy1, t1, TabletHostingGoal.ALWAYS.name());
+      // t3 is hosted, setting to never will generate a change to unhost tablets
+      setTabletHostingGoal(client, metaCopy1, t3, TabletHostingGoal.NEVER.name());
+      tabletMgmtParams = createParameters(client);
+      assertEquals(4, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams),
+          "Should have four tablets with hosting goal changes");
+
+      // test the assigned case (no location)
+      removeLocation(client, metaCopy1, t3);
+      assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams),
+          "Should have two tablets without a loc");
+
+      // Test setting the operation id on one of the tablets in table t1. Table t1 has two tablets
+      // w/o a location. Only one should need attention because of the operation id.
+      setOperationId(client, metaCopy1, t1, new Text("some split"), TabletOperationType.SPLITTING);
+      assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams),
+          "Should have tablets needing attention because of operation id");
+
+      // test the cases where the assignment is to a dead tserver
+      reassignLocation(client, metaCopy2, t3);
+      assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, tabletMgmtParams),
+          "Only 1 of 2 tablets in table t1 should be returned");
+
+      // Set t4's hosting goal to always and remove its location. Tablets with no location and a
+      // hosting goal of always should need attention.
+      setTabletHostingGoal(client, metaCopy4, t4, TabletHostingGoal.ALWAYS.name());
+      removeLocation(client, metaCopy4, t4);
+      assertEquals(2, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
+          "Tablets have no location and a hosting goal of always, so they should need attention");
+
+      // Test MERGING and SPLITTING do not need attention with no location or wals
+      setOperationId(client, metaCopy4, t4, null, TabletOperationType.MERGING);
+      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
+          "Should have no tablets needing attention for merge as they have no location");
+      setOperationId(client, metaCopy4, t4, null, TabletOperationType.SPLITTING);
+      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
+          "Should have no tablets needing attention for split as they have no location");
+
+      // Create a log entry for one of the tablets; it will now need attention
+      // for both MERGING and SPLITTING
+      setOperationId(client, metaCopy4, t4, null, TabletOperationType.MERGING);
+      createLogEntry(client, metaCopy4, t4);
+      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
+          "Should have a tablet needing attention because of wals");
+      // Switch op to SPLITTING which should also need attention like MERGING
+      setOperationId(client, metaCopy4, t4, null, TabletOperationType.SPLITTING);
+      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
+          "Should have a tablet needing attention because of wals");
+
+      // Switch op to delete, no tablets should need attention even with WALs
+      setOperationId(client, metaCopy4, t4, null, TabletOperationType.DELETING);
+      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
+          "Should have no tablets needing attention for delete");
+
+      // test the bad tablet location state case (inconsistent metadata)
+      tabletMgmtParams = createParameters(client);
+      addDuplicateLocation(client, metaCopy3, t3);
+      assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, tabletMgmtParams),
+          "Should have 1 tablet that needs a metadata repair");
+
+      // test the volume replacements case. Need to insert some files into
+      // the metadata for t4, then run the TabletManagementIterator with
+      // volume replacements
+      addFiles(client, metaCopy4, t4);
+      List<Pair<Path,Path>> replacements = new ArrayList<>();
+      replacements.add(new Pair<Path,Path>(new Path("file:/vol1/accumulo/inst_id"),
+          new Path("file:/vol2/accumulo/inst_id")));
+      tabletMgmtParams = createParameters(client, replacements);
+      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
+          "Should have one tablet that needs a volume replacement");
+
+      // clean up
+      dropTables(client, t1, t2, t3, t4, metaCopy1, metaCopy2, metaCopy3, metaCopy4);
+    }
+  }
+
+  private void setTabletHostingGoal(AccumuloClient client, String table, String tableNameToModify,
+      String state) throws TableNotFoundException, MutationsRejectedException {
+    TableId tableIdToModify =
+        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
+
+    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
+      scanner.setRange(new KeyExtent(tableIdToModify, null, null).toMetaRange());
+      for (Entry<Key,Value> entry : scanner) {
+        Mutation m = new Mutation(entry.getKey().getRow());
+        m.put(HostingColumnFamily.GOAL_COLUMN.getColumnFamily(),
+            HostingColumnFamily.GOAL_COLUMN.getColumnQualifier(), entry.getKey().getTimestamp() + 1,
+            new Value(state));
+        try (BatchWriter bw = client.createBatchWriter(table)) {
+          bw.addMutation(m);
+        }
+      }
+    }
+  }
+
+  private void addDuplicateLocation(AccumuloClient client, String table, String tableNameToModify)
+      throws TableNotFoundException, MutationsRejectedException {
+    TableId tableIdToModify =
+        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
+    Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow());
+    m.put(CurrentLocationColumnFamily.NAME, new Text("1234567"), new Value("fake:9005"));
+    try (BatchWriter bw = client.createBatchWriter(table)) {
+      bw.addMutation(m);
+    }
+  }
+
+  private void addFiles(AccumuloClient client, String table, String tableNameToModify)
+      throws TableNotFoundException, MutationsRejectedException {
+    TableId tableIdToModify =
+        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
+    Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow());
+    m.put(DataFileColumnFamily.NAME,
+        new Text(StoredTabletFile
+            .serialize("file:/vol1/accumulo/inst_id/tables/2a/default_tablet/F0000072.rf")),
+        new Value(new DataFileValue(0, 0, 0).encode()));
+    try (BatchWriter bw = client.createBatchWriter(table)) {
+      bw.addMutation(m);
+    }
+    try {
+      client.createScanner(table).iterator()
+          .forEachRemaining(e -> System.out.println(e.getKey() + "-> " + e.getValue()));
+    } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
+      // best-effort debug dump of the copied table; just report the failure
+      e.printStackTrace();
+    }
+  }
+
+  private void reassignLocation(AccumuloClient client, String table, String tableNameToModify)
+      throws TableNotFoundException, MutationsRejectedException {
+    TableId tableIdToModify =
+        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
+    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
+      scanner.setRange(new KeyExtent(tableIdToModify, null, null).toMetaRange());
+      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+      Entry<Key,Value> entry = scanner.iterator().next();
+      Mutation m = new Mutation(entry.getKey().getRow());
+      m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier(),
+          entry.getKey().getTimestamp());
+      m.put(entry.getKey().getColumnFamily(), new Text("1234567"),
+          entry.getKey().getTimestamp() + 1, new Value("fake:9005"));
+      try (BatchWriter bw = client.createBatchWriter(table)) {
+        bw.addMutation(m);
+      }
+    }
+  }
+
+  // Sets an operation type on all tablets up to the end row
+  private void setOperationId(AccumuloClient client, String table, String tableNameToModify,
+      Text end, TabletOperationType opType) throws TableNotFoundException {
+    var opid = TabletOperationId.from(opType, 42L);
+    TableId tableIdToModify =
+        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
+    try (TabletsMetadata tabletsMetadata =
+        getServerContext().getAmple().readTablets().forTable(tableIdToModify)
+            .overlapping(null, end != null ? TabletsSection.encodeRow(tableIdToModify, end) : null)
+            .fetch(ColumnType.PREV_ROW).build()) {
+      for (TabletMetadata tabletMetadata : tabletsMetadata) {
+        Mutation m = new Mutation(tabletMetadata.getExtent().toMetaRow());
+        MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m,
+            new Value(opid.canonical()));
+        try (BatchWriter bw = client.createBatchWriter(table)) {
+          bw.addMutation(m);
+        } catch (MutationsRejectedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  private void removeLocation(AccumuloClient client, String table, String tableNameToModify)
+      throws TableNotFoundException, MutationsRejectedException {
+    TableId tableIdToModify =
+        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
+    BatchDeleter deleter = client.createBatchDeleter(table, Authorizations.EMPTY, 1);
+    deleter
+        .setRanges(Collections.singleton(new KeyExtent(tableIdToModify, null, null).toMetaRange()));
+    deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+    deleter.delete();
+    deleter.close();
+  }
+
+  private int findTabletsNeedingAttention(AccumuloClient client, String table,
+      TabletManagementParameters tabletMgmtParams) throws TableNotFoundException, IOException {
+    int results = 0;
+    List<KeyExtent> resultList = new ArrayList<>();
+    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
+      TabletManagementIterator.configureScanner(scanner, tabletMgmtParams);
+      scanner.updateScanIteratorOption("tabletChange", "debug", "1");
+      for (Entry<Key,Value> e : scanner) {
+        if (e != null) {
+          TabletManagement mti = TabletManagementIterator.decode(e);
+          results++;
+          log.debug("Found tablets that changed state: {}", mti.getTabletMetadata().getExtent());
+          log.debug("metadata: {}", mti.getTabletMetadata());
+          resultList.add(mti.getTabletMetadata().getExtent());
+        }
+      }
+    }
+    log.debug("Tablets in flux: {}", resultList);
+    return results;
+  }
+
+  private void createTable(AccumuloClient client, String t, boolean online)
+      throws AccumuloSecurityException, AccumuloException, TableNotFoundException,
+      TableExistsException {
+    SortedSet<Text> partitionKeys = new TreeSet<>();
+    partitionKeys.add(new Text("some split"));
+    NewTableConfiguration ntc = new NewTableConfiguration().withSplits(partitionKeys);
+    client.tableOperations().create(t, ntc);
+    client.tableOperations().online(t);
+    if (!online) {
+      client.tableOperations().offline(t, true);
+    }
+  }
+
+  /**
+   * Create a copy of the source table by first gathering all the rows of the source in a list of
+   * mutations. Then create the copy of the table and apply the mutations to the copy.
+   */
+  private void copyTable(AccumuloClient client, String source, String copy)
+      throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
+      TableExistsException {
+    try {
+      dropTables(client, copy);
+    } catch (TableNotFoundException ex) {
+      // ignored
+    }
+
+    log.info("Gathering rows to copy {}", source);
+    List<Mutation> mutations = new ArrayList<>();
+
+    try (Scanner scanner = client.createScanner(source, Authorizations.EMPTY)) {
+      RowIterator rows = new RowIterator(new IsolatedScanner(scanner));
+
+      while (rows.hasNext()) {
+        Iterator<Entry<Key,Value>> row = rows.next();
+        Mutation m = null;
+
+        while (row.hasNext()) {
+          Entry<Key,Value> entry = row.next();
+          Key k = entry.getKey();
+          if (m == null) {
+            m = new Mutation(k.getRow());
+          }
+
+          m.put(k.getColumnFamily(), k.getColumnQualifier(), k.getColumnVisibilityParsed(),
+              k.getTimestamp(), entry.getValue());
+        }
+
+        mutations.add(m);
+      }
+    }
+
+    // metadata should be stable with only 8 rows (2 for each created table)
+    log.debug("Gathered {} rows to create copy {}", mutations.size(), copy);
+    assertEquals(8, mutations.size(), "Metadata should have 8 rows (2 for each table)");
+    client.tableOperations().create(copy);
+
+    try (BatchWriter writer = client.createBatchWriter(copy)) {
+      for (Mutation m : mutations) {
+        writer.addMutation(m);
+      }
+    }
+
+    log.info("Finished creating copy " + copy);
+  }
+
+  private void dropTables(AccumuloClient client, String... tables)
+      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    for (String t : tables) {
+      client.tableOperations().delete(t);
+    }
+  }
+
+  // Creates a log entry on the "some split" extent; this could easily be modified to support
+  // other extents
+  private void createLogEntry(AccumuloClient client, String table, String tableNameToModify)
+      throws MutationsRejectedException, TableNotFoundException {
+    TableId tableIdToModify =
+        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
+    KeyExtent extent = new KeyExtent(tableIdToModify, new Text("some split"), null);
+    Mutation m = new Mutation(extent.toMetaRow());
+    String fileName = "file:/accumulo/wal/localhost+9997/" + UUID.randomUUID().toString();
-    LogEntry logEntry = new LogEntry(fileName);
++    LogEntry logEntry = LogEntry.fromPath(fileName);
+    logEntry.addToMutation(m);
+    try (BatchWriter bw = client.createBatchWriter(table)) {
+      bw.addMutation(m);
+    }
+  }
+
+  private static TabletManagementParameters createParameters(AccumuloClient client) {
+    return createParameters(client, List.of());
+  }
+
+  private static TabletManagementParameters createParameters(AccumuloClient client,
+      List<Pair<Path,Path>> replacements) {
+    var context = (ClientContext) client;
+    Set<TableId> onlineTables = Sets.filter(context.getTableIdToNameMap().keySet(),
+        tableId -> context.getTableState(tableId) == TableState.ONLINE);
+
+    HashSet<TServerInstance> tservers = new HashSet<>();
+    for (String tserver : context.instanceOperations().getTabletServers()) {
+      try {
+        var zPath = ServiceLock.path(ZooUtil.getRoot(context.instanceOperations().getInstanceId())
+            + Constants.ZTSERVERS + "/" + tserver);
+        long sessionId = ServiceLock.getSessionId(context.getZooCache(), zPath);
+        tservers.add(new TServerInstance(tserver, sessionId));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return new TabletManagementParameters(ManagerState.NORMAL,
+        Map.of(
+            Ample.DataLevel.ROOT, true, Ample.DataLevel.USER, true, Ample.DataLevel.METADATA, true),
+        onlineTables,
+        new LiveTServerSet.LiveTServersSnapshot(tservers,
+            Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)),
+        Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements);
+  }
+}
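
The only functional change to this test in the merge is how the WAL metadata entry is built: the removed/added pair above swaps the LogEntry constructor for the LogEntry.fromPath factory. Below is a minimal sketch of the new call pattern, modeled on the createLogEntry helper in this diff; the class name, method name, and WAL path are illustrative placeholders rather than code from the commit.

  import java.util.UUID;

  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.tabletserver.log.LogEntry;

  class WalEntryExample {
    // Builds a metadata mutation that records a WAL reference for the given extent.
    // LogEntry.fromPath(...) is the factory used after this change (previously: new LogEntry(path)).
    static Mutation walMutation(KeyExtent extent) {
      String walPath = "file:/accumulo/wal/localhost+9997/" + UUID.randomUUID();
      Mutation m = new Mutation(extent.toMetaRow());
      LogEntry.fromPath(walPath).addToMutation(m);
      return m;
    }
  }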