This is an automated email from the ASF dual-hosted git repository. aweisberg pushed a commit to branch cassandra-19944 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 297bd82ab242a316d4facfe91530355ba26cc3dd Author: Ariel Weisberg <[email protected]> AuthorDate: Thu Sep 19 16:19:15 2024 -0400 Persist metadata synchronously --- .gitmodules | 4 +- modules/accord | 2 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 1 + .../org/apache/cassandra/db/memtable/Memtable.java | 6 + .../db/memtable/ShardedSkipListMemtable.java | 20 +++ .../cassandra/db/memtable/SkipListMemtable.java | 10 ++ .../service/accord/AccordCommandStore.java | 162 ++++++++++++++++----- .../service/accord/AccordCommandStores.java | 9 +- .../cassandra/service/accord/AccordKeyspace.java | 27 +++- .../cassandra/service/accord/AccordTestUtils.java | 17 +-- 10 files changed, 198 insertions(+), 60 deletions(-) diff --git a/.gitmodules b/.gitmodules index 616dacf610..588a06ee42 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/aweisberg/cassandra-accord.git + branch = meta-persists-rebase diff --git a/modules/accord b/modules/accord index 486cd4bc15..42b2823672 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 486cd4bc15d33500b7b896f9e4691a38d946b679 +Subproject commit 42b28236726a425bb0a9790a010e659266e90e43 diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 9c1e6b40ee..1374acd086 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -248,6 +248,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner SCHEMA_CHANGE, OWNED_RANGES_CHANGE, ACCORD, + ACCORD_TXN_GC, UNIT_TESTS // explicitly requested flush needed for a test } diff --git a/src/java/org/apache/cassandra/db/memtable/Memtable.java b/src/java/org/apache/cassandra/db/memtable/Memtable.java index f722ec20a9..b34a617ef4 100644 --- a/src/java/org/apache/cassandra/db/memtable/Memtable.java +++ b/src/java/org/apache/cassandra/db/memtable/Memtable.java @@ -30,6 +30,7 @@ import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredSource; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.schema.TableMetadata; @@ -441,4 +442,9 @@ public interface Memtable extends Comparable<Memtable>, UnfilteredSource super(copy.segmentId, copy.position); } } + + default Token lastToken() + { + throw new UnsupportedOperationException("lastToken is not supported"); + } } diff --git a/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java b/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java index 92cdbbad9f..eb4a44ebd2 100644 --- a/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java @@ -50,6 +50,7 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.IncludingExcludingBounds; import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.SSTableReadsListener; import org.apache.cassandra.schema.TableMetadata; @@ -112,6 +113,25 @@ public class ShardedSkipListMemtable extends AbstractShardedMemtable return true; } + @Override + public Token lastToken() + { + Token lastToken = null; + for (MemtableShard shard : shards) + { + Iterator<PartitionPosition> ppIterator = shard.partitions.descendingKeySet().iterator(); + if (ppIterator.hasNext()) + { + Token token = ppIterator.next().getToken(); + if (lastToken == null) + lastToken = token; + else if (lastToken.compareTo(token) < 0) + lastToken = token; + } + } + return lastToken; + } + /** * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate * OpOrdering. diff --git a/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java b/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java index 8871b03bd6..3f6fbcbd52 100644 --- a/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java @@ -50,6 +50,7 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.IncludingExcludingBounds; import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.SSTableReadsListener; import org.apache.cassandra.schema.TableMetadata; @@ -97,6 +98,15 @@ public class SkipListMemtable extends AbstractAllocatorMemtable return partitions.isEmpty(); } + @Override + public Token lastToken() + { + Iterator<PartitionPosition> iterator = partitions.keySet().iterator(); + if (iterator.hasNext()) + return iterator.next().getToken(); + return null; + } + /** * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate * OpOrdering. diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index c1ce231da9..a266de9697 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service.accord; +import java.util.ArrayList; import java.util.Collections; import java.util.IdentityHashMap; import java.util.List; @@ -26,22 +27,26 @@ import java.util.NavigableMap; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import javax.annotation.Nonnull; import javax.annotation.Nullable; -import accord.api.Key; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Agent; import accord.api.DataStore; +import accord.api.Key; import accord.api.LocalListeners; import accord.api.ProgressLog; -import accord.local.cfk.CommandsForKey; import accord.impl.TimestampsForKey; import accord.local.Command; import accord.local.CommandStore; @@ -50,13 +55,16 @@ import accord.local.NodeTimeService; import accord.local.PreLoadContext; import accord.local.RedundantBefore; import accord.local.SafeCommandStore; +import accord.local.cfk.CommandsForKey; import accord.primitives.Keys; +import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.RoutableKey; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.utils.Invariants; import accord.utils.ReducingRangeMap; +import accord.utils.TriConsumer; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; import org.apache.cassandra.cache.CacheSize; @@ -64,14 +72,30 @@ import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ColumnFamilyStore.FlushReason; +import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.memtable.Memtable; +import org.apache.cassandra.db.memtable.TrieMemtable; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.SSTableReadsListener; import org.apache.cassandra.metrics.AccordStateCacheMetrics; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.service.accord.async.AsyncOperation; import org.apache.cassandra.service.accord.events.CacheEvents; import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.FutureCombiner; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; public class AccordCommandStore extends CommandStore implements CacheSize @@ -110,6 +134,8 @@ public class AccordCommandStore extends CommandStore implements CacheSize private long lastSystemTimestampMicros = Long.MIN_VALUE; private final CommandsForRangesLoader commandsForRangesLoader; + private static final TriConsumer<Object, BiConsumer<Object, Throwable>, Executor> addCallbackAdapter = (future, callback, executor) -> ((org.apache.cassandra.utils.concurrent.Future<?>)future).addCallback(callback, executor); + public AccordCommandStore(int id, NodeTimeService time, Agent agent, @@ -120,7 +146,17 @@ public class AccordCommandStore extends CommandStore implements CacheSize IJournal journal, AccordStateCacheMetrics cacheMetrics) { - this(id, time, agent, dataStore, progressLogFactory, listenerFactory, epochUpdateHolder, journal, Stage.READ.executor(), Stage.MUTATION.executor(), cacheMetrics); + this(id, + time, + agent, + dataStore, + progressLogFactory, + listenerFactory, + epochUpdateHolder, + journal, + Stage.READ.executor(), + Stage.MUTATION.executor(), + cacheMetrics); } private static <K, V> void registerJfrListener(int id, AccordStateCache.Instance<K, V, ?> instance, String name) @@ -203,7 +239,7 @@ public class AccordCommandStore extends CommandStore implements CacheSize ExecutorPlus saveExecutor, AccordStateCacheMetrics cacheMetrics) { - super(id, time, agent, dataStore, progressLogFactory, listenerFactory, epochUpdateHolder); + super(id, time, agent, dataStore, progressLogFactory, listenerFactory, epochUpdateHolder, addCallbackAdapter, AccordKeyspace::updateDurableBefore, AccordKeyspace::updateRedundantBefore, AccordKeyspace::updateBootstrapBeganAt, AccordKeyspace::updateSafeToRead); this.journal = journal; loggingId = String.format("[%s]", id); executor = executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + id + ']'); @@ -245,9 +281,9 @@ public class AccordCommandStore extends CommandStore implements CacheSize if (rejectBefore != null) super.setRejectBefore(rejectBefore); if (durableBefore != null) - super.setDurableBefore(durableBefore); + super.setDurableBefore(DurableBefore.merge(durableBefore, durableBefore())); if (redundantBefore != null) - super.setRedundantBefore(redundantBefore); + super.setRedundantBefore(RedundantBefore.merge(redundantBefore, redundantBefore)); if (bootstrapBeganAt != null) super.setBootstrapBeganAt(bootstrapBeganAt); if (safeToRead != null) @@ -260,7 +296,7 @@ public class AccordCommandStore extends CommandStore implements CacheSize static Factory factory(AccordJournal journal, AccordStateCacheMetrics cacheMetrics) { - return (id, time, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch) -> + return (id, time, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch, scheduler) -> new AccordCommandStore(id, time, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch, journal, cacheMetrics); } @@ -547,36 +583,6 @@ public class AccordCommandStore extends CommandStore implements CacheSize AccordKeyspace.updateRejectBefore(this, newRejectBefore); } - protected void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) - { - super.setBootstrapBeganAt(newBootstrapBeganAt); - // TODO (required, correctness): rework to persist via journal once available, this can lose updates in some edge cases - AccordKeyspace.updateBootstrapBeganAt(this, newBootstrapBeganAt); - } - - protected void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead) - { - super.setSafeToRead(newSafeToRead); - // TODO (required, correctness): rework to persist via journal once available, this can lose updates in some edge cases - AccordKeyspace.updateSafeToRead(this, newSafeToRead); - } - - @Override - public void setDurableBefore(DurableBefore newDurableBefore) - { - super.setDurableBefore(newDurableBefore); - AccordKeyspace.updateDurableBefore(this, newDurableBefore); - } - - @Override - protected void setRedundantBefore(RedundantBefore newRedundantBefore) - { - super.setRedundantBefore(newRedundantBefore); - // TODO (required): this needs to be synchronous, or at least needs to take effect before we rely upon it - AccordKeyspace.updateRedundantBefore(this, newRedundantBefore); - } - - public NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return super.bootstrapBeganAt(); } public NavigableMap<Timestamp, Ranges> safeToRead() { return super.safeToRead(); } public void appendCommands(List<SavedCommand.Writer<TxnId>> commands, List<Command> sanityCheck, Runnable onFlush) @@ -589,4 +595,86 @@ public class AccordCommandStore extends CommandStore implements CacheSize { return journal.loadCommand(id, txnId); } + + public Future<?> prepareForGC(@Nonnull Timestamp gcBefore, @Nonnull Ranges ranges) + { + checkNotNull(gcBefore, "gcBefore should not be null"); + checkNotNull(ranges, "ranges should not be null"); + ListMultimap<TableId, org.apache.cassandra.dht.Range<Token>> toPrepare = ArrayListMultimap.create(); + for (Range r : ranges) + { + TokenRange tr = (TokenRange)r; + TableId tableId = ((TokenRange) r).table(); + toPrepare.put(tableId, tr.toKeyspaceRange()); + } + + List<Future<CommitLogPosition>> flushes = new ArrayList<>(toPrepare.keySet().size()); + for (TableId tableId : toPrepare.keySet()) + { + List<org.apache.cassandra.dht.Range<Token>> tableRanges = toPrepare.get(tableId); + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + if (cfs == null) + { + // TODO (review): Error handling? + logger.warn("Cannot prepare CFS with tableId {} for Accord GC because it does not exist", tableId); + continue; + } + + Memtable currentMemtable = cfs.getCurrentMemtable(); + if (currentMemtable.getMinTimestamp() > gcBefore.hlc()) + continue; + + boolean intersects = false; + // TrieMemtable doesn't support reverse iteration so can't find the last token + if (currentMemtable instanceof TrieMemtable) + intersects = true; + else + { + Token firstToken = null; + try (UnfilteredPartitionIterator iterator = currentMemtable.partitionIterator(ColumnFilter.all(cfs.metadata()), DataRange.allData(cfs.getPartitioner()), SSTableReadsListener.NOOP_LISTENER)) + { + if (iterator.hasNext()) + firstToken = iterator.next().partitionKey().getToken(); + } + Token lastToken = currentMemtable.lastToken(); + + if (firstToken != null) + { + checkState(lastToken != null); + if (firstToken.equals(lastToken)) + { + for (org.apache.cassandra.dht.Range<Token> tableRange : tableRanges) + { + if (tableRange.contains(firstToken)) + { + intersects = true; + break; + } + } + } + else + { + checkState(firstToken.compareTo(lastToken) < 0); + org.apache.cassandra.dht.Range<Token> memtableRange = new org.apache.cassandra.dht.Range<>(firstToken, lastToken); + for (org.apache.cassandra.dht.Range<Token> tableRange : tableRanges) + { + if (tableRange.intersects(memtableRange)) + { + intersects = true; + break; + } + } + } + } + } + + if (intersects) + flushes.add(cfs.forceFlush(FlushReason.ACCORD_TXN_GC)); + } + + if (flushes.isEmpty()) + return ImmediateFuture.success(null); + else + return FutureCombiner.allOf(flushes); + } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java index 60cbcf84ff..46bd54ae8f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java @@ -24,6 +24,7 @@ import accord.api.ConfigurationService.EpochReady; import accord.api.DataStore; import accord.api.LocalListeners; import accord.api.ProgressLog; +import accord.api.Scheduler; import accord.local.CommandStores; import accord.local.Node; import accord.local.NodeTimeService; @@ -47,18 +48,18 @@ public class AccordCommandStores extends CommandStores implements CacheSize AccordCommandStores(NodeTimeService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenerFactory, - AccordJournal journal) + AccordJournal journal, Scheduler scheduler) { super(time, agent, store, random, shardDistributor, progressLogFactory, listenerFactory, - AccordCommandStore.factory(journal, new AccordStateCacheMetrics(ACCORD_STATE_CACHE))); + AccordCommandStore.factory(journal, new AccordStateCacheMetrics(ACCORD_STATE_CACHE)), scheduler); setCapacity(DatabaseDescriptor.getAccordCacheSizeInMiB() << 20); this.cacheSizeMetrics = new CacheSizeMetrics(ACCORD_STATE_CACHE, this); } static Factory factory(AccordJournal journal) { - return (time, agent, store, random, shardDistributor, progressLogFactory, listenerFactory) -> - new AccordCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenerFactory, journal); + return (time, agent, store, random, shardDistributor, progressLogFactory, listenerFactory, scheduler) -> + new AccordCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenerFactory, journal, scheduler); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 8046b9d388..d133a2ee9c 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -31,11 +31,11 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; @@ -48,7 +48,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Key; -import accord.local.cfk.CommandsForKey; import accord.impl.TimestampsForKey; import accord.local.Command; import accord.local.CommandStore; @@ -58,6 +57,7 @@ import accord.local.RedundantBefore; import accord.local.SaveStatus; import accord.local.Status; import accord.local.Status.Durability; +import accord.local.cfk.CommandsForKey; import accord.primitives.Ranges; import accord.primitives.Routable; import accord.primitives.Route; @@ -151,9 +151,12 @@ import org.apache.cassandra.utils.Clock.Global; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.BTreeSet; import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.FutureCombiner; import static accord.utils.Invariants.checkArgument; import static accord.utils.Invariants.checkState; +import static com.google.common.base.Preconditions.checkNotNull; import static java.lang.String.format; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; @@ -1687,23 +1690,33 @@ public class AccordKeyspace return updateCommandStoreMetadata(commandStore, "reject_before", rejectBefore, LocalVersionedSerializers.rejectBefore); } - public static Future<?> updateDurableBefore(CommandStore commandStore, DurableBefore durableBefore) + public static Future<?> updateDurableBefore(CommandStore commandStore, @Nullable Timestamp gcBefore, @Nullable Ranges ranges, DurableBefore durableBefore) { + checkArgument(gcBefore == null); + checkArgument(ranges == null); return updateCommandStoreMetadata(commandStore, "durable_before", durableBefore, LocalVersionedSerializers.durableBefore); } - public static Future<?> updateRedundantBefore(CommandStore commandStore, RedundantBefore redundantBefore) + public static Future<?> updateRedundantBefore(CommandStore commandStore, @Nonnull Timestamp gcBefore, @Nonnull Ranges ranges, RedundantBefore redundantBefore) { - return updateCommandStoreMetadata(commandStore, "redundant_before", redundantBefore, LocalVersionedSerializers.redundantBefore); + checkNotNull(gcBefore, "gcBefore should not be null"); + checkNotNull(ranges, "ranges should not be null"); + Future<?> tableUpdateFuture = updateCommandStoreMetadata(commandStore, "redundant_before", redundantBefore, LocalVersionedSerializers.redundantBefore); + Future<?> gcPrepareFuture = ((AccordCommandStore)commandStore).prepareForGC(gcBefore, ranges); + return FutureCombiner.allOf(tableUpdateFuture, gcPrepareFuture); } - public static Future<?> updateBootstrapBeganAt(CommandStore commandStore, NavigableMap<TxnId, Ranges> bootstrapBeganAt) + public static Future<?> updateBootstrapBeganAt(CommandStore commandStore, @Nullable Timestamp gcBefore, @Nullable Ranges ranges, NavigableMap<TxnId, Ranges> bootstrapBeganAt) { + checkArgument(gcBefore == null); + checkArgument(ranges == null); return updateCommandStoreMetadata(commandStore, "bootstrap_began_at", bootstrapBeganAt, LocalVersionedSerializers.bootstrapBeganAt); } - public static Future<?> updateSafeToRead(CommandStore commandStore, NavigableMap<Timestamp, Ranges> safeToRead) + public static Future<?> updateSafeToRead(CommandStore commandStore, @Nullable Timestamp gcBefore, @Nullable Ranges ranges, NavigableMap<Timestamp, Ranges> safeToRead) { + checkArgument(gcBefore == null); + checkArgument(ranges == null); return updateCommandStoreMetadata(commandStore, "safe_to_read", safeToRead, LocalVersionedSerializers.safeToRead); } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index 80a7b41769..58e749aad4 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -32,20 +32,15 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import com.google.common.collect.Sets; +import org.junit.Assert; +import accord.api.Data; import accord.api.LocalListeners; import accord.api.ProgressLog.NoOpProgressLog; import accord.api.RemoteListeners; -import accord.impl.DefaultLocalListeners; -import accord.utils.SortedArrays.SortedArrayList; -import org.apache.cassandra.ServerTestUtils; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.io.util.File; -import org.junit.Assert; - -import accord.api.Data; import accord.api.Result; import accord.api.RoutingKey; +import accord.impl.DefaultLocalListeners; import accord.impl.InMemoryCommandStore; import accord.local.Command; import accord.local.CommandStore; @@ -74,12 +69,15 @@ import accord.primitives.TxnId; import accord.primitives.Writes; import accord.topology.Shard; import accord.topology.Topology; +import accord.utils.SortedArrays.SortedArrayList; import accord.utils.async.AsyncChains; +import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.concurrent.ManualExecutor; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.AccordSpec; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.TransactionStatement; @@ -88,6 +86,7 @@ import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.File; import org.apache.cassandra.metrics.AccordStateCacheMetrics; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; @@ -375,7 +374,7 @@ public class AccordTestUtils SingleEpochRanges holder = new SingleEpochRanges(Ranges.of(range)); InMemoryCommandStore.Synchronized result = new InMemoryCommandStore.Synchronized(0, time, new AccordAgent(), - null, null, cs -> null, holder); + null, null, cs -> null, holder, null); holder.set(result); return result; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
