This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new f674e9a0b0 Moves the compaction commit process into FATE (#4109) f674e9a0b0 is described below commit f674e9a0b0a2b09090dae3d216c482ef26c0fde9 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Jan 12 19:51:32 2024 -0500 Moves the compaction commit process into FATE (#4109) The custom refresh tracking code was removed and compaction commit was moved to being a FATE operation with the following 4 steps. 1. Rename file done in RenameCompactionFile class 2. Update the metadata table via a conditional mutation done in CommitCompaction class 3. Write the gc candidates done in PutGcCandidates class 4. Optionally send an RPC refresh request if the tablet was hosted done in RefreshTablet class There is some follow-on work that still needs to be done to improve how this change works with detecting dead compactions. After that is done these changes should address problems outlined in #3811 and #3802 that were related to process death before adding GC candidates. Now that GC candidates are written in FATE, if the process dies the FATE operation will run again later. This is currently storing the compaction commit FATE operations in zookeeper. This would not be suitable for a cluster because per tablet information should never be stored in zookeeper. 
However, it's fine as a temporary situation in the elasticity branch until FATE storage is available in an accumulo table, see #4049 and #3559 --- .../apache/accumulo/core/metadata/RootTable.java | 4 - .../accumulo/core/metadata/schema/Ample.java | 43 -- .../core/metadata/schema/MetadataSchema.java | 13 - .../accumulo/server/init/ZooKeeperInitializer.java | 3 - .../accumulo/server/metadata/RefreshesImpl.java | 254 ----------- .../accumulo/server/metadata/ServerAmpleImpl.java | 6 - .../java/org/apache/accumulo/manager/Manager.java | 3 +- .../coordinator/CompactionCoordinator.java | 481 ++------------------- .../coordinator/commit/CommitCompaction.java | 281 ++++++++++++ .../coordinator/commit/CompactionCommitData.java | 62 +++ .../coordinator/commit/PutGcCandidates.java | 51 +++ .../coordinator/commit/RefreshTablet.java | 64 +++ .../coordinator/commit/RenameCompactionFile.java | 92 ++++ .../accumulo/test/compaction/RefreshesIT.java | 97 ----- 14 files changed, 595 insertions(+), 859 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java index ce3c654908..7c00cc0e81 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java @@ -43,10 +43,6 @@ public class RootTable { * ZK path relative to the zookeeper node where the root tablet gc candidates are stored. */ public static final String ZROOT_TABLET_GC_CANDIDATES = ZROOT_TABLET + "/gc_candidates"; - /* - * ZK path relative to the zookeeper node where the root tablet refresh entries are stored. 
- */ - public static final String ZROOT_TABLET_REFRESHES = ZROOT_TABLET + "/refreshes"; public static final KeyExtent EXTENT = new KeyExtent(ID, null, null); public static final KeyExtent OLD_EXTENT = diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 98666a74c8..9bcaa83ef7 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -21,7 +21,6 @@ package org.apache.accumulo.core.metadata.schema; import java.util.Collection; import java.util.Iterator; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.function.Consumer; @@ -675,46 +674,4 @@ public interface Ample { default void removeBulkLoadInProgressFlag(String path) { throw new UnsupportedOperationException(); } - - interface Refreshes { - static class RefreshEntry { - private final ExternalCompactionId ecid; - - private final KeyExtent extent; - private final TServerInstance tserver; - - public RefreshEntry(ExternalCompactionId ecid, KeyExtent extent, TServerInstance tserver) { - this.ecid = Objects.requireNonNull(ecid); - this.extent = Objects.requireNonNull(extent); - this.tserver = Objects.requireNonNull(tserver); - } - - public ExternalCompactionId getEcid() { - return ecid; - } - - public KeyExtent getExtent() { - return extent; - } - - public TServerInstance getTserver() { - return tserver; - } - } - - void add(Collection<RefreshEntry> entries); - - void delete(Collection<RefreshEntry> entries); - - Stream<RefreshEntry> stream(); - } - - /** - * Refresh entries in the metadata table are used to track hosted tablets that need to have their - * metadata refreshed after a compaction. These entries ensure the refresh happens even in the - * case of process death. 
- */ - default Refreshes refreshes(DataLevel dataLevel) { - throw new UnsupportedOperationException(); - } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index a69651e3d0..ceecfb0a77 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -533,17 +533,4 @@ public class MetadataSchema { return section.getRowPrefix(); } } - - public static class RefreshSection { - private static final Section section = - new Section(RESERVED_PREFIX + "refresh", true, RESERVED_PREFIX + "refresi", false); - - public static Range getRange() { - return section.getRange(); - } - - public static String getRowPrefix() { - return section.getRowPrefix(); - } - } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java index cbf59bc52c..719f468032 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java @@ -47,7 +47,6 @@ import org.apache.accumulo.server.conf.codec.VersionedPropCodec; import org.apache.accumulo.server.conf.codec.VersionedProperties; import org.apache.accumulo.server.conf.store.SystemPropKey; import org.apache.accumulo.server.log.WalStateManager; -import org.apache.accumulo.server.metadata.RefreshesImpl; import org.apache.accumulo.server.metadata.RootGcCandidates; import org.apache.accumulo.server.tables.TableManager; import org.apache.zookeeper.KeeperException; @@ -137,8 +136,6 @@ public class ZooKeeperInitializer { ZooUtil.NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_GC_CANDIDATES, new RootGcCandidates().toJson().getBytes(UTF_8), 
ZooUtil.NodeExistsPolicy.FAIL); - zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_REFRESHES, - RefreshesImpl.getInitialJson().getBytes(UTF_8), ZooUtil.NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZMANAGERS, EMPTY_BYTE_ARRAY, ZooUtil.NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZMANAGER_LOCK, EMPTY_BYTE_ARRAY, diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/RefreshesImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/RefreshesImpl.java deleted file mode 100644 index 531136853b..0000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/RefreshesImpl.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.server.metadata; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET_REFRESHES; -import static org.apache.accumulo.core.util.LazySingletons.GSON; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Collection; -import java.util.Map; -import java.util.Objects; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.RefreshSection; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.server.ServerContext; -import org.apache.hadoop.io.Text; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -public class RefreshesImpl implements Ample.Refreshes { - - private static final Logger log = LoggerFactory.getLogger(RefreshesImpl.class); - - private final Ample.DataLevel dataLevel; - private final ServerContext context; - - public static String 
getInitialJson() { - return toJson(Map.of()); - } - - // the expected version of serialized data - private static final int CURRENT_VERSION = 1; - - // This class is used to serialize and deserialize root tablet metadata using GSon. Any changes to - // this class must consider persisted data. - private static class Data { - - final int version; - - final Map<String,String> entries; - - public Data(int version, Map<String,String> entries) { - this.version = version; - this.entries = entries; - } - } - - private static String toJson(Map<String,String> entries) { - var data = new Data(CURRENT_VERSION, Objects.requireNonNull(entries)); - return GSON.get().toJson(data, Data.class); - } - - private Map<String,String> fromJson(String json) { - Data data = GSON.get().fromJson(json, Data.class); - Preconditions.checkArgument(data.version == CURRENT_VERSION, "Expected version %s saw %s", - CURRENT_VERSION, data.version); - Objects.requireNonNull(data.entries); - return data.entries; - } - - public RefreshesImpl(ServerContext context, Ample.DataLevel dataLevel) { - this.context = context; - this.dataLevel = dataLevel; - } - - private void mutateRootRefreshes(Consumer<Map<String,String>> mutator) { - String zpath = context.getZooKeeperRoot() + ZROOT_TABLET_REFRESHES; - try { - context.getZooReaderWriter().mutateExisting(zpath, currVal -> { - String currJson = new String(currVal, UTF_8); - log.debug("Root refreshes before change : {}", currJson); - Map<String,String> entries = fromJson(currJson); - mutator.accept(entries); - String newJson = toJson(entries); - log.debug("Root refreshes after change : {}", newJson); - if (newJson.length() > 262_144) { - log.warn("Root refreshes stored in ZK at {} are getting large ({} bytes)." 
- + " Large nodes may cause problems for Zookeeper!", zpath, newJson.length()); - } - return newJson.getBytes(UTF_8); - }); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - private Text createRow(RefreshEntry entry) { - return new Text(RefreshSection.getRowPrefix() + entry.getEcid().canonical()); - } - - private Mutation createAddMutation(RefreshEntry entry) { - Mutation m = new Mutation(createRow(entry)); - - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos)) { - dos.writeInt(1); - entry.getExtent().writeTo(dos); - dos.writeUTF(entry.getTserver().getHostPortSession()); - dos.close(); - m.at().family("").qualifier("").put(baos.toByteArray()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - - return m; - } - - private Mutation createDeleteMutation(RefreshEntry entry) { - Mutation m = new Mutation(createRow(entry)); - m.putDelete("", ""); - return m; - } - - private void requireRootTablets(Collection<RefreshEntry> entries) { - if (!entries.stream().allMatch(e -> e.getExtent().isRootTablet())) { - var nonRootTablets = entries.stream().map(RefreshEntry::getExtent) - .filter(e -> !e.isRootTablet()).collect(Collectors.toSet()); - throw new IllegalArgumentException("Expected only root tablet but saw " + nonRootTablets); - } - } - - private RefreshEntry decode(Map.Entry<Key,Value> entry) { - String row = entry.getKey().getRowData().toString(); - Preconditions.checkArgument(row.startsWith(RefreshSection.getRowPrefix()), - "Row %s did not start with %s", row, RefreshSection.getRowPrefix()); - Preconditions.checkArgument(entry.getKey().getColumnFamilyData().length() == 0, - "Expected empty family but saw %s", entry.getKey().getColumnFamilyData()); - Preconditions.checkArgument(entry.getKey().getColumnQualifierData().length() == 0, - "Expected empty qualifier but saw %s", entry.getKey().getColumnQualifierData()); - - try (ByteArrayInputStream bais = new 
ByteArrayInputStream(entry.getValue().get()); - DataInputStream dis = new DataInputStream(bais)) { - var version = dis.readInt(); - Preconditions.checkArgument(version == CURRENT_VERSION, "Expected version %s saw %s", - CURRENT_VERSION, version); - var extent = KeyExtent.readFrom(dis); - var tserver = new TServerInstance(dis.readUTF()); - return new RefreshEntry( - ExternalCompactionId.of(row.substring(RefreshSection.getRowPrefix().length())), extent, - tserver); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public void add(Collection<RefreshEntry> entries) { - Objects.requireNonNull(entries); - if (dataLevel == Ample.DataLevel.ROOT) { - // expect all of these to be the root tablet, verifying because its not stored - requireRootTablets(entries); - Consumer<Map<String,String>> mutator = map -> entries.forEach(refreshEntry -> map - .put(refreshEntry.getEcid().canonical(), refreshEntry.getTserver().getHostPortSession())); - mutateRootRefreshes(mutator); - } else { - try (BatchWriter writer = context.createBatchWriter(dataLevel.metaTable())) { - for (RefreshEntry entry : entries) { - writer.addMutation(createAddMutation(entry)); - } - } catch (MutationsRejectedException | TableNotFoundException e) { - throw new IllegalStateException(e); - } - } - } - - @Override - public void delete(Collection<RefreshEntry> entries) { - Objects.requireNonNull(entries); - if (dataLevel == Ample.DataLevel.ROOT) { - // expect all of these to be the root tablet, verifying because its not stored - requireRootTablets(entries); - Consumer<Map<String,String>> mutator = - map -> entries.forEach(refreshEntry -> map.remove(refreshEntry.getEcid().canonical())); - mutateRootRefreshes(mutator); - } else { - try (BatchWriter writer = context.createBatchWriter(dataLevel.metaTable())) { - for (RefreshEntry entry : entries) { - writer.addMutation(createDeleteMutation(entry)); - } - } catch (MutationsRejectedException | TableNotFoundException e) { - throw new 
IllegalStateException(e); - } - } - } - - @Override - public Stream<RefreshEntry> stream() { - if (dataLevel == Ample.DataLevel.ROOT) { - var zooReader = context.getZooReader(); - byte[] jsonBytes; - try { - jsonBytes = - zooReader.getData(context.getZooKeeperRoot() + RootTable.ZROOT_TABLET_REFRESHES); - return fromJson(new String(jsonBytes, UTF_8)).entrySet().stream() - .map(e -> new RefreshEntry(ExternalCompactionId.of(e.getKey()), RootTable.EXTENT, - new TServerInstance(e.getValue()))); - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } else { - Range range = RefreshSection.getRange(); - - Scanner scanner; - try { - scanner = context.createScanner(dataLevel.metaTable(), Authorizations.EMPTY); - } catch (TableNotFoundException e) { - throw new IllegalStateException(e); - } - scanner.setRange(range); - return scanner.stream().onClose(scanner::close).map(this::decode); - } - } -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index 316082b1d4..8e0beae740 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -338,10 +338,4 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample { throw new IllegalStateException(e); } } - - @Override - public Refreshes refreshes(DataLevel dataLevel) { - return new RefreshesImpl(context, dataLevel); - } - } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index cfb311f383..6bba307b05 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -941,7 +941,8 @@ public class Manager extends AbstractServer // 
Start the Manager's Fate Service fateServiceHandler = new FateServiceHandler(this); managerClientHandler = new ManagerClientServiceHandler(this); - compactionCoordinator = new CompactionCoordinator(context, tserverSet, security, nextEvent); + compactionCoordinator = + new CompactionCoordinator(context, tserverSet, security, nextEvent, fateRefs); // Start the Manager's Client service // Ensure that calls before the manager gets the lock fail ManagerClientService.Iface haProxy = diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 41196776c2..3deae9424a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -26,8 +26,6 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; import java.io.FileNotFoundException; @@ -36,22 +34,16 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Optional; 
import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -76,25 +68,21 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; -import org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry; import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler; import org.apache.accumulo.core.metadata.schema.CompactionMetadata; -import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metrics.MetricsProducer; -import 
org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; @@ -103,7 +91,6 @@ import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; -import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.cache.Caches.CacheName; @@ -114,8 +101,11 @@ import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.EventCoordinator; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; +import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData; +import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile; import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; -import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.compaction.CompactionPluginUtils; @@ -135,10 +125,8 @@ import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.Weigher; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; 
-import com.google.common.util.concurrent.MoreExecutors; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; @@ -159,22 +147,15 @@ public class CompactionCoordinator protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE = new ConcurrentHashMap<>(); - /* - * When the manager starts up any refreshes that were in progress when the last manager process - * died must be completed before new refresh entries are written. This map of countdown latches - * helps achieve that goal. - */ - private final Map<Ample.DataLevel,CountDownLatch> refreshLatches; - /* Map of group name to last time compactor called to get a compaction job */ // ELASTICITY_TODO need to clean out groups that are no longer configured.. private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); private final ServerContext ctx; - private final LiveTServerSet tserverSet; private final SecurityOperation security; private final CompactionJobQueues jobQueues; private final EventCoordinator eventCoordinator; + private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances; // Exposed for tests protected volatile Boolean shutdown = false; @@ -188,9 +169,9 @@ public class CompactionCoordinator private final QueueMetrics queueMetrics; public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers, - SecurityOperation security, EventCoordinator eventCoordinator) { + SecurityOperation security, EventCoordinator eventCoordinator, + AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) { this.ctx = ctx; - this.tserverSet = tservers; this.schedExecutor = this.ctx.getScheduledExecutor(); this.security = security; this.eventCoordinator = eventCoordinator; @@ -200,11 +181,7 @@ public class CompactionCoordinator this.queueMetrics = new QueueMetrics(jobQueues); - var refreshLatches = new EnumMap<Ample.DataLevel,CountDownLatch>(Ample.DataLevel.class); - 
refreshLatches.put(Ample.DataLevel.ROOT, new CountDownLatch(1)); - refreshLatches.put(Ample.DataLevel.METADATA, new CountDownLatch(1)); - refreshLatches.put(Ample.DataLevel.USER, new CountDownLatch(1)); - this.refreshLatches = Collections.unmodifiableMap(refreshLatches); + this.fateInstances = fateInstances; completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true) .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); @@ -262,59 +239,9 @@ public class CompactionCoordinator ThreadPools.watchNonCriticalScheduledTask(future); } - private void processRefreshes(Ample.DataLevel dataLevel) { - try (var refreshStream = ctx.getAmple().refreshes(dataLevel).stream()) { - // process batches of refresh entries to avoid reading all into memory at once - Iterators.partition(refreshStream.iterator(), 10000).forEachRemaining(refreshEntries -> { - LOG.info("Processing {} tablet refreshes for {}", refreshEntries.size(), dataLevel); - - var extents = - refreshEntries.stream().map(RefreshEntry::getExtent).collect(Collectors.toList()); - var tabletsMeta = new HashMap<KeyExtent,TabletMetadata>(); - try (var tablets = ctx.getAmple().readTablets().forTablets(extents, Optional.empty()) - .fetch(PREV_ROW, LOCATION, SCANS).build()) { - tablets.stream().forEach(tm -> tabletsMeta.put(tm.getExtent(), tm)); - } - - var tserverRefreshes = new HashMap<TabletMetadata.Location,List<TKeyExtent>>(); - - refreshEntries.forEach(refreshEntry -> { - var tm = tabletsMeta.get(refreshEntry.getExtent()); - - // only need to refresh if the tablet is still on the same tserver instance - if (tm != null && tm.getLocation() != null - && tm.getLocation().getServerInstance().equals(refreshEntry.getTserver())) { - KeyExtent extent = tm.getExtent(); - Collection<StoredTabletFile> scanfiles = tm.getScans(); - var ttr = extent.toThrift(); - tserverRefreshes.computeIfAbsent(tm.getLocation(), k -> new ArrayList<>()).add(ttr); - } - }); - - String logId = "Coordinator:" + dataLevel; - 
ThreadPoolExecutor threadPool = - ctx.threadPools().createFixedThreadPool(10, "Tablet refresh " + logId, false); - try { - TabletRefresher.refreshTablets(threadPool, logId, ctx, tserverSet::getCurrentServers, - tserverRefreshes); - } finally { - threadPool.shutdownNow(); - } - - ctx.getAmple().refreshes(dataLevel).delete(refreshEntries); - }); - } - // allow new refreshes to be written now that all preexisting ones are processed - refreshLatches.get(dataLevel).countDown(); - } - @Override public void run() { - processRefreshes(Ample.DataLevel.ROOT); - processRefreshes(Ample.DataLevel.METADATA); - processRefreshes(Ample.DataLevel.USER); - startCompactionCleaner(schedExecutor); startRunningCleaner(schedExecutor); @@ -669,58 +596,6 @@ public class CompactionCoordinator return this; } - class RefreshWriter { - - private final ExternalCompactionId ecid; - private final KeyExtent extent; - - private RefreshEntry writtenEntry; - - RefreshWriter(ExternalCompactionId ecid, KeyExtent extent) { - this.ecid = ecid; - this.extent = extent; - - var dataLevel = Ample.DataLevel.of(extent.tableId()); - try { - // Wait for any refresh entries from the previous manager process to be processed before - // writing new ones. 
- refreshLatches.get(dataLevel).await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - public void writeRefresh(TabletMetadata.Location location) { - Objects.requireNonNull(location); - - if (writtenEntry != null) { - if (location.getServerInstance().equals(writtenEntry.getTserver())) { - // the location was already written so nothing to do - return; - } else { - deleteRefresh(); - } - } - - var entry = new RefreshEntry(ecid, extent, location.getServerInstance()); - - ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId())).add(List.of(entry)); - - LOG.debug("wrote refresh entry for {}", ecid); - - writtenEntry = entry; - } - - public void deleteRefresh() { - if (writtenEntry != null) { - ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId())) - .delete(List.of(writtenEntry)); - LOG.debug("deleted refresh entry for {}", ecid); - writtenEntry = null; - } - } - } - private Optional<CompactionConfig> getCompactionConfig(CompactionJobQueues.MetaJob metaJob) { if (metaJob.getJob().getKind() == CompactionKind.USER && metaJob.getTabletMetadata().getSelectedFiles() != null) { @@ -738,11 +613,6 @@ public class CompactionCoordinator * <ol> * <li>Reads the tablets metadata and determines if the compaction can commit. Its possible that * things changed while the compaction was running and it can no longer commit.</li> - * <li>If the compaction can commit then a ~refresh entry may be written to the metadata table. - * This is done before attempting to commit to cover the case of process failure after commit. If - * the manager dies after commit then when it restarts it will see the ~refresh entry and refresh - * that tablet. The ~refresh entry is only written when its a system compaction on a tablet with a - * location.</li> * <li>Commit the compaction using a conditional mutation. If the tablets files or location * changed since reading the tablets metadata, then conditional mutation will fail. 
When this * happens it will reread the metadata and go back to step 1 conceptually. When committing a @@ -752,7 +622,6 @@ public class CompactionCoordinator * <li>After successful commit a refresh request is sent to the tablet if it has a location. This * will cause the tablet to start using the newly compacted files for future scans. Also the * tablet can delete the scan entries if there are no active scans using them.</li> - * <li>If a ~refresh entry was written, delete it since the refresh was successful.</li> * </ol> * * <p> @@ -762,21 +631,6 @@ public class CompactionCoordinator * refresh was actually done. Therefore, user compactions will refresh as part of the fate * operation so that it's known to be done before the fate operation returns. Since the fate * operation will do it, there is no need to do it here for user compactions. - * </p> - * - * <p> - * The ~refresh entries serve a similar purpose to FATE operations, it ensures that code executes - * even when a process dies. FATE was intentionally not used for compaction commit because FATE - * stores its data in zookeeper. The refresh entry is stored in the metadata table, which is much - * more scalable than zookeeper. The number of system compactions of small files could be large - * and this would be a large number of writes to zookeeper. Zookeeper scales somewhat with reads, - * but not with writes. - * </p> - * - * <p> - * Issue #3559 was opened to explore the possibility of making compaction commit a fate operation - * which would remove the need for the ~refresh section. 
- * </p> * * @param tinfo trace info * @param credentials tcredentials object @@ -795,7 +649,19 @@ public class CompactionCoordinator SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } + // maybe fate has not started yet + var localFates = fateInstances.get(); + while (localFates == null) { + UtilWaitThread.sleep(100); + if (shutdown) { + return; + } + localFates = fateInstances.get(); + } + var extent = KeyExtent.fromThrift(textent); + var localFate = localFates.get(FateInstanceType.fromTableId(extent.tableId())); + LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats, extent); final var ecid = ExternalCompactionId.of(externalCompactionId); @@ -803,286 +669,35 @@ public class CompactionCoordinator var tabletMeta = ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID); - if (!canCommitCompaction(ecid, tabletMeta)) { + if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) { return; } CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid); - - // ELASTICITY_TODO this code does not handle race conditions or faults. Need to ensure refresh - // happens in the case of manager process death between commit and refresh. 
- ReferencedTabletFile newDatafile = - TabletNameGenerator.computeCompactionFileDest(ecm.getCompactTmpName()); - - Optional<ReferencedTabletFile> optionalNewFile; - try { - optionalNewFile = renameOrDeleteFile(stats, ecm, newDatafile); - } catch (IOException e) { - LOG.warn("Can not commit complete compaction {} because unable to delete or rename {} ", ecid, - ecm.getCompactTmpName(), e); - compactionFailed(Map.of(ecid, extent)); - return; - } - - RefreshWriter refreshWriter = new RefreshWriter(ecid, extent); - - try { - tabletMeta = commitCompaction(stats, ecid, tabletMeta, optionalNewFile, refreshWriter); - } catch (RuntimeException e) { - LOG.warn("Failed to commit complete compaction {} {}", ecid, extent, e); - compactionFailed(Map.of(ecid, extent)); - } - - if (ecm.getKind() != CompactionKind.USER) { - refreshTablet(tabletMeta); - } - - // if a refresh entry was written, it can be removed after the tablet was refreshed - refreshWriter.deleteRefresh(); + var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats)); + + // ELASTICITY_TODO add tag to fate that ECID can be added to. This solves two problem. First it + // defends against starting multiple fate txs for the same thing. This will help the split code + // also. Second the tag can be used by the dead compaction detector to ignore committing + // compactions. The imple coould hash the key to produce the fate tx id. + var txid = localFate.startTransaction(); + localFate.seedTransaction("COMMIT_COMPACTION", txid, renameOp, true, + "Commit compaction " + ecid); + + // ELASTICITY_TODO need to remove this wait. It is here because when the dead compaction + // detector ask a compactor what its currently running it expects that cover commit. To remove + // this wait would need another way for the dead compaction detector to know about committing + // compactions. Could add a tag to the fate tx with the ecid and have dead compaction detector + // scan these tags. 
This wait makes the code running in fate not be fault tolerant because in + // the + // case of faults the dead compaction detector may remove the compaction entry. + localFate.waitForCompletion(txid); // It's possible that RUNNING might not have an entry for this ecid in the case // of a coordinator restart when the Coordinator can't find the TServer for the // corresponding external compaction. recordCompletion(ecid); - - // This will causes the tablet to be reexamined to see if it needs any more compactions. - eventCoordinator.event(extent, "Compaction completed %s", extent); - } - - private Optional<ReferencedTabletFile> renameOrDeleteFile(TCompactionStats stats, - CompactionMetadata ecm, ReferencedTabletFile newDatafile) throws IOException { - if (stats.getEntriesWritten() == 0) { - // the compaction produced no output so do not need to rename or add a file to the metadata - // table, only delete the input files. - if (!ctx.getVolumeManager().delete(ecm.getCompactTmpName().getPath())) { - throw new IOException("delete returned false"); - } - - return Optional.empty(); - } else { - if (!ctx.getVolumeManager().rename(ecm.getCompactTmpName().getPath(), - newDatafile.getPath())) { - throw new IOException("rename returned false"); - } - - return Optional.of(newDatafile); - } - } - - private void refreshTablet(TabletMetadata metadata) { - var location = metadata.getLocation(); - if (location != null) { - KeyExtent extent = metadata.getExtent(); - - // there is a single tserver and single tablet, do not need a thread pool. 
The direct executor - // will run everything in the current thread - ExecutorService executorService = MoreExecutors.newDirectExecutorService(); - try { - TabletRefresher.refreshTablets(executorService, - "compaction:" + metadata.getExtent().toString(), ctx, tserverSet::getCurrentServers, - Map.of(metadata.getLocation(), List.of(extent.toThrift()))); - } finally { - executorService.shutdownNow(); - } - } - } - - // ELASTICITY_TODO unit test this method - private boolean canCommitCompaction(ExternalCompactionId ecid, TabletMetadata tabletMetadata) { - - if (tabletMetadata == null) { - LOG.debug("Received completion notification for nonexistent tablet {}", ecid); - return false; - } - - var extent = tabletMetadata.getExtent(); - - if (tabletMetadata.getOperationId() != null) { - // split, merge, and delete tablet should delete the compaction entry in the tablet - LOG.debug("Received completion notification for tablet with active operation {} {} {}", ecid, - extent, tabletMetadata.getOperationId()); - return false; - } - - CompactionMetadata ecm = tabletMetadata.getExternalCompactions().get(ecid); - - if (ecm == null) { - LOG.debug("Received completion notification for unknown compaction {} {}", ecid, extent); - return false; - } - - if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) { - if (tabletMetadata.getSelectedFiles() == null) { - // when the compaction is canceled, selected files are deleted - LOG.debug( - "Received completion notification for user compaction and tablet has no selected files {} {}", - ecid, extent); - return false; - } - - if (ecm.getFateTxId() != tabletMetadata.getSelectedFiles().getFateTxId()) { - // maybe the compaction was cancled and another user compaction was started on the tablet. 
- LOG.debug( - "Received completion notification for user compaction where its fate txid did not match the tablets {} {} {} {}", - ecid, extent, FateTxId.formatTid(ecm.getFateTxId()), - FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId())); - } - - if (!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) { - // this is not expected to happen - LOG.error("User compaction contained files not in the selected set {} {} {} {} {}", - tabletMetadata.getExtent(), ecid, ecm.getKind(), - Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles), - ecm.getJobFiles()); - return false; - } - } - - if (!tabletMetadata.getFiles().containsAll(ecm.getJobFiles())) { - // this is not expected to happen - LOG.error("Compaction contained files not in the tablet files set {} {} {} {}", - tabletMetadata.getExtent(), ecid, tabletMetadata.getFiles(), ecm.getJobFiles()); - return false; - } - - return true; - } - - private TabletMetadata commitCompaction(TCompactionStats stats, ExternalCompactionId ecid, - TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, - RefreshWriter refreshWriter) { - - KeyExtent extent = tablet.getExtent(); - - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); - - while (canCommitCompaction(ecid, tablet)) { - CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); - - // the compacted files should not exists in the tablet already - var tablet2 = tablet; - newDatafile.ifPresent( - newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()), - "File already exists in tablet %s %s", newFile, tablet2.getFiles())); - - if (tablet.getLocation() != null - && tablet.getExternalCompactions().get(ecid).getKind() != CompactionKind.USER) { - // Write the refresh entry before attempting to update tablet metadata, this 
ensures that - // refresh will happen even if this process dies. In the case where this process does not - // die refresh will happen after commit. User compactions will make refresh calls in their - // fate operation, so it does not need to be done here. - refreshWriter.writeRefresh(tablet.getLocation()); - } - - try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { - var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() - .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION); - - if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) { - tabletMutator.requireSame(tablet, SELECTED, COMPACTED); - } - - // make the needed updates to the tablet - updateTabletForCompaction(stats, ecid, tablet, newDatafile, extent, ecm, tabletMutator); - - tabletMutator - .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); - - // TODO expensive logging - LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile, - ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName) - .collect(Collectors.toList())); - - // ELASTICITY_TODO check return value and retry, could fail because of race conditions - var result = tabletsMutator.process().get(extent); - if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { - // compaction was committed, mark the compaction input files for deletion - // - // ELASTICITIY_TODO in the case of process death the GC candidates would never be added - // like #3811. If compaction commit were moved to FATE per #3559 then this would not - // be an issue. If compaction commit is never moved to FATE, then this addition could - // moved to the compaction refresh process. The compaction refresh process will go away - // if compaction commit is moved to FATE, so should only do this if not moving to FATE. 
- ctx.getAmple().putGcCandidates(extent.tableId(), ecm.getJobFiles()); - break; - } else { - // compaction failed to commit, maybe something changed on the tablet so lets reread the - // metadata and try again - tablet = result.readMetadata(); - } - - retry.waitForNextAttempt(LOG, "Failed to commit " + ecid + " for tablet " + extent); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - return tablet; - } - - private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, - TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, KeyExtent extent, - CompactionMetadata ecm, Ample.ConditionalTabletMutator tabletMutator) { - // ELASTICITY_TODO improve logging adapt to use existing tablet files logging - if (ecm.getKind() == CompactionKind.USER) { - if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) { - // all files selected for the user compactions are finished, so the tablet is finish and - // its compaction id needs to be updated. 
- - long fateTxId = tablet.getSelectedFiles().getFateTxId(); - - Preconditions.checkArgument(!tablet.getCompacted().contains(fateTxId), - "Tablet %s unexpected has selected files and compacted columns for %s", - tablet.getExtent(), fateTxId); - - // TODO set to trace - LOG.debug("All selected files compcated for {} setting compacted for {}", - tablet.getExtent(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); - - tabletMutator.deleteSelectedFiles(); - tabletMutator.putCompacted(fateTxId); - - } else { - // not all of the selected files were finished, so need to add the new file to the - // selected set - - Set<StoredTabletFile> newSelectedFileSet = - new HashSet<>(tablet.getSelectedFiles().getFiles()); - newSelectedFileSet.removeAll(ecm.getJobFiles()); - - if (newDatafile.isPresent()) { - // TODO set to trace - LOG.debug( - "Not all selected files for {} are done, adding new selected file {} from compaction", - tablet.getExtent(), newDatafile.orElseThrow().getPath().getName()); - newSelectedFileSet.add(newDatafile.orElseThrow().insert()); - } else { - // TODO set to trace - LOG.debug( - "Not all selected files for {} are done, compaction produced no output so not adding to selected set.", - tablet.getExtent()); - } - - tabletMutator.putSelectedFiles( - new SelectedFiles(newSelectedFileSet, tablet.getSelectedFiles().initiallySelectedAll(), - tablet.getSelectedFiles().getFateTxId())); - } - } - - if (tablet.getLocation() != null) { - // add scan entries to prevent GC in case the hosted tablet is currently using the files for - // scan - ecm.getJobFiles().forEach(tabletMutator::putScan); - } - ecm.getJobFiles().forEach(tabletMutator::deleteFile); - tabletMutator.deleteExternalCompaction(ecid); - - if (newDatafile.isPresent()) { - tabletMutator.putFile(newDatafile.orElseThrow(), - new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); - } + // ELASTICITY_TODO should above call move into fate code? 
} @Override @@ -1330,11 +945,6 @@ public class CompactionCoordinator cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId); } - /* Method exists to be overridden in test to hide static method */ - protected String getTServerAddressString(HostAndPort tserverAddress) { - return ExternalCompactionUtil.getHostPortString(tserverAddress); - } - /* Method exists to be overridden in test to hide static method */ protected List<RunningCompaction> getCompactionsRunningOnCompactors() { return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx); @@ -1346,11 +956,6 @@ public class CompactionCoordinator ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId); } - /* Method exists to be overridden in test to hide static method */ - protected void returnTServerClient(TabletServerClientService.Client client) { - ThriftUtil.returnClient(client, this.ctx); - } - private void deleteEmpty(ZooReaderWriter zoorw, String path) throws KeeperException, InterruptedException { try { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java new file mode 100644 index 0000000000..6063a3b38f --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.coordinator.commit; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.AbstractTabletFile; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import 
org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.server.ServerContext; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class CommitCompaction extends ManagerRepo { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(CommitCompaction.class); + private final CompactionCommitData commitData; + private final String newDatafile; + + public CommitCompaction(CompactionCommitData commitData, String newDatafile) { + this.commitData = commitData; + this.newDatafile = newDatafile; + } + + @Override + public Repo<Manager> call(long tid, Manager manager) throws Exception { + var ecid = ExternalCompactionId.of(commitData.ecid); + var newFile = Optional.ofNullable(newDatafile).map(f -> ReferencedTabletFile.of(new Path(f))); + + // ELASTICITIY_TODO is it possible to test this code running a 2nd time, simulating a failure + // and rerun? Maybe fate could have a testing mode where it calls operations multiple times? + + // It is possible that when this runs that the compaction was previously committed and then the + // process died and now its running again. In this case commit should do nothing, but its + // important to still carry on with the rest of the steps after commit. This code ignores a that + // fact that a commit may not have happened in the current call and continues for this reason. 
+ TabletMetadata tabletMetadata = commitCompaction(manager.getContext(), ecid, newFile); + + String loc = null; + if (tabletMetadata != null && tabletMetadata.getLocation() != null) { + loc = tabletMetadata.getLocation().getHostPortSession(); + } + + // This will causes the tablet to be reexamined to see if it needs any more compactions. + var extent = KeyExtent.fromThrift(commitData.textent); + manager.getEventCoordinator().event(extent, "Compaction completed %s", extent); + + return new PutGcCandidates(commitData, loc); + } + + KeyExtent getExtent() { + return KeyExtent.fromThrift(commitData.textent); + } + + private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid, + Optional<ReferencedTabletFile> newDatafile) { + + var tablet = + ctx.getAmple().readTablet(getExtent(), ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID); + + Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) + .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.5) + .logInterval(3, MINUTES).createRetry(); + + while (canCommitCompaction(ecid, tablet)) { + CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); + + // the compacted files should not exists in the tablet already + var tablet2 = tablet; + newDatafile.ifPresent( + newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()), + "File already exists in tablet %s %s", newFile, tablet2.getFiles())); + + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation() + .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION); + + if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) { + tabletMutator.requireSame(tablet, SELECTED, COMPACTED); + } + + // make the needed updates to the tablet + updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, ecm, tabletMutator); + + tabletMutator + 
.submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); + + // TODO expensive logging + LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile, + ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList())); + + var result = tabletsMutator.process().get(getExtent()); + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + break; + } else { + // compaction failed to commit, maybe something changed on the tablet so lets reread the + // metadata and try again + tablet = result.readMetadata(); + } + + retry.waitForNextAttempt(LOG, "Failed to commit " + ecid + " for tablet " + getExtent()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + return tablet; + } + + private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, + TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm, + Ample.ConditionalTabletMutator tabletMutator) { + // ELASTICITY_TODO improve logging adapt to use existing tablet files logging + if (ecm.getKind() == CompactionKind.USER) { + if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) { + // all files selected for the user compactions are finished, so the tablet is finish and + // its compaction id needs to be updated. 
+ + long fateTxId = tablet.getSelectedFiles().getFateTxId(); + + Preconditions.checkArgument(!tablet.getCompacted().contains(fateTxId), + "Tablet %s unexpected has selected files and compacted columns for %s", + tablet.getExtent(), fateTxId); + + // TODO set to trace + LOG.debug("All selected files compcated for {} setting compacted for {}", + tablet.getExtent(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); + + tabletMutator.deleteSelectedFiles(); + tabletMutator.putCompacted(fateTxId); + + } else { + // not all of the selected files were finished, so need to add the new file to the + // selected set + + Set<StoredTabletFile> newSelectedFileSet = + new HashSet<>(tablet.getSelectedFiles().getFiles()); + newSelectedFileSet.removeAll(ecm.getJobFiles()); + + if (newDatafile.isPresent()) { + // TODO set to trace + LOG.debug( + "Not all selected files for {} are done, adding new selected file {} from compaction", + tablet.getExtent(), newDatafile.orElseThrow().getPath().getName()); + newSelectedFileSet.add(newDatafile.orElseThrow().insert()); + } else { + // TODO set to trace + LOG.debug( + "Not all selected files for {} are done, compaction produced no output so not adding to selected set.", + tablet.getExtent()); + } + + tabletMutator.putSelectedFiles( + new SelectedFiles(newSelectedFileSet, tablet.getSelectedFiles().initiallySelectedAll(), + tablet.getSelectedFiles().getFateTxId())); + } + } + + if (tablet.getLocation() != null) { + // add scan entries to prevent GC in case the hosted tablet is currently using the files for + // scan + ecm.getJobFiles().forEach(tabletMutator::putScan); + } + ecm.getJobFiles().forEach(tabletMutator::deleteFile); + tabletMutator.deleteExternalCompaction(ecid); + + if (newDatafile.isPresent()) { + tabletMutator.putFile(newDatafile.orElseThrow(), + new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); + } + } + + // ELASTICITY_TODO unit test this method + public static boolean 
canCommitCompaction(ExternalCompactionId ecid, + TabletMetadata tabletMetadata) { + + if (tabletMetadata == null) { + LOG.debug("Received completion notification for nonexistent tablet {}", ecid); + return false; + } + + var extent = tabletMetadata.getExtent(); + + if (tabletMetadata.getOperationId() != null) { + // split, merge, and delete tablet should delete the compaction entry in the tablet + LOG.debug("Received completion notification for tablet with active operation {} {} {}", ecid, + extent, tabletMetadata.getOperationId()); + return false; + } + + CompactionMetadata ecm = tabletMetadata.getExternalCompactions().get(ecid); + + if (ecm == null) { + LOG.debug("Received completion notification for unknown compaction {} {}", ecid, extent); + return false; + } + + if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) { + if (tabletMetadata.getSelectedFiles() == null) { + // when the compaction is canceled, selected files are deleted + LOG.debug( + "Received completion notification for user compaction and tablet has no selected files {} {}", + ecid, extent); + return false; + } + + if (ecm.getFateTxId() != tabletMetadata.getSelectedFiles().getFateTxId()) { + // maybe the compaction was cancled and another user compaction was started on the tablet. 
+ LOG.debug( + "Received completion notification for user compaction where its fate txid did not match the tablets {} {} {} {}", + ecid, extent, FateTxId.formatTid(ecm.getFateTxId()), + FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId())); + } + + if (!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) { + // this is not expected to happen + LOG.error("User compaction contained files not in the selected set {} {} {} {} {}", + tabletMetadata.getExtent(), ecid, ecm.getKind(), + Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles), + ecm.getJobFiles()); + return false; + } + } + + if (!tabletMetadata.getFiles().containsAll(ecm.getJobFiles())) { + // this is not expected to happen + LOG.error("Compaction contained files not in the tablet files set {} {} {} {}", + tabletMetadata.getExtent(), ecid, tabletMetadata.getFiles(), ecm.getJobFiles()); + return false; + } + + return true; + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java new file mode 100644 index 0000000000..23b293c25e --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CompactionCommitData.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.coordinator.commit; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; + +public class CompactionCommitData implements Serializable { + private static final long serialVersionUID = 1L; + final CompactionKind kind; + final Set<String> inputPaths; + final String outputTmpPath; + final String ecid; + final TKeyExtent textent; + final TCompactionStats stats; + + public CompactionCommitData(ExternalCompactionId ecid, KeyExtent extent, CompactionMetadata ecm, + TCompactionStats stats) { + this.ecid = ecid.canonical(); + this.textent = extent.toThrift(); + this.kind = ecm.getKind(); + this.inputPaths = + ecm.getJobFiles().stream().map(StoredTabletFile::getMetadata).collect(Collectors.toSet()); + this.outputTmpPath = ecm.getCompactTmpName().getNormalizedPathStr(); + this.stats = stats; + } + + public TableId getTableId() { + return KeyExtent.fromThrift(textent).tableId(); + } + + public Collection<StoredTabletFile> getJobFiles() { + return 
inputPaths.stream().map(StoredTabletFile::of).collect(Collectors.toList()); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java new file mode 100644 index 0000000000..6ce5e37ea7 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/PutGcCandidates.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.compaction.coordinator.commit; + +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; + +public class PutGcCandidates extends ManagerRepo { + private static final long serialVersionUID = 1L; + private final CompactionCommitData commitData; + private final String refreshLocation; + + public PutGcCandidates(CompactionCommitData commitData, String refreshLocation) { + this.commitData = commitData; + this.refreshLocation = refreshLocation; + } + + @Override + public Repo<Manager> call(long tid, Manager manager) throws Exception { + + // add the GC candidates + manager.getContext().getAmple().putGcCandidates(commitData.getTableId(), + commitData.getJobFiles()); + + if (commitData.kind == CompactionKind.USER || refreshLocation == null) { + // user compactions will refresh tablets as part of the FATE operation driving the user + // compaction, so no need to do it here + return null; + } + + return new RefreshTablet(commitData.textent, refreshLocation); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java new file mode 100644 index 0000000000..d2044990a0 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RefreshTablet.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.coordinator.commit; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher; + +import com.google.common.util.concurrent.MoreExecutors; + +public class RefreshTablet extends ManagerRepo { + private static final long serialVersionUID = 1L; + private final TKeyExtent extent; + private final String tserverInstance; + + public RefreshTablet(TKeyExtent extent, String tserverInstance) { + this.extent = extent; + this.tserverInstance = tserverInstance; + } + + @Override + public Repo<Manager> call(long tid, Manager manager) throws Exception { + + TServerInstance tsi = new TServerInstance(tserverInstance); + + // there is a single tserver and single tablet, do not need a thread pool. 
The direct executor + // will run everything in the current thread + ExecutorService executorService = MoreExecutors.newDirectExecutorService(); + try { + TabletRefresher.refreshTablets(executorService, "compaction:" + KeyExtent.fromThrift(extent), + manager.getContext(), manager::onlineTabletServers, + Map.of(TabletMetadata.Location.current(tsi), List.of(extent))); + } finally { + executorService.shutdownNow(); + } + + return null; + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java new file mode 100644 index 0000000000..2261ae7bf3 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/RenameCompactionFile.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.compaction.coordinator.commit; + +import java.io.IOException; + +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RenameCompactionFile extends ManagerRepo { + private static final Logger log = LoggerFactory.getLogger(RenameCompactionFile.class); + private static final long serialVersionUID = 1L; + private final CompactionCommitData commitData; + + public RenameCompactionFile(CompactionCommitData commitData) { + this.commitData = commitData; + } + + @Override + public Repo<Manager> call(long tid, Manager manager) throws Exception { + ReferencedTabletFile newDatafile = null; + var ctx = manager.getContext(); + + var tmpPath = new Path(commitData.outputTmpPath); + + if (commitData.stats.getEntriesWritten() == 0) { + // the compaction produced no output so do not need to rename or add a file to the metadata + // table, only delete the input files. + try { + if (!ctx.getVolumeManager().delete(tmpPath)) { + throw new IOException("delete returned false for " + tmpPath); + } + } catch (IOException ioe) { + // Log something in case there is an exception while doing the check, will have the original + // exception. + log.debug("Attempting to see if file exists after delete failure of {}", tmpPath, ioe); + if (!ctx.getVolumeManager().exists(tmpPath)) { + log.debug( + "Failed to delete file {}, but it does not exists. 
Assuming this is a 2nd run.", + tmpPath, ioe); + } else { + throw ioe; + } + } + } else { + newDatafile = TabletNameGenerator.computeCompactionFileDest(ReferencedTabletFile.of(tmpPath)); + try { + if (!ctx.getVolumeManager().rename(tmpPath, newDatafile.getPath())) { + throw new IOException("rename returned false for " + tmpPath); + } + } catch (IOException ioe) { + // Log something in case there is an exception while doing the check, will have the original + // exception. + log.debug("Attempting to see if file exists after rename failure of {} to {}", tmpPath, + newDatafile, ioe); + if (ctx.getVolumeManager().exists(newDatafile.getPath()) + && !ctx.getVolumeManager().exists(tmpPath)) { + log.debug( + "Failed to rename {} to {}, but destination exists and source does not. Assuming this is a 2nd run.", + tmpPath, newDatafile, ioe); + } else { + throw ioe; + } + } + } + + return new CommitCompaction(commitData, + newDatafile == null ? null : newDatafile.getNormalizedPathStr()); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/RefreshesIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/RefreshesIT.java deleted file mode 100644 index 053f01d5a3..0000000000 --- a/test/src/main/java/org/apache/accumulo/test/compaction/RefreshesIT.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.test.compaction; - -import static java.util.stream.Collectors.toMap; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.test.functional.ConfigurableMacBase; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.Test; - -/** - * Tests reading and writing refesh entries. This is done as a an IT instead of a unit test because - * it would take too much code to mock ZK and the metadata table. 
- */ -public class RefreshesIT extends ConfigurableMacBase { - private void testRefreshes(Ample.DataLevel level, KeyExtent extent1, KeyExtent extent2) { - var refreshes = getServerContext().getAmple().refreshes(level); - - assertEquals(0, refreshes.stream().count()); - - var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); - var ecid2 = ExternalCompactionId.generate(UUID.randomUUID()); - var ecid3 = ExternalCompactionId.generate(UUID.randomUUID()); - - var tserver1 = new TServerInstance("host1:9997[abcdef123]"); - var tserver2 = new TServerInstance("host2:9997[1234567890]"); - - refreshes.add(List.of(new RefreshEntry(ecid1, extent1, tserver1))); - - assertEquals(Map.of(ecid1, extent1), - refreshes.stream().collect(toMap(RefreshEntry::getEcid, RefreshEntry::getExtent))); - assertEquals(Map.of(ecid1, tserver1), - refreshes.stream().collect(toMap(RefreshEntry::getEcid, RefreshEntry::getTserver))); - - refreshes.add(List.of(new RefreshEntry(ecid2, extent2, tserver1), - new RefreshEntry(ecid3, extent2, tserver2))); - - assertEquals(Map.of(ecid1, extent1, ecid2, extent2, ecid3, extent2), - refreshes.stream().collect(toMap(RefreshEntry::getEcid, RefreshEntry::getExtent))); - assertEquals(Map.of(ecid1, tserver1, ecid2, tserver1, ecid3, tserver2), - refreshes.stream().collect(toMap(RefreshEntry::getEcid, RefreshEntry::getTserver))); - - refreshes.delete(List.of(new RefreshEntry(ecid2, extent2, tserver1))); - - assertEquals(Map.of(ecid1, extent1, ecid3, extent2), - refreshes.stream().collect(toMap(RefreshEntry::getEcid, RefreshEntry::getExtent))); - assertEquals(Map.of(ecid1, tserver1, ecid3, tserver2), - refreshes.stream().collect(toMap(RefreshEntry::getEcid, RefreshEntry::getTserver))); - - refreshes.delete(List.of(new RefreshEntry(ecid3, extent2, tserver2), - new RefreshEntry(ecid1, extent1, tserver1))); - - assertEquals(0, refreshes.stream().count()); - } - - @Test - public void testRefreshStorage() { - var extent1 = new KeyExtent(TableId.of("1"), null, null); - 
var extent2 = new KeyExtent(TableId.of("2"), new Text("m"), new Text("c")); - - // the root level only expects the root tablet extent - assertThrows(IllegalArgumentException.class, - () -> testRefreshes(Ample.DataLevel.ROOT, extent1, extent2)); - testRefreshes(Ample.DataLevel.ROOT, RootTable.EXTENT, RootTable.EXTENT); - testRefreshes(Ample.DataLevel.METADATA, extent1, extent2); - testRefreshes(Ample.DataLevel.USER, extent1, extent2); - } -}