This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cassandra-19944
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 297bd82ab242a316d4facfe91530355ba26cc3dd
Author: Ariel Weisberg <[email protected]>
AuthorDate: Thu Sep 19 16:19:15 2024 -0400

    Persist metadata synchronously
---
 .gitmodules                                        |   4 +-
 modules/accord                                     |   2 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   1 +
 .../org/apache/cassandra/db/memtable/Memtable.java |   6 +
 .../db/memtable/ShardedSkipListMemtable.java       |  20 +++
 .../cassandra/db/memtable/SkipListMemtable.java    |  10 ++
 .../service/accord/AccordCommandStore.java         | 162 ++++++++++++++++-----
 .../service/accord/AccordCommandStores.java        |   9 +-
 .../cassandra/service/accord/AccordKeyspace.java   |  27 +++-
 .../cassandra/service/accord/AccordTestUtils.java  |  17 +--
 10 files changed, 198 insertions(+), 60 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 616dacf610..588a06ee42 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
        path = modules/accord
-       url = https://github.com/apache/cassandra-accord.git
-       branch = trunk
+       url = https://github.com/aweisberg/cassandra-accord.git
+       branch = meta-persists-rebase
diff --git a/modules/accord b/modules/accord
index 486cd4bc15..42b2823672 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 486cd4bc15d33500b7b896f9e4691a38d946b679
+Subproject commit 42b28236726a425bb0a9790a010e659266e90e43
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9c1e6b40ee..1374acd086 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -248,6 +248,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean, Memtable.Owner
         SCHEMA_CHANGE,
         OWNED_RANGES_CHANGE,
         ACCORD,
+        ACCORD_TXN_GC,
         UNIT_TESTS // explicitly requested flush needed for a test
     }
 
diff --git a/src/java/org/apache/cassandra/db/memtable/Memtable.java 
b/src/java/org/apache/cassandra/db/memtable/Memtable.java
index f722ec20a9..b34a617ef4 100644
--- a/src/java/org/apache/cassandra/db/memtable/Memtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/Memtable.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredSource;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.schema.TableMetadata;
@@ -441,4 +442,9 @@ public interface Memtable extends Comparable<Memtable>, 
UnfilteredSource
             super(copy.segmentId, copy.position);
         }
     }
+
+    /**
+     * Optional operation: implementations in this change return their greatest token (null when empty); this default always throws.
+     */
+    default Token lastToken() { throw new UnsupportedOperationException("lastToken is not supported"); }
 }
diff --git 
a/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java 
b/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java
index 92cdbbad9f..eb4a44ebd2 100644
--- a/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.IncludingExcludingBounds;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.SSTableReadsListener;
 import org.apache.cassandra.schema.TableMetadata;
@@ -112,6 +113,25 @@ public class ShardedSkipListMemtable extends 
AbstractShardedMemtable
         return true;
     }
 
+    /** Returns the greatest token found across all shards, or null when every shard is empty. */
+    @Override
+    public Token lastToken()
+    {
+        Token max = null;
+        for (MemtableShard shard : shards)
+        {
+            // the descending key set starts at the shard's greatest key, so one step yields its last token
+            Iterator<PartitionPosition> descending = shard.partitions.descendingKeySet().iterator();
+            if (!descending.hasNext())
+                continue;
+
+            Token candidate = descending.next().getToken();
+            if (max == null || max.compareTo(candidate) < 0)
+                max = candidate;
+        }
+        return max;
+    }
+
     /**
      * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, 
which supplies the appropriate
      * OpOrdering.
diff --git a/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java 
b/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java
index 8871b03bd6..3f6fbcbd52 100644
--- a/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java
+++ b/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.IncludingExcludingBounds;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.SSTableReadsListener;
 import org.apache.cassandra.schema.TableMetadata;
@@ -97,6 +98,15 @@ public class SkipListMemtable extends 
AbstractAllocatorMemtable
         return partitions.isEmpty();
     }
 
+    /** Returns the token of the greatest partition key in this memtable, or null when it is empty. */
+    @Override
+    public Token lastToken()
+    {
+        // must walk the map in descending order: keySet() is ascending and would yield the first token
+        Iterator<PartitionPosition> iterator = partitions.descendingKeySet().iterator();
+        return iterator.hasNext() ? iterator.next().getToken() : null;
+    }
+
     /**
      * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, 
which supplies the appropriate
      * OpOrdering.
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index c1ce231da9..a266de9697 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service.accord;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.List;
@@ -26,22 +27,26 @@ import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import accord.api.Key;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.Key;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
-import accord.local.cfk.CommandsForKey;
 import accord.impl.TimestampsForKey;
 import accord.local.Command;
 import accord.local.CommandStore;
@@ -50,13 +55,16 @@ import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
 import accord.local.RedundantBefore;
 import accord.local.SafeCommandStore;
+import accord.local.cfk.CommandsForKey;
 import accord.primitives.Keys;
+import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.ReducingRangeMap;
+import accord.utils.TriConsumer;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import org.apache.cassandra.cache.CacheSize;
@@ -64,14 +72,30 @@ import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ColumnFamilyStore.FlushReason;
+import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.memtable.TrieMemtable;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
 import org.apache.cassandra.metrics.AccordStateCacheMetrics;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.async.AsyncOperation;
 import org.apache.cassandra.service.accord.events.CacheEvents;
 import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 
 public class AccordCommandStore extends CommandStore implements CacheSize
@@ -110,6 +134,8 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
     private long lastSystemTimestampMicros = Long.MIN_VALUE;
     private final CommandsForRangesLoader commandsForRangesLoader;
 
+    private static final TriConsumer<Object, BiConsumer<Object, Throwable>, 
Executor> addCallbackAdapter = (future, callback, executor) -> 
((org.apache.cassandra.utils.concurrent.Future<?>)future).addCallback(callback, 
executor);
+
     public AccordCommandStore(int id,
                               NodeTimeService time,
                               Agent agent,
@@ -120,7 +146,17 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
                               IJournal journal,
                               AccordStateCacheMetrics cacheMetrics)
     {
-        this(id, time, agent, dataStore, progressLogFactory, listenerFactory, 
epochUpdateHolder, journal, Stage.READ.executor(), Stage.MUTATION.executor(), 
cacheMetrics);
+        this(id,
+             time,
+             agent,
+             dataStore,
+             progressLogFactory,
+             listenerFactory,
+             epochUpdateHolder,
+             journal,
+             Stage.READ.executor(),
+             Stage.MUTATION.executor(),
+             cacheMetrics);
     }
 
     private static <K, V> void registerJfrListener(int id, 
AccordStateCache.Instance<K, V, ?> instance, String name)
@@ -203,7 +239,7 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
                               ExecutorPlus saveExecutor,
                               AccordStateCacheMetrics cacheMetrics)
     {
-        super(id, time, agent, dataStore, progressLogFactory, listenerFactory, 
epochUpdateHolder);
+        super(id, time, agent, dataStore, progressLogFactory, listenerFactory, 
epochUpdateHolder, addCallbackAdapter, AccordKeyspace::updateDurableBefore, 
AccordKeyspace::updateRedundantBefore, AccordKeyspace::updateBootstrapBeganAt, 
AccordKeyspace::updateSafeToRead);
         this.journal = journal;
         loggingId = String.format("[%s]", id);
         executor = 
executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + id + 
']');
@@ -245,9 +281,9 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
                 if (rejectBefore != null)
                     super.setRejectBefore(rejectBefore);
                 if (durableBefore != null)
-                    super.setDurableBefore(durableBefore);
+                    super.setDurableBefore(DurableBefore.merge(durableBefore, 
durableBefore()));
                 if (redundantBefore != null)
-                    super.setRedundantBefore(redundantBefore);
+                    super.setRedundantBefore(RedundantBefore.merge(redundantBefore, redundantBefore())); // merge the loaded value with the store's current one, as done for durableBefore
                 if (bootstrapBeganAt != null)
                     super.setBootstrapBeganAt(bootstrapBeganAt);
                 if (safeToRead != null)
@@ -260,7 +296,7 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
 
     static Factory factory(AccordJournal journal, AccordStateCacheMetrics 
cacheMetrics)
     {
-        return (id, time, agent, dataStore, progressLogFactory, 
listenerFactory, rangesForEpoch) ->
+        return (id, time, agent, dataStore, progressLogFactory, 
listenerFactory, rangesForEpoch, scheduler) ->
                new AccordCommandStore(id, time, agent, dataStore, 
progressLogFactory, listenerFactory, rangesForEpoch, journal, cacheMetrics);
     }
 
@@ -547,36 +583,6 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
         AccordKeyspace.updateRejectBefore(this, newRejectBefore);
     }
 
-    protected void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
-    {
-        super.setBootstrapBeganAt(newBootstrapBeganAt);
-        // TODO (required, correctness): rework to persist via journal once 
available, this can lose updates in some edge cases
-        AccordKeyspace.updateBootstrapBeganAt(this, newBootstrapBeganAt);
-    }
-
-    protected void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead)
-    {
-        super.setSafeToRead(newSafeToRead);
-        // TODO (required, correctness): rework to persist via journal once 
available, this can lose updates in some edge cases
-        AccordKeyspace.updateSafeToRead(this, newSafeToRead);
-    }
-
-    @Override
-    public void setDurableBefore(DurableBefore newDurableBefore)
-    {
-        super.setDurableBefore(newDurableBefore);
-        AccordKeyspace.updateDurableBefore(this, newDurableBefore);
-    }
-
-    @Override
-    protected void setRedundantBefore(RedundantBefore newRedundantBefore)
-    {
-        super.setRedundantBefore(newRedundantBefore);
-        // TODO (required): this needs to be synchronous, or at least needs to 
take effect before we rely upon it
-        AccordKeyspace.updateRedundantBefore(this, newRedundantBefore);
-    }
-
-    public NavigableMap<TxnId, Ranges> bootstrapBeganAt() { return 
super.bootstrapBeganAt(); }
     public NavigableMap<Timestamp, Ranges> safeToRead() { return 
super.safeToRead(); }
 
     public void appendCommands(List<SavedCommand.Writer<TxnId>> commands, 
List<Command> sanityCheck, Runnable onFlush)
@@ -589,4 +595,86 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
     {
         return journal.loadCommand(id, txnId);
     }
+
+    /**
+     * Triggers an {@code ACCORD_TXN_GC} flush of the current memtable of every table referenced by {@code ranges},
+     * unless that memtable's minimum timestamp is above {@code gcBefore} or it holds no partition inside the table's ranges.
+     *
+     * @param gcBefore bound compared against each memtable's minimum timestamp via {@code hlc()}; must not be null
+     * @param ranges   ranges to prepare, each of which is cast to {@link TokenRange}; must not be null
+     * @return a future combining all triggered flushes, or an already-successful future when none was needed
+     */
+    public Future<?> prepareForGC(@Nonnull Timestamp gcBefore, @Nonnull Ranges ranges)
+    {
+        checkNotNull(gcBefore, "gcBefore should not be null");
+        checkNotNull(ranges, "ranges should not be null");
+        ListMultimap<TableId, org.apache.cassandra.dht.Range<Token>> toPrepare = ArrayListMultimap.create();
+        for (Range r : ranges)
+        {
+            TokenRange tr = (TokenRange) r;
+            toPrepare.put(tr.table(), tr.toKeyspaceRange());
+        }
+
+        List<Future<CommitLogPosition>> flushes = new ArrayList<>(toPrepare.keySet().size());
+        for (TableId tableId : toPrepare.keySet())
+        {
+            ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+            if (cfs == null)
+            {
+                // TODO (review): Error handling?
+                logger.warn("Cannot prepare CFS with tableId {} for Accord GC because it does not exist", tableId);
+                continue;
+            }
+
+            Memtable currentMemtable = cfs.getCurrentMemtable();
+            // every write in this memtable is newer than the bound, so there is nothing to flush for it
+            if (currentMemtable.getMinTimestamp() > gcBefore.hlc())
+                continue;
+
+            if (memtableMayIntersect(cfs, currentMemtable, toPrepare.get(tableId)))
+                flushes.add(cfs.forceFlush(FlushReason.ACCORD_TXN_GC));
+        }
+
+        if (flushes.isEmpty())
+            return ImmediateFuture.success(null);
+        return FutureCombiner.allOf(flushes);
+    }
+
+    /**
+     * Decides whether {@code memtable} may hold a partition inside any of {@code tableRanges}, judged by the
+     * span between its first and last token. A {@link TrieMemtable} is always reported as intersecting.
+     *
+     * @param cfs         store owning the memtable, used for its metadata and partitioner
+     * @param memtable    memtable to inspect
+     * @param tableRanges keyspace token ranges requested for the owning table
+     * @return {@code true} if a flush is needed, {@code false} if the memtable is empty or lies outside every range
+     */
+    private static boolean memtableMayIntersect(ColumnFamilyStore cfs, Memtable memtable, List<org.apache.cassandra.dht.Range<Token>> tableRanges)
+    {
+        // TrieMemtable doesn't support reverse iteration so can't find the last token
+        if (memtable instanceof TrieMemtable)
+            return true;
+
+        Token firstToken = null;
+        try (UnfilteredPartitionIterator iterator = memtable.partitionIterator(ColumnFilter.all(cfs.metadata()), DataRange.allData(cfs.getPartitioner()), SSTableReadsListener.NOOP_LISTENER))
+        {
+            if (iterator.hasNext())
+                firstToken = iterator.next().partitionKey().getToken();
+        }
+        // no first partition means the memtable is empty and cannot intersect anything
+        if (firstToken == null)
+            return false;
+
+        Token lastToken = memtable.lastToken();
+        checkState(lastToken != null);
+        checkState(firstToken.compareTo(lastToken) <= 0);
+        // Range is left-exclusive: (firstToken, lastToken] omits firstToken, so it is tested on its own below
+        org.apache.cassandra.dht.Range<Token> memtableRange = firstToken.equals(lastToken) ? null : new org.apache.cassandra.dht.Range<>(firstToken, lastToken);
+        for (org.apache.cassandra.dht.Range<Token> tableRange : tableRanges)
+        {
+            if (tableRange.contains(firstToken) || (memtableRange != null && tableRange.intersects(memtableRange)))
+                return true;
+        }
+        return false;
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 60cbcf84ff..46bd54ae8f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -24,6 +24,7 @@ import accord.api.ConfigurationService.EpochReady;
 import accord.api.DataStore;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
+import accord.api.Scheduler;
 import accord.local.CommandStores;
 import accord.local.Node;
 import accord.local.NodeTimeService;
@@ -47,18 +48,18 @@ public class AccordCommandStores extends CommandStores 
implements CacheSize
 
     AccordCommandStores(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random,
                         ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory, LocalListeners.Factory listenerFactory,
-                        AccordJournal journal)
+                        AccordJournal journal, Scheduler scheduler)
     {
         super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenerFactory,
-              AccordCommandStore.factory(journal, new 
AccordStateCacheMetrics(ACCORD_STATE_CACHE)));
+              AccordCommandStore.factory(journal, new 
AccordStateCacheMetrics(ACCORD_STATE_CACHE)), scheduler);
         setCapacity(DatabaseDescriptor.getAccordCacheSizeInMiB() << 20);
         this.cacheSizeMetrics = new CacheSizeMetrics(ACCORD_STATE_CACHE, this);
     }
 
     static Factory factory(AccordJournal journal)
     {
-        return (time, agent, store, random, shardDistributor, 
progressLogFactory, listenerFactory) ->
-               new AccordCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, listenerFactory, journal);
+        return (time, agent, store, random, shardDistributor, 
progressLogFactory, listenerFactory, scheduler) ->
+               new AccordCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, listenerFactory, journal, scheduler);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 8046b9d388..d133a2ee9c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -31,11 +31,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -48,7 +48,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Key;
-import accord.local.cfk.CommandsForKey;
 import accord.impl.TimestampsForKey;
 import accord.local.Command;
 import accord.local.CommandStore;
@@ -58,6 +57,7 @@ import accord.local.RedundantBefore;
 import accord.local.SaveStatus;
 import accord.local.Status;
 import accord.local.Status.Durability;
+import accord.local.cfk.CommandsForKey;
 import accord.primitives.Ranges;
 import accord.primitives.Routable;
 import accord.primitives.Route;
@@ -151,9 +151,12 @@ import org.apache.cassandra.utils.Clock.Global;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
 
 import static accord.utils.Invariants.checkArgument;
 import static accord.utils.Invariants.checkState;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
@@ -1687,23 +1690,33 @@ public class AccordKeyspace
         return updateCommandStoreMetadata(commandStore, "reject_before", 
rejectBefore, LocalVersionedSerializers.rejectBefore);
     }
 
-    public static Future<?> updateDurableBefore(CommandStore commandStore, 
DurableBefore durableBefore)
+    public static Future<?> updateDurableBefore(CommandStore commandStore, 
@Nullable Timestamp gcBefore, @Nullable Ranges ranges, DurableBefore 
durableBefore)
     {
+        checkArgument(gcBefore == null);
+        checkArgument(ranges == null);
         return updateCommandStoreMetadata(commandStore, "durable_before", 
durableBefore, LocalVersionedSerializers.durableBefore);
     }
 
-    public static Future<?> updateRedundantBefore(CommandStore commandStore, 
RedundantBefore redundantBefore)
+    public static Future<?> updateRedundantBefore(CommandStore commandStore, 
@Nonnull Timestamp gcBefore, @Nonnull Ranges ranges, RedundantBefore 
redundantBefore)
     {
-        return updateCommandStoreMetadata(commandStore, "redundant_before", 
redundantBefore, LocalVersionedSerializers.redundantBefore);
+        checkNotNull(gcBefore, "gcBefore should not be null");
+        checkNotNull(ranges, "ranges should not be null");
+        Future<?> tableUpdateFuture = updateCommandStoreMetadata(commandStore, 
"redundant_before", redundantBefore, LocalVersionedSerializers.redundantBefore);
+        Future<?> gcPrepareFuture = 
((AccordCommandStore)commandStore).prepareForGC(gcBefore, ranges);
+        return FutureCombiner.allOf(tableUpdateFuture, gcPrepareFuture);
     }
 
-    public static Future<?> updateBootstrapBeganAt(CommandStore commandStore, 
NavigableMap<TxnId, Ranges> bootstrapBeganAt)
+    public static Future<?> updateBootstrapBeganAt(CommandStore commandStore, 
@Nullable Timestamp gcBefore, @Nullable Ranges ranges, NavigableMap<TxnId, 
Ranges> bootstrapBeganAt)
     {
+        checkArgument(gcBefore == null);
+        checkArgument(ranges == null);
         return updateCommandStoreMetadata(commandStore, "bootstrap_began_at", 
bootstrapBeganAt, LocalVersionedSerializers.bootstrapBeganAt);
     }
 
-    public static Future<?> updateSafeToRead(CommandStore commandStore, 
NavigableMap<Timestamp, Ranges> safeToRead)
+    public static Future<?> updateSafeToRead(CommandStore commandStore, 
@Nullable Timestamp gcBefore, @Nullable Ranges ranges, NavigableMap<Timestamp, 
Ranges> safeToRead)
     {
+        checkArgument(gcBefore == null);
+        checkArgument(ranges == null);
         return updateCommandStoreMetadata(commandStore, "safe_to_read", 
safeToRead, LocalVersionedSerializers.safeToRead);
     }
 
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index 80a7b41769..58e749aad4 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -32,20 +32,15 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import com.google.common.collect.Sets;
+import org.junit.Assert;
 
+import accord.api.Data;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog.NoOpProgressLog;
 import accord.api.RemoteListeners;
-import accord.impl.DefaultLocalListeners;
-import accord.utils.SortedArrays.SortedArrayList;
-import org.apache.cassandra.ServerTestUtils;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.File;
-import org.junit.Assert;
-
-import accord.api.Data;
 import accord.api.Result;
 import accord.api.RoutingKey;
+import accord.impl.DefaultLocalListeners;
 import accord.impl.InMemoryCommandStore;
 import accord.local.Command;
 import accord.local.CommandStore;
@@ -74,12 +69,15 @@ import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.topology.Shard;
 import accord.topology.Topology;
+import accord.utils.SortedArrays.SortedArrayList;
 import accord.utils.async.AsyncChains;
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.concurrent.ManualExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.TransactionStatement;
@@ -88,6 +86,7 @@ import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.metrics.AccordStateCacheMetrics;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
@@ -375,7 +374,7 @@ public class AccordTestUtils
 
         SingleEpochRanges holder = new SingleEpochRanges(Ranges.of(range));
         InMemoryCommandStore.Synchronized result = new 
InMemoryCommandStore.Synchronized(0, time, new AccordAgent(),
-                                                     null, null, cs -> null, 
holder);
+                                                     null, null, cs -> null, 
holder, null);
         holder.set(result);
         return result;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to