This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f6c1002e44 Stream individual files in their own transactions and hand
over ownership to a parent transaction on completion
f6c1002e44 is described below
commit f6c1002e44ed73fbcf940242217f5d20e0bf2d7d
Author: Marcus Eriksson <[email protected]>
AuthorDate: Wed Jun 11 08:08:59 2025 +0200
Stream individual files in their own transactions and hand over ownership
to a parent transaction on completion
patch by Marcus Eriksson; reviewed by Caleb Rackliffe and Jon Meredith for
CASSANDRA-20728
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 77 ++++++++---
.../db/compaction/AbstractCompactionStrategy.java | 6 +-
.../db/compaction/AbstractStrategyHolder.java | 4 +-
.../db/compaction/CompactionStrategyHolder.java | 6 +-
.../db/compaction/CompactionStrategyManager.java | 6 +-
.../db/compaction/PendingRepairHolder.java | 6 +-
.../db/compaction/UnifiedCompactionStrategy.java | 6 +-
.../db/compaction/unified/ShardedMultiWriter.java | 12 +-
.../db/lifecycle/ILifecycleTransaction.java | 7 +-
.../db/lifecycle/LifecycleNewTracker.java | 47 -------
.../db/lifecycle/LifecycleTransaction.java | 18 +--
.../org/apache/cassandra/db/lifecycle/LogFile.java | 15 ++
.../cassandra/db/lifecycle/LogTransaction.java | 12 ++
.../lifecycle/StreamingLifecycleTransaction.java | 68 +++++++++
.../CassandraEntireSSTableStreamReader.java | 50 +++++--
.../db/streaming/CassandraIncomingFile.java | 12 +-
.../db/streaming/CassandraStreamReader.java | 17 ++-
.../db/streaming/CassandraStreamReceiver.java | 50 +------
src/java/org/apache/cassandra/index/Index.java | 10 +-
.../org/apache/cassandra/index/IndexRegistry.java | 4 +-
.../cassandra/index/SingletonIndexGroup.java | 6 +-
.../index/accord/MemtableIndexManager.java | 4 +-
.../cassandra/index/accord/RouteJournalIndex.java | 8 +-
.../index/accord/RouteMemtableIndexManager.java | 6 +-
.../cassandra/index/sai/StorageAttachedIndex.java | 4 +-
.../index/sai/StorageAttachedIndexGroup.java | 6 +-
.../index/sai/disk/StorageAttachedIndexWriter.java | 16 +--
.../index/sai/disk/format/IndexDescriptor.java | 6 +-
.../index/sai/disk/format/OnDiskFormat.java | 8 +-
.../index/sai/disk/v1/V1OnDiskFormat.java | 8 +-
.../index/sai/memory/MemtableIndexManager.java | 6 +-
.../org/apache/cassandra/index/sasi/SASIIndex.java | 6 +-
.../io/sstable/RangeAwareSSTableWriter.java | 20 ++-
.../io/sstable/SSTableTxnSingleStreamWriter.java | 145 +++++++++++++++++++
.../io/sstable/SSTableZeroCopyWriter.java | 7 +-
.../io/sstable/SimpleSSTableMultiWriter.java | 16 +--
.../cassandra/io/sstable/format/SSTableFormat.java | 3 +-
.../cassandra/io/sstable/format/SSTableWriter.java | 30 ++--
.../io/sstable/format/SortedTableWriter.java | 6 +-
.../io/sstable/format/big/BigTableWriter.java | 16 +--
.../io/sstable/format/bti/BtiTableWriter.java | 12 +-
.../apache/cassandra/streaming/IncomingStream.java | 2 +-
.../streaming/StreamDeserializingTask.java | 5 +-
.../IndexBuildFailsAfterStreamingTest.java | 99 +++++++++++++
.../streaming/StreamFailedAfterReceivingTest.java | 154 +++++++++++++++++++++
.../streaming/StreamFailedAfterTransferTest.java | 127 +++++++++++++++++
.../db/lifecycle/LifecycleTransactionTest.java | 51 +++++++
.../CassandraEntireSSTableStreamWriterTest.java | 10 +-
...TableStreamConcurrentComponentMutationTest.java | 8 +-
.../apache/cassandra/index/CustomIndexTest.java | 8 +-
.../org/apache/cassandra/index/StubIndexGroup.java | 4 +-
.../org/apache/cassandra/io/sstable/ScrubTest.java | 6 +-
.../cassandra/streaming/StreamReaderTest.java | 3 +-
54 files changed, 970 insertions(+), 280 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6fb94d9626..1efac73e11 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Stream individual files in their own transactions and hand over ownership
to a parent transaction on completion (CASSANDRA-20728)
* Limit the number of held heap dumps to not consume disk space excessively
(CASSANDRA-20457)
* Accord: BEGIN TRANSACTIONs IF condition logic does not properly support
meaningless emptiness and null values (CASSANDRA-20667)
* Accord: startup race condition where accord journal tries to access the 2i
index before its ready (CASSANDRA-20686)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e81c0d7be8..a499bc42f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -85,7 +85,7 @@ import
org.apache.cassandra.db.compaction.CompactionStrategyManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.Tracker;
@@ -652,19 +652,19 @@ public class ColumnFamilyStore implements
ColumnFamilyStoreMBean, Memtable.Owner
return memtableFactory.streamFromMemtable();
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient,
SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient,
SerializationHeader header, ILifecycleTransaction txn)
{
- return createSSTableMultiWriter(descriptor, keyCount, repairedAt,
pendingRepair, isTransient, null, 0, header, lifecycleNewTracker);
+ return createSSTableMultiWriter(descriptor, keyCount, repairedAt,
pendingRepair, isTransient, null, 0, header, txn);
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient,
IntervalSet<CommitLogPosition> commitLogPositions, SerializationHeader header,
LifecycleNewTracker lifecycleNewTracker)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient,
IntervalSet<CommitLogPosition> commitLogPositions, SerializationHeader header,
ILifecycleTransaction txn)
{
- return createSSTableMultiWriter(descriptor, keyCount, repairedAt,
pendingRepair, isTransient, commitLogPositions, 0, header, lifecycleNewTracker);
+ return createSSTableMultiWriter(descriptor, keyCount, repairedAt,
pendingRepair, isTransient, commitLogPositions, 0, header, txn);
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient,
IntervalSet<CommitLogPosition> commitLogPositions, int sstableLevel,
SerializationHeader header, LifecycleNewTracker lifecycleNewTracker)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
long keyCount, long repairedAt, TimeUUID pendingRepair, boolean isTransient,
IntervalSet<CommitLogPosition> commitLogPositions, int sstableLevel,
SerializationHeader header, ILifecycleTransaction txn)
{
- return
getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount,
repairedAt, pendingRepair, isTransient, commitLogPositions, sstableLevel,
header, indexManager.listIndexGroups(), lifecycleNewTracker);
+ return
getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount,
repairedAt, pendingRepair, isTransient, commitLogPositions, sstableLevel,
header, indexManager.listIndexGroups(), txn);
}
public boolean supportsEarlyOpen()
@@ -2410,22 +2410,67 @@ public class ColumnFamilyStore implements
ColumnFamilyStoreMBean, Memtable.Owner
}
}
- private static final LifecycleNewTracker DO_NOT_TRACK = new
LifecycleNewTracker()
+ private static final ILifecycleTransaction DO_NOT_TRACK = new
ILifecycleTransaction()
{
- public void trackNew(SSTable table)
+ @Override
+ public void trackNew(SSTable sstable)
{
- // not tracking
- }
- public void untrackNew(SSTable table)
- {
- // not tracking
}
- public OperationType opType()
+ @Override
+ public void untrackNew(SSTable sstable)
{
- return OperationType.FLUSH;
+
}
+
+ @Override
+ public OperationType opType() {return null;}
+
+ @Override
+ public void checkpoint() {}
+
+ @Override
+ public void update(SSTableReader reader, boolean original) {}
+
+ @Override
+ public void update(Collection<SSTableReader> readers, boolean
original) {}
+
+ @Override
+ public SSTableReader current(SSTableReader reader) {return null;}
+
+ @Override
+ public void obsolete(SSTableReader reader) {}
+
+ @Override
+ public void obsoleteOriginals() {}
+
+ @Override
+ public Set<SSTableReader> originals() {return Set.of();}
+
+ @Override
+ public boolean isObsolete(SSTableReader reader) {return false;}
+
+ @Override
+ public boolean isOffline() {return false;}
+
+ @Override
+ public TimeUUID opId() {return null;}
+
+ @Override
+ public void cancel(SSTableReader removedSSTable) {}
+
+ @Override
+ public Throwable commit(Throwable accumulate) {return null;}
+
+ @Override
+ public Throwable abort(Throwable accumulate) {return null;}
+
+ @Override
+ public void prepareToCommit() {}
+
+ @Override
+ public void close() {}
};
/**
diff --git
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 99a509c5f2..def444f9a9 100644
---
a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -562,7 +562,7 @@ public abstract class AbstractCompactionStrategy
int sstableLevel,
SerializationHeader
header,
Collection<Index.Group>
indexGroups,
- LifecycleNewTracker
lifecycleNewTracker)
+ ILifecycleTransaction
txn)
{
return SimpleSSTableMultiWriter.create(descriptor,
keyCount,
@@ -574,7 +574,7 @@ public abstract class AbstractCompactionStrategy
sstableLevel,
header,
indexGroups,
- lifecycleNewTracker, cfs);
+ txn, cfs);
}
public boolean supportsEarlyOpen()
diff --git
a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index 2db7e1db60..4bc7146f83 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
@@ -200,7 +200,7 @@ public abstract class AbstractStrategyHolder
int
sstableLevel,
SerializationHeader header,
Collection<Index.Group> indexGroups,
-
LifecycleNewTracker lifecycleNewTracker);
+
ILifecycleTransaction txn);
/**
* Return the directory index the given compaction strategy belongs to, or
-1
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index 9bc4d6d7cb..af5c749929 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
@@ -230,7 +230,7 @@ public class CompactionStrategyHolder extends
AbstractStrategyHolder
int sstableLevel,
SerializationHeader
header,
Collection<Index.Group>
indexGroups,
- LifecycleNewTracker
lifecycleNewTracker)
+ ILifecycleTransaction
txn)
{
if (isRepaired)
{
@@ -255,7 +255,7 @@ public class CompactionStrategyHolder extends
AbstractStrategyHolder
sstableLevel,
header,
indexGroups,
- lifecycleNewTracker);
+ txn);
}
@Override
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index dfb908425a..b8eaa5bd81 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -54,7 +54,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.compaction.AbstractStrategyHolder.TaskSupplier;
import org.apache.cassandra.db.compaction.PendingRepairManager.CleanupTask;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
@@ -1257,7 +1257,7 @@ public class CompactionStrategyManager implements
INotificationConsumer
int sstableLevel,
SerializationHeader
header,
Collection<Index.Group>
indexGroups,
- LifecycleNewTracker
lifecycleNewTracker)
+ ILifecycleTransaction
txn)
{
SSTable.validateRepairedMetadata(repairedAt, pendingRepair,
isTransient);
maybeReloadDiskBoundaries();
@@ -1273,7 +1273,7 @@ public class CompactionStrategyManager implements
INotificationConsumer
sstableLevel,
header,
indexGroups,
-
lifecycleNewTracker);
+
txn);
}
finally
{
diff --git
a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 1f2ff4a1a1..45e8753394 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
@@ -250,7 +250,7 @@ public class PendingRepairHolder extends
AbstractStrategyHolder
int sstableLevel,
SerializationHeader
header,
Collection<Index.Group>
indexGroups,
- LifecycleNewTracker
lifecycleNewTracker)
+ ILifecycleTransaction
txn)
{
Preconditions.checkArgument(repairedAt ==
ActiveRepairService.UNREPAIRED_SSTABLE,
"PendingRepairHolder can't create
sstablewriter with repaired at set");
@@ -267,7 +267,7 @@ public class PendingRepairHolder extends
AbstractStrategyHolder
sstableLevel,
header,
indexGroups,
- lifecycleNewTracker);
+ txn);
}
@Override
diff --git
a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
index c23e600e7b..d44a9f3288 100644
--- a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.db.compaction.unified.Controller;
import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter;
import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
import org.apache.cassandra.db.lifecycle.CompositeLifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.PartialLifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -303,7 +303,7 @@ public class UnifiedCompactionStrategy extends
AbstractCompactionStrategy
int sstableLevel,
SerializationHeader
header,
Collection<Index.Group>
indexGroups,
- LifecycleNewTracker
lifecycleNewTracker)
+ ILifecycleTransaction
txn)
{
ShardManager shardManager = getShardManager();
double flushDensity = cfs.metric.flushSizeOnDisk.get() *
shardManager.shardSetCoverage() / shardManager.localSpaceCoverage();
@@ -317,7 +317,7 @@ public class UnifiedCompactionStrategy extends
AbstractCompactionStrategy
commitLogPositions,
header,
indexGroups,
- lifecycleNewTracker,
+ txn,
boundaries);
}
diff --git
a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
index a5b5df9e49..ab5465df25 100644
---
a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
+++
b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.compaction.ShardTracker;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -64,7 +64,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter
private final IntervalSet<CommitLogPosition> commitLogPositions;
private final SerializationHeader header;
private final Collection<Index.Group> indexGroups;
- private final LifecycleNewTracker lifecycleNewTracker;
+ private final ILifecycleTransaction txn;
private final ShardTracker boundaries;
private final SSTableWriter[] writers;
private int currentWriter;
@@ -78,7 +78,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter
IntervalSet<CommitLogPosition>
commitLogPositions,
SerializationHeader header,
Collection<Index.Group> indexGroups,
- LifecycleNewTracker lifecycleNewTracker,
+ ILifecycleTransaction txn,
ShardTracker boundaries)
{
this.cfs = cfs;
@@ -90,7 +90,7 @@ public class ShardedMultiWriter implements SSTableMultiWriter
this.commitLogPositions = commitLogPositions;
this.header = header;
this.indexGroups = indexGroups;
- this.lifecycleNewTracker = lifecycleNewTracker;
+ this.txn = txn;
this.boundaries = boundaries;
this.writers = new SSTableWriter[this.boundaries.count()]; // at least
one
@@ -118,7 +118,7 @@ public class ShardedMultiWriter implements
SSTableMultiWriter
.setSerializationHeader(header)
.addDefaultComponents(indexGroups)
.setSecondaryIndexGroups(indexGroups)
- .build(lifecycleNewTracker, cfs);
+ .build(txn, cfs);
}
private long forSplittingKeysBy(long splits) {
@@ -227,7 +227,7 @@ public class ShardedMultiWriter implements
SSTableMultiWriter
for (SSTableWriter writer : writers)
if (writer != null)
{
- lifecycleNewTracker.untrackNew(writer);
+ txn.untrackNew(writer);
t = writer.abort(t);
}
return t;
diff --git
a/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java
b/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java
index f3e9c9b7c5..5da8447917 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/ILifecycleTransaction.java
@@ -23,13 +23,18 @@ import java.util.Set;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Transactional;
-public interface ILifecycleTransaction extends Transactional,
LifecycleNewTracker
+public interface ILifecycleTransaction extends Transactional
{
+ void trackNew(SSTable sstable);
+ void untrackNew(SSTable sstable);
+ OperationType opType();
void checkpoint();
void update(SSTableReader reader, boolean original);
void update(Collection<SSTableReader> readers, boolean original);
diff --git
a/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
b/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
deleted file mode 100644
index 9a0785c43f..0000000000
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleNewTracker.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.cassandra.db.lifecycle;
-
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.SSTable;
-
-/**
- * An interface for tracking new sstables added to a LifecycleTransaction,
possibly through some proxy.
- */
-public interface LifecycleNewTracker
-{
- /**
- * Called when a new table is about to be created, so that this table can
be tracked by a transaction.
- * @param table - the new table to be tracked
- */
- void trackNew(SSTable table);
-
-
- /**
- * Called when a new table is no longer required, so that this table can
be untracked by a transaction.
- * @param table - the table to be untracked
- */
- void untrackNew(SSTable table);
-
- /**
- * @return the type of operation tracking these sstables
- */
- OperationType opType();
-}
diff --git
a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 15b0417b38..48b0bc1ffb 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -205,12 +205,7 @@ public class LifecycleTransaction extends
Transactional.AbstractTransactional im
}
}
- public LogTransaction log()
- {
- return log;
- }
-
- @Override //LifecycleNewTracker
+ @Override
public OperationType opType()
{
return log.type();
@@ -322,6 +317,14 @@ public class LifecycleTransaction extends
Transactional.AbstractTransactional im
return accumulate;
}
+ void takeOwnership(ILifecycleTransaction txn)
+ {
+ LifecycleTransaction ltn = (LifecycleTransaction)txn;
+ if (!ltn.obsoletions.isEmpty() || !ltn.originals.isEmpty() ||
!ltn.logged.isEmpty())
+ throw new IllegalStateException("takeOwnership is only supported
in add-only transactions (streams)");
+ log.takeOwnership(ltn.log);
+ }
+
private Throwable runOnCommitHooks(Throwable accumulate)
{
return runHooks(commitHooks, accumulate);
@@ -646,8 +649,6 @@ public class LifecycleTransaction extends
Transactional.AbstractTransactional im
return getFirst(originals, null);
}
- // LifecycleNewTracker
-
@Override
public void trackNew(SSTable table)
{
@@ -659,7 +660,6 @@ public class LifecycleTransaction extends
Transactional.AbstractTransactional im
{
log.untrackNew(table);
}
-
public static boolean removeUnfinishedLeftovers(ColumnFamilyStore cfs)
{
return
LogTransaction.removeUnfinishedLeftovers(cfs.getDirectories().getCFDirectories());
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 13436b112a..08ba257f16 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -552,4 +552,19 @@ final class LogFile implements AutoCloseable
{
return records.isEmpty();
}
+
+ public void takeOwnership(LogFile txnFile)
+ {
+ if (completed)
+ throw TransactionAlreadyCompletedException.create(getFiles());
+ for (LogRecord record : txnFile.records)
+ {
+ if (record.type != Type.ADD)
+ throw new IllegalStateException("Can only transfer ADD records
- not " + record + " - " + txnFile.records);
+ File directory = new File(record.absolutePath.get()).parent();
+ String fileName = StringUtils.join(directory,
File.pathSeparator(), getFileName());
+ replicas.maybeCreateReplica(directory, fileName, onDiskRecords);
+ addRecord(record);
+ }
+ }
}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index 92766de385..b675ad238b 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -209,6 +209,18 @@ class LogTransaction extends
Transactional.AbstractTransactional implements Tran
}
}
+ void takeOwnership(LogTransaction log)
+ {
+ synchronized (lock)
+ {
+ if (state() != State.IN_PROGRESS)
+ throw new IllegalStateException("The LogTransaction getting
ownership should be IN_PROGRESS, not " + state());
+ if (log.state() != State.READY_TO_COMMIT)
+ throw new IllegalStateException("The LogTransaction giving up
its ownership should be READY_TO_COMMIT, not " + log.state());
+ txnFile.takeOwnership(log.txnFile);
+ }
+ }
+
Map<SSTable, LogRecord> makeRemoveRecords(Iterable<SSTableReader> sstables)
{
synchronized (lock)
diff --git
a/src/java/org/apache/cassandra/db/lifecycle/StreamingLifecycleTransaction.java
b/src/java/org/apache/cassandra/db/lifecycle/StreamingLifecycleTransaction.java
new file mode 100644
index 0000000000..38145b6132
--- /dev/null
+++
b/src/java/org/apache/cassandra/db/lifecycle/StreamingLifecycleTransaction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.Collection;
+
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
+/**
+ * Special, restricted LifecycleTransaction for streaming, synchronizes access
to the shared transaction
+ * and adds a method to take ownership of a "child-transaction".
+ *
+ * Each incoming file is now its own normal LifecycleTransaction.
+ */
+public class StreamingLifecycleTransaction
+{
+ private final LifecycleTransaction sharedTxn;
+
+ public StreamingLifecycleTransaction()
+ {
+ this.sharedTxn = LifecycleTransaction.offline(OperationType.STREAM);
+ }
+
+ public synchronized Throwable commit(Throwable accumulate)
+ {
+ return sharedTxn.commit(accumulate);
+ }
+
+ public synchronized void update(Collection<SSTableReader> readers)
+ {
+ sharedTxn.update(readers, false);
+ }
+
+ public synchronized void abort()
+ {
+ maybeFail(sharedTxn.abort(null));
+ }
+
+ public synchronized void finish()
+ {
+ sharedTxn.prepareToCommit();
+ sharedTxn.commit();
+ }
+
+ public synchronized void takeOwnership(ILifecycleTransaction txn)
+ {
+ sharedTxn.takeOwnership(txn);
+ }
+}
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index 97c3b2d4f9..f2f1784a9c 100644
---
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.streaming;
import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.function.UnaryOperator;
@@ -28,12 +29,15 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.IOOptions;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
import org.apache.cassandra.io.sstable.SSTableZeroCopyWriter;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -102,7 +106,7 @@ public class CassandraEntireSSTableStreamReader implements
IStreamReader
prettyPrintMemory(totalSize),
cfs.metadata());
- SSTableZeroCopyWriter writer = null;
+ SSTableTxnZeroCopyWriter writer = null;
try
{
@@ -121,7 +125,7 @@ public class CassandraEntireSSTableStreamReader implements
IStreamReader
prettyPrintMemory(totalSize));
writer.writeComponent(component, in, length);
-
session.progress(writer.descriptor.fileFor(component).toString(),
ProgressInfo.Direction.IN, length, length, length);
+
session.progress(writer.descriptor().fileFor(component).toString(),
ProgressInfo.Direction.IN, length, length, length);
bytesRead += length;
logger.debug("[Stream #{}] Finished receiving {} component
from {}, componentSize = {}, readBytes = {}, totalSize = {}",
@@ -137,7 +141,7 @@ public class CassandraEntireSSTableStreamReader implements
IStreamReader
.mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair,
false);
String description = String.format("level %s and repairedAt time
%s and pendingRepair %s",
header.sstableLevel,
messageHeader.repairedAt, messageHeader.pendingRepair);
-
writer.descriptor.getMetadataSerializer().mutate(writer.descriptor,
description, transform);
+
writer.descriptor().getMetadataSerializer().mutate(writer.descriptor(),
description, transform);
return writer;
}
catch (Throwable e)
@@ -167,14 +171,14 @@ public class CassandraEntireSSTableStreamReader
implements IStreamReader
return dir;
}
- protected SSTableZeroCopyWriter createWriter(ColumnFamilyStore cfs, long
totalSize, Collection<Component> components) throws IOException
+ protected SSTableTxnZeroCopyWriter createWriter(ColumnFamilyStore cfs,
long totalSize, Collection<Component> components) throws IOException
{
File dataDir = getDataDir(cfs, totalSize);
StreamReceiver streamReceiver = session.getAggregator(tableId);
assert streamReceiver instanceof CassandraStreamReceiver;
- LifecycleNewTracker lifecycleNewTracker =
CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker();
+ LifecycleTransaction txn =
LifecycleTransaction.offline(OperationType.STREAM);
Descriptor desc = cfs.newSSTableDescriptor(dataDir, header.version);
@@ -190,12 +194,32 @@ public class CassandraEntireSSTableStreamReader
implements IStreamReader
DatabaseDescriptor.getFlushCompression());
logger.debug("[Table #{}] {} Components to write: {}", cfs.metadata(),
desc, components);
- return desc.getFormat()
- .getWriterFactory()
- .builder(desc)
- .setComponents(components)
- .setTableMetadataRef(cfs.metadata)
- .setIOOptions(ioOptions)
- .createZeroCopyWriter(lifecycleNewTracker, cfs);
+ return new SSTableTxnZeroCopyWriter(txn, desc.getFormat()
+ .getWriterFactory()
+ .builder(desc)
+ .setComponents(components)
+
.setTableMetadataRef(cfs.metadata)
+ .setIOOptions(ioOptions)
+
.createZeroCopyWriter(txn, cfs));
+ }
+
+ public static class SSTableTxnZeroCopyWriter extends
SSTableTxnSingleStreamWriter
+ {
+ private final SSTableZeroCopyWriter writer;
+ public SSTableTxnZeroCopyWriter(ILifecycleTransaction txn,
SSTableZeroCopyWriter writer)
+ {
+ super(txn, writer);
+ this.writer = writer;
+ }
+
+ public void writeComponent(Component component, DataInputPlus in, long
length) throws ClosedChannelException
+ {
+ writer.writeComponent(component, in, length);
+ }
+
+ public Descriptor descriptor()
+ {
+ return writer.descriptor;
+ }
}
}
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index e8a6fbcc7c..3498f28b61 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.IncomingStream;
@@ -43,7 +44,7 @@ public class CassandraIncomingFile implements IncomingStream
private final StreamSession session;
private final StreamMessageHeader header;
- private volatile SSTableMultiWriter sstable;
+ private volatile SSTableTxnSingleStreamWriter sstable;
private volatile long size = -1;
private volatile int numFiles = 1;
@@ -84,7 +85,14 @@ public class CassandraIncomingFile implements IncomingStream
reader = new CassandraStreamReader(header, streamHeader, session);
size = streamHeader.size();
- sstable = reader.read(in);
+ sstable = (SSTableTxnSingleStreamWriter)reader.read(in);
+ }
+
+ public synchronized Throwable abort(Throwable t)
+ {
+ if (sstable != null)
+ t = sstable.abort(t);
+ return t;
}
@Override
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index af4b4dbc18..bab8d5a84b 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -35,7 +35,9 @@ import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.Row;
@@ -47,6 +49,7 @@ import org.apache.cassandra.exceptions.UnknownColumnException;
import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
@@ -172,7 +175,7 @@ public class CassandraStreamReader implements IStreamReader
{
return header != null? header.toHeader(metadata) : null; //pre-3.0
sstable have no SerializationHeader
}
- protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long
totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?, ?> format)
throws IOException
+ protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore cfs,
long totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?, ?>
format) throws IOException
{
Directories.DataDirectory localDir =
cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
@@ -180,10 +183,14 @@ public class CassandraStreamReader implements
IStreamReader
StreamReceiver streamReceiver = session.getAggregator(tableId);
Preconditions.checkState(streamReceiver instanceof
CassandraStreamReceiver);
- LifecycleNewTracker lifecycleNewTracker =
CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).createLifecycleNewTracker();
+ ILifecycleTransaction txn = createTxn();
+ RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs,
estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel,
totalSize, txn, getHeader(cfs.metadata()));
+ return new SSTableTxnSingleStreamWriter(txn, writer);
+ }
- RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs,
estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel,
totalSize, lifecycleNewTracker, getHeader(cfs.metadata()));
- return writer;
+ private ILifecycleTransaction createTxn()
+ {
+ return LifecycleTransaction.offline(OperationType.STREAM);
}
protected long totalSize()
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
index 480fcbe951..fc3e9044cf 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@ -35,10 +35,8 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -47,8 +45,7 @@ import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordTopology;
@@ -80,7 +77,7 @@ public class CassandraStreamReceiver implements StreamReceiver
private final StreamSession session;
// Transaction tracking new files received
- private final LifecycleTransaction txn;
+ private final StreamingLifecycleTransaction txn;
// holds references to SSTables received
protected final Collection<SSTableReader> sstables;
@@ -98,7 +95,7 @@ public class CassandraStreamReceiver implements StreamReceiver
this.session = session;
// this is an "offline" transaction, as we currently manually expose
the sstables once done;
// this should be revisited at a later date, so that
LifecycleTransaction manages all sstable state changes
- this.txn = LifecycleTransaction.offline(OperationType.STREAM);
+ this.txn = new StreamingLifecycleTransaction();
this.ranges = ranges;
this.sstables = new ArrayList<>(totalFiles);
this.requiresWritePath = requiresWritePath(cfs);
@@ -122,16 +119,16 @@ public class CassandraStreamReceiver implements
StreamReceiver
CassandraIncomingFile file = getFile(stream);
Collection<SSTableReader> finished = null;
- SSTableMultiWriter sstable = file.getSSTable();
+ SSTableTxnSingleStreamWriter sstable =
(SSTableTxnSingleStreamWriter)file.getSSTable();
try
{
- finished = sstable.finish(true);
+ finished = sstable.transferOwnershipTo(txn);
}
catch (Throwable t)
{
Throwables.maybeFail(sstable.abort(t));
}
- txn.update(finished, false);
+ txn.update(finished);
sstables.addAll(finished);
receivedEntireSSTable = file.isEntireSSTable();
}
@@ -143,39 +140,6 @@ public class CassandraStreamReceiver implements
StreamReceiver
Throwables.maybeFail(file.getSSTable().abort(null));
}
- /**
- * @return a LifecycleNewTracker whose operations are synchronised on this
StreamReceiveTask.
- */
- public synchronized LifecycleNewTracker createLifecycleNewTracker()
- {
- return new LifecycleNewTracker()
- {
- @Override
- public void trackNew(SSTable table)
- {
- synchronized (CassandraStreamReceiver.this)
- {
- txn.trackNew(table);
- }
- }
-
- @Override
- public void untrackNew(SSTable table)
- {
- synchronized (CassandraStreamReceiver.this)
- {
- txn.untrackNew(table);
- }
- }
-
- public OperationType opType()
- {
- return txn.opType();
- }
- };
- }
-
-
@Override
public synchronized void abort()
{
diff --git a/src/java/org/apache/cassandra/index/Index.java
b/src/java/org/apache/cassandra/index/Index.java
index f9e2265631..8ebbc4226d 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -47,7 +47,7 @@ import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.WriteContext;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.partitions.PartitionIterator;
@@ -384,11 +384,11 @@ public interface Index
* Get flush observer to observe partition/cell events generated by
flushing SSTable (memtable flush or compaction).
*
* @param descriptor The descriptor of the sstable observer is requested
for.
- * @param tracker The {@link LifecycleNewTracker} associated with the
SSTable being written
+ * @param txn The {@link ILifecycleTransaction} associated with the
SSTable being written
*
* @return SSTable flush observer.
*/
- default SSTableFlushObserver getFlushObserver(Descriptor descriptor,
LifecycleNewTracker tracker)
+ default SSTableFlushObserver getFlushObserver(Descriptor descriptor,
ILifecycleTransaction txn)
{
return null;
}
@@ -822,12 +822,12 @@ public interface Index
* Get flush observer to observe partition/cell events generated by
flushing SSTable (memtable flush or compaction).
*
* @param descriptor The descriptor of the sstable observer is
requested for.
- * @param tracker The {@link LifecycleNewTracker} associated with the
SSTable being written
+ * @param txn The {@link ILifecycleTransaction} associated with the
SSTable being written
* @param tableMetadata The immutable metadata of the table at the
moment the SSTable is flushed
*
* @return SSTable flush observer.
*/
- SSTableFlushObserver getFlushObserver(Descriptor descriptor,
LifecycleNewTracker tracker, TableMetadata tableMetadata);
+ SSTableFlushObserver getFlushObserver(Descriptor descriptor,
ILifecycleTransaction txn, TableMetadata tableMetadata);
/**
* @param type index transaction type
diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java
b/src/java/org/apache/cassandra/index/IndexRegistry.java
index 5fa91ca092..8b14ef2c9e 100644
--- a/src/java/org/apache/cassandra/index/IndexRegistry.java
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.memtable.Memtable;
@@ -250,7 +250,7 @@ public interface IndexRegistry extends Iterable<Index>
@Nullable
@Override
- public SSTableFlushObserver getFlushObserver(Descriptor
descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata)
+ public SSTableFlushObserver getFlushObserver(Descriptor
descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata)
{
return null;
}
diff --git a/src/java/org/apache/cassandra/index/SingletonIndexGroup.java
b/src/java/org/apache/cassandra/index/SingletonIndexGroup.java
index 162247fd74..501144f069 100644
--- a/src/java/org/apache/cassandra/index/SingletonIndexGroup.java
+++ b/src/java/org/apache/cassandra/index/SingletonIndexGroup.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.WriteContext;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.io.sstable.Component;
@@ -88,9 +88,9 @@ public class SingletonIndexGroup implements Index.Group
}
@Override
- public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
LifecycleNewTracker tracker, TableMetadata tableMetadata)
+ public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
ILifecycleTransaction txn, TableMetadata tableMetadata)
{
- return delegate.getFlushObserver(descriptor, tracker);
+ return delegate.getFlushObserver(descriptor, txn);
}
@Override
diff --git
a/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java
b/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java
index bdaebc2a94..54bcb238a6 100644
--- a/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java
+++ b/src/java/org/apache/cassandra/index/accord/MemtableIndexManager.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import java.util.NavigableSet;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.schema.TableId;
@@ -31,7 +31,7 @@ public interface MemtableIndexManager
{
long index(DecoratedKey key, Row row, Memtable mt);
- MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker);
+ MemtableIndex getPendingMemtableIndex(ILifecycleTransaction txn);
void discardMemtable(Memtable memtable);
diff --git a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
index aa97dd981e..acd014a5c4 100644
--- a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
+++ b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
@@ -49,7 +49,7 @@ import org.apache.cassandra.db.WriteContext;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;
@@ -317,17 +317,17 @@ public class RouteJournalIndex implements Index,
INotificationConsumer
@Override
public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
- LifecycleNewTracker tracker)
+ ILifecycleTransaction txn)
{
// mimics
org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat.newPerColumnIndexWriter
IndexDescriptor id = IndexDescriptor.create(descriptor,
baseCfs.getPartitioner(), baseCfs.metadata().comparator);
- if (tracker.opType() != OperationType.FLUSH || !initBuildStarted)
+ if (txn.opType() != OperationType.FLUSH || !initBuildStarted)
{
return new RouteIndexFormat.SSTableIndexWriter(this, id);
}
else
{
- return new RouteIndexFormat.MemtableRouteIndexWriter(id,
memtableIndexManager.getPendingMemtableIndex(tracker));
+ return new RouteIndexFormat.MemtableRouteIndexWriter(id,
memtableIndexManager.getPendingMemtableIndex(txn));
}
}
diff --git
a/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java
b/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java
index f979ea5cfe..13159d1acb 100644
--- a/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java
+++ b/src/java/org/apache/cassandra/index/accord/RouteMemtableIndexManager.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.schema.TableId;
@@ -73,10 +73,10 @@ public class RouteMemtableIndexManager implements
MemtableIndexManager
}
@Override
- public MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker)
+ public MemtableIndex getPendingMemtableIndex(ILifecycleTransaction txn)
{
return liveMemtableIndexMap.keySet().stream()
- .filter(m ->
tracker.equals(m.getFlushTransaction()))
+ .filter(m ->
txn.equals(m.getFlushTransaction()))
.findFirst()
.map(liveMemtableIndexMap::get)
.orElse(null);
diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
index c17f9fb0a9..c93923562c 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
@@ -67,7 +67,7 @@ import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.guardrails.GuardrailViolatedException;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.guardrails.MaxThreshold;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.FloatType;
import org.apache.cassandra.db.memtable.Memtable;
@@ -541,7 +541,7 @@ public class StorageAttachedIndex implements Index
}
@Override
- public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
LifecycleNewTracker tracker)
+ public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
ILifecycleTransaction txn)
{
// flush observers should be created from the index group, this is
only used by the singleton index group
throw new UnsupportedOperationException("Storage-attached index flush
observers should never be created directly.");
diff --git
a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
index 02e7971814..28711e9abe 100644
--- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
+++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.WriteContext;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.rows.Row;
@@ -200,12 +200,12 @@ public class StorageAttachedIndexGroup implements
Index.Group, INotificationCons
}
@Override
- public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
LifecycleNewTracker tracker, TableMetadata tableMetadata)
+ public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
ILifecycleTransaction txn, TableMetadata tableMetadata)
{
IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor,
tableMetadata.partitioner, tableMetadata.comparator);
try
{
- return
StorageAttachedIndexWriter.createFlushObserverWriter(indexDescriptor, indexes,
tracker);
+ return
StorageAttachedIndexWriter.createFlushObserverWriter(indexDescriptor, indexes,
txn);
}
catch (Throwable t)
{
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
index 341bcaac5e..ea1a7d69c7 100644
---
a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
+++
b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.tries.InMemoryTrie;
@@ -63,29 +63,29 @@ public class StorageAttachedIndexWriter implements
SSTableFlushObserver
public static StorageAttachedIndexWriter
createFlushObserverWriter(IndexDescriptor indexDescriptor,
Collection<StorageAttachedIndex> indexes,
-
LifecycleNewTracker lifecycleNewTracker) throws IOException
+
ILifecycleTransaction txn) throws IOException
{
- return new StorageAttachedIndexWriter(indexDescriptor, indexes,
lifecycleNewTracker, false);
+ return new StorageAttachedIndexWriter(indexDescriptor, indexes, txn,
false);
}
public static StorageAttachedIndexWriter
createBuilderWriter(IndexDescriptor indexDescriptor,
Collection<StorageAttachedIndex> indexes,
-
LifecycleNewTracker lifecycleNewTracker,
+
ILifecycleTransaction txn,
boolean
perIndexComponentsOnly) throws IOException
{
- return new StorageAttachedIndexWriter(indexDescriptor, indexes,
lifecycleNewTracker, perIndexComponentsOnly);
+ return new StorageAttachedIndexWriter(indexDescriptor, indexes, txn,
perIndexComponentsOnly);
}
private StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
Collection<StorageAttachedIndex>
indexes,
- LifecycleNewTracker lifecycleNewTracker,
+ ILifecycleTransaction txn,
boolean perIndexComponentsOnly) throws
IOException
{
this.indexDescriptor = indexDescriptor;
- this.rowMapping = RowMapping.create(lifecycleNewTracker.opType());
+ this.rowMapping = RowMapping.create(txn.opType());
this.perIndexWriters = indexes.stream().map(index ->
indexDescriptor.newPerColumnIndexWriter(index,
-
lifecycleNewTracker,
+
txn,
rowMapping))
.filter(Objects::nonNull) // a null here
means the column had no data to flush
.collect(Collectors.toList());
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
index 4501aec7f9..976097b084 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ClusteringComparator;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.IndexValidation;
@@ -135,10 +135,10 @@ public class IndexDescriptor
}
public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex
index,
- LifecycleNewTracker
tracker,
+ ILifecycleTransaction
txn,
RowMapping rowMapping)
{
- return version.onDiskFormat().newPerColumnIndexWriter(index, this,
tracker, rowMapping);
+ return version.onDiskFormat().newPerColumnIndexWriter(index, this,
txn, rowMapping);
}
public boolean isPerSSTableIndexBuildComplete()
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
index 30ba3b6295..a33a5f7837 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter;
@@ -80,19 +80,19 @@ public interface OnDiskFormat
PerSSTableIndexWriter newPerSSTableIndexWriter(IndexDescriptor
indexDescriptor) throws IOException;
/**
- * Create a new {@link PerColumnIndexWriter} to write the per-column
on-disk components of an index. The {@link LifecycleNewTracker}
+ * Create a new {@link PerColumnIndexWriter} to write the per-column
on-disk components of an index. The {@link ILifecycleTransaction}
* is used to determine the type of index write about to happen this will
either be an
* {@code OperationType.FLUSH} indicating that we are about to flush a
{@link org.apache.cassandra.index.sai.memory.MemtableIndex}
* or one of the other operation types indicating that we will be writing
from an existing SSTable
*
* @param index The {@link StorageAttachedIndex} holding the current index
build status
* @param indexDescriptor The {@link IndexDescriptor} for the SSTable
- * @param tracker The {@link LifecycleNewTracker} for index build
operation.
+ * @param txn The {@link ILifecycleTransaction} for index build operation.
* @param rowMapping The {@link RowMapping} that is used to map rowID to
{@code PrimaryKey} during the write operation
*/
PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index,
IndexDescriptor
indexDescriptor,
- LifecycleNewTracker tracker,
+ ILifecycleTransaction txn,
RowMapping rowMapping);
/**
diff --git
a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
index 8d8266ac34..d8f5d9e631 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
import com.codahale.metrics.Gauge;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter;
@@ -151,11 +151,11 @@ public class V1OnDiskFormat implements OnDiskFormat
@Override
public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex
index,
IndexDescriptor
indexDescriptor,
- LifecycleNewTracker
tracker,
+ ILifecycleTransaction
txn,
RowMapping rowMapping)
{
// If we're not flushing, or we haven't yet started the initialization
build, flush from SSTable contents.
- if (tracker.opType() != OperationType.FLUSH ||
!index.isInitBuildStarted())
+ if (txn.opType() != OperationType.FLUSH || !index.isInitBuildStarted())
{
NamedMemoryLimiter limiter = SEGMENT_BUILD_MEMORY_LIMITER;
logger.info(index.identifier().logMessage("Starting a compaction
index build. Global segment memory usage: {}"),
@@ -164,7 +164,7 @@ public class V1OnDiskFormat implements OnDiskFormat
return new SSTableIndexWriter(indexDescriptor, index, limiter,
index.isIndexValid());
}
- return new
MemtableIndexWriter(index.memtableIndexManager().getPendingMemtableIndex(tracker),
+ return new
MemtableIndexWriter(index.memtableIndexManager().getPendingMemtableIndex(txn),
indexDescriptor,
index.termType(),
index.identifier(),
diff --git
a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
index abd62dbff1..99bff5a1e1 100644
--- a/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
+++ b/src/java/org/apache/cassandra/index/sai/memory/MemtableIndexManager.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
@@ -119,10 +119,10 @@ public class MemtableIndexManager
}
@Nullable
- public MemtableIndex getPendingMemtableIndex(LifecycleNewTracker tracker)
+ public MemtableIndex getPendingMemtableIndex(ILifecycleTransaction txn)
{
return liveMemtableIndexMap.keySet().stream()
- .filter(m ->
tracker.equals(m.getFlushTransaction()))
+ .filter(m ->
txn.equals(m.getFlushTransaction()))
.findFirst()
.map(liveMemtableIndexMap::get)
.orElse(null);
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index ccfed7f5c9..b2989cb8d8 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.db.WriteContext;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.memtable.Memtable;
@@ -323,9 +323,9 @@ public class SASIIndex implements Index,
INotificationConsumer
return new SASIIndexSearcher(cfs, command,
DatabaseDescriptor.getRangeRpcTimeout(MILLISECONDS));
}
- public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
LifecycleNewTracker tracker)
+ public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
ILifecycleTransaction txn)
{
- return newWriter(baseCfs.metadata().partitionKeyType, descriptor,
Collections.singletonMap(index.getDefinition(), index), tracker.opType());
+ return newWriter(baseCfs.metadata().partitionKeyType, descriptor,
Collections.singletonMap(index.getDefinition(), index), txn.opType());
}
public IndexBuildingSupport getBuildTaskSupport()
diff --git
a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
index 422c6eaa6e..1b4fa8e8fe 100644
--- a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -47,14 +47,13 @@ public class RangeAwareSSTableWriter implements
SSTableMultiWriter
private final boolean isTransient;
private final SSTableFormat<?, ?> format;
private final SerializationHeader header;
- private final LifecycleNewTracker lifecycleNewTracker;
+ private final ILifecycleTransaction txn;
private int currentIndex = -1;
public final ColumnFamilyStore cfs;
private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
- private final List<SSTableReader> finishedReaders = new ArrayList<>();
private SSTableMultiWriter currentWriter = null;
- public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys,
long repairedAt, TimeUUID pendingRepair, boolean isTransient, SSTableFormat<?,
?> format, int sstableLevel, long totalSize, LifecycleNewTracker
lifecycleNewTracker, SerializationHeader header) throws IOException
+ public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys,
long repairedAt, TimeUUID pendingRepair, boolean isTransient, SSTableFormat<?,
?> format, int sstableLevel, long totalSize, ILifecycleTransaction txn,
SerializationHeader header) throws IOException
{
DiskBoundaries db = cfs.getDiskBoundaries();
directories = db.directories;
@@ -65,7 +64,7 @@ public class RangeAwareSSTableWriter implements
SSTableMultiWriter
this.pendingRepair = pendingRepair;
this.isTransient = isTransient;
this.format = format;
- this.lifecycleNewTracker = lifecycleNewTracker;
+ this.txn = txn;
this.header = header;
boundaries = db.positions;
if (boundaries == null)
@@ -75,7 +74,7 @@ public class RangeAwareSSTableWriter implements
SSTableMultiWriter
throw new IOException(String.format("Insufficient disk space
to store %s",
FBUtilities.prettyPrintMemory(totalSize)));
Descriptor desc =
cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir),
format);
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys,
repairedAt, pendingRepair, isTransient, null, sstableLevel, header,
lifecycleNewTracker);
+ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys,
repairedAt, pendingRepair, isTransient, null, sstableLevel, header, txn);
}
}
@@ -97,7 +96,7 @@ public class RangeAwareSSTableWriter implements
SSTableMultiWriter
finishedWriters.add(currentWriter);
Descriptor desc =
cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)),
format);
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys,
repairedAt, pendingRepair, isTransient, null, sstableLevel, header,
lifecycleNewTracker);
+ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys,
repairedAt, pendingRepair, isTransient, null, sstableLevel, header, txn);
}
}
@@ -113,6 +112,7 @@ public class RangeAwareSSTableWriter implements
SSTableMultiWriter
if (currentWriter != null)
finishedWriters.add(currentWriter);
currentWriter = null;
+ List<SSTableReader> finishedReaders = new
ArrayList<>(finishedWriters.size());
for (SSTableMultiWriter writer : finishedWriters)
{
if (writer.getBytesWritten() > 0)
@@ -126,6 +126,12 @@ public class RangeAwareSSTableWriter implements
SSTableMultiWriter
@Override
public Collection<SSTableReader> finished()
{
+ List<SSTableReader> finishedReaders = new
ArrayList<>(finishedWriters.size());
+ for (SSTableMultiWriter writer : finishedWriters)
+ {
+ if (writer != null)
+ finishedReaders.addAll(writer.finished());
+ }
return finishedReaders;
}
diff --git
a/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java
b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java
new file mode 100644
index 0000000000..390e63858f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.Throwables;
+
+public class SSTableTxnSingleStreamWriter implements SSTableMultiWriter
+{
+ private static final Logger logger =
LoggerFactory.getLogger(SSTableTxnSingleStreamWriter.class);
+
+ private final ILifecycleTransaction txn;
+ private final SSTableMultiWriter writer;
+ private boolean complete = false;
+
+ public SSTableTxnSingleStreamWriter(ILifecycleTransaction txn,
SSTableMultiWriter writer)
+ {
+ this.txn = txn;
+ this.writer = writer;
+ }
+
+ @Override
+ public void append(UnfilteredRowIterator partition)
+ {
+ writer.append(partition);
+ }
+
+ public synchronized Collection<SSTableReader>
transferOwnershipTo(StreamingLifecycleTransaction globalTxn)
+ {
+ failIfComplete();
+ writer.setOpenResult(true);
+ writer.prepareToCommit();
+ txn.prepareToCommit();
+ globalTxn.takeOwnership(txn);
+ Throwables.maybeFail(writer.commit(txn.commit(null)));
+ complete = true;
+ return writer.finished();
+ }
+
+ @Override
+ public synchronized Throwable abort(Throwable accumulate)
+ {
+ if (complete)
+ {
+ logger.debug("Already completed writer for '{}'. Nothing to
abort.", getFilename());
+ return accumulate;
+ }
+
+ complete = true;
+ return txn.abort(writer.abort(accumulate));
+ }
+
+ @Override
+ public synchronized void close()
+ {
+ complete = true;
+ writer.close();
+ }
+
+ private void failIfComplete()
+ {
+ if (complete)
+ throw new IllegalStateException("Writer "+getFilename()+" has
already completed");
+ }
+
+ @Override
+ public String getFilename()
+ {
+ return writer.getFilename();
+ }
+
+ @Override
+ public long getBytesWritten()
+ {
+ return writer.getBytesWritten();
+ }
+
+ @Override
+ public long getOnDiskBytesWritten()
+ {
+ return writer.getOnDiskBytesWritten();
+ }
+
+ @Override
+ public TableId getTableId()
+ {
+ return writer.getTableId();
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(boolean openResult)
+ {
+ throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter
should be finished via transferOwnershipTo");
+ }
+
+ @Override
+ public Collection<SSTableReader> finished()
+ {
+ throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter
should be finished via transferOwnershipTo");
+ }
+
+ @Override
+ public SSTableMultiWriter setOpenResult(boolean openResult)
+ {
+ throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter
should be finished via transferOwnershipTo");
+ }
+
+ @Override
+ public void prepareToCommit()
+ {
+ throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter
should be finished via transferOwnershipTo");
+ }
+
+ @Override
+ public Throwable commit(Throwable accumulate)
+ {
+ throw new UnsupportedOperationException("SSTableTxnSingleStreamWriter
should be finished via transferOwnershipTo");
+ }
+
+}
diff --git
a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
index 46a490974e..47566ee550 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
@@ -56,12 +56,11 @@ public class SSTableZeroCopyWriter extends SSTable
implements SSTableMultiWriter
private final Map<String, ZeroCopySequentialWriter> componentWriters; //
indexed by component name
public SSTableZeroCopyWriter(Builder<?, ?> builder,
- LifecycleNewTracker lifecycleNewTracker,
+ ILifecycleTransaction txn,
SSTable.Owner owner)
{
super(builder, owner);
-
- lifecycleNewTracker.trackNew(this);
+ txn.trackNew(this);
this.componentWriters = new HashMap<>();
Set<Component> unsupported = components.stream()
diff --git
a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 99406dba31..6c7b7b6d7f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -23,7 +23,7 @@ import java.util.Collections;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -36,11 +36,11 @@ import org.apache.cassandra.utils.TimeUUID;
public class SimpleSSTableMultiWriter implements SSTableMultiWriter
{
private final SSTableWriter writer;
- private final LifecycleNewTracker lifecycleNewTracker;
+ private final ILifecycleTransaction txn;
- protected SimpleSSTableMultiWriter(SSTableWriter writer,
LifecycleNewTracker lifecycleNewTracker)
+ protected SimpleSSTableMultiWriter(SSTableWriter writer,
ILifecycleTransaction txn)
{
- this.lifecycleNewTracker = lifecycleNewTracker;
+ this.txn = txn;
this.writer = writer;
}
@@ -92,7 +92,7 @@ public class SimpleSSTableMultiWriter implements
SSTableMultiWriter
public Throwable abort(Throwable accumulate)
{
- lifecycleNewTracker.untrackNew(writer);
+ txn.untrackNew(writer);
return writer.abort(accumulate);
}
@@ -116,7 +116,7 @@ public class SimpleSSTableMultiWriter implements
SSTableMultiWriter
int sstableLevel,
SerializationHeader header,
Collection<Index.Group>
indexGroups,
- LifecycleNewTracker
lifecycleNewTracker,
+ ILifecycleTransaction txn,
SSTable.Owner owner)
{
MetadataCollector metadataCollector = new
MetadataCollector(metadata.get().comparator)
@@ -132,7 +132,7 @@ public class SimpleSSTableMultiWriter implements
SSTableMultiWriter
.setSerializationHeader(header)
.addDefaultComponents(indexGroups)
.setSecondaryIndexGroups(indexGroups)
- .build(lifecycleNewTracker, owner);
- return new SimpleSSTableMultiWriter(writer, lifecycleNewTracker);
+ .build(txn, owner);
+ return new SimpleSSTableMultiWriter(writer, txn);
}
}
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index 654880c2c1..6ffaa8a7ba 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -24,7 +24,6 @@ import javax.annotation.Nonnull;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
@@ -137,7 +136,7 @@ public interface SSTableFormat<R extends SSTableReader, W
extends SSTableWriter>
/**
* Returns a new builder which can create instance of {@link
SSTableWriter} with the provided parameters.
* Similarly to the loading builder, it should open the required
resources when
- * the {@link SSTableWriter.Builder#build(LifecycleNewTracker,
SSTable.Owner)} method is called.
+ * the {@link
SSTableWriter.Builder#build(org.apache.cassandra.db.lifecycle.ILifecycleTransaction,
SSTable.Owner)} method is called.
* It should not let the caller passing any closeable resources
directly, that is, via setters.
* If building fails, all the opened resources should be released.
*/
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 38efd1955e..a3f24c0e5b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
@@ -63,7 +63,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
/**
* A root class for a writer implementation. A writer must be created by
passing an implementation-specific
- * {@link Builder}, a {@link LifecycleNewTracker} and {@link SSTable.Owner}
instances. Implementing classes should
+ * {@link Builder}, a {@link ILifecycleTransaction} and {@link SSTable.Owner}
instances. Implementing classes should
* not extend that list and all the additional properties should be included
in the builder.
*/
public abstract class SSTableWriter extends SSTable implements Transactional
@@ -80,7 +80,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
protected final List<SSTableFlushObserver> observers;
protected final MmappedRegionsCache mmappedRegionsCache;
protected final TransactionalProxy txnProxy = txnProxy();
- protected final LifecycleNewTracker lifecycleNewTracker;
+ protected final ILifecycleTransaction txn;
protected DecoratedKey first;
protected DecoratedKey last;
@@ -90,7 +90,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
*/
protected abstract TransactionalProxy txnProxy();
- protected SSTableWriter(Builder<?, ?> builder, LifecycleNewTracker
lifecycleNewTracker, SSTable.Owner owner)
+ protected SSTableWriter(Builder<?, ?> builder, ILifecycleTransaction txn,
SSTable.Owner owner)
{
super(builder, owner);
checkNotNull(builder.getIndexGroups());
@@ -104,7 +104,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
this.metadataCollector = builder.getMetadataCollector();
this.header = builder.getSerializationHeader();
this.mmappedRegionsCache = builder.getMmappedRegionsCache();
- this.lifecycleNewTracker = lifecycleNewTracker;
+ this.txn = txn;
// We need to ensure that no sstable components exist before the
lifecycle transaction starts tracking it.
// Otherwise, it means that we either want to overwrite some existing
sstable, which is not allowed, or some
@@ -116,7 +116,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
descriptor.directory,
existingComponents);
- lifecycleNewTracker.trackNew(this);
+ txn.trackNew(this);
try
{
@@ -124,7 +124,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
this.observers = Collections.unmodifiableList(observers);
for (Index.Group group : builder.getIndexGroups())
{
- SSTableFlushObserver observer =
group.getFlushObserver(descriptor, lifecycleNewTracker, metadata.getLocal());
+ SSTableFlushObserver observer =
group.getFlushObserver(descriptor, txn, metadata.getLocal());
if (observer != null)
{
observer.begin();
@@ -146,7 +146,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
* The caught exception should be then rethrown so the {@link Builder} can
handle it and close any resources opened
* implicitly by the builder.
* <p>
- * See {@link
SortedTableWriter#SortedTableWriter(SortedTableWriter.Builder,
LifecycleNewTracker, Owner)} as of CASSANDRA-18737.
+ * See {@link
SortedTableWriter#SortedTableWriter(SortedTableWriter.Builder,
ILifecycleTransaction, Owner)} as of CASSANDRA-18737.
*
* @param ex the exception thrown during the construction
*/
@@ -156,7 +156,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
for (int i = observers.size()-1; i >= 0; i--)
observers.get(i).abort(ex);
descriptor.getFormat().deleteOrphanedComponents(descriptor,
components);
- lifecycleNewTracker.untrackNew(this);
+ txn.untrackNew(this);
}
@Override
@@ -422,7 +422,7 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
/**
* A builder of this sstable writer. It should be extended for each
implementation with the specific fields.
*
- * An implementation should open all the resources when {@link
#build(LifecycleNewTracker, Owner)} and pass them
+ * An implementation should open all the resources when {@link
#build(ILifecycleTransaction, Owner)} and pass them
* in builder fields to the writer, so that the writer can access them via
getters.
*
* @param <W> type of the sstable writer to be build with this builder
@@ -557,20 +557,20 @@ public abstract class SSTableWriter extends SSTable
implements Transactional
super(descriptor);
}
- public W build(LifecycleNewTracker lifecycleNewTracker, Owner owner)
+ public W build(ILifecycleTransaction txn, Owner owner)
{
checkNotNull(getComponents());
validateRepairedMetadata(getRepairedAt(), getPendingRepair(),
isTransientSSTable());
- return buildInternal(lifecycleNewTracker, owner);
+ return buildInternal(txn, owner);
}
- protected abstract W buildInternal(LifecycleNewTracker
lifecycleNewTracker, Owner owner);
+ protected abstract W buildInternal(ILifecycleTransaction txn, Owner
owner);
- public SSTableZeroCopyWriter createZeroCopyWriter(LifecycleNewTracker
lifecycleNewTracker, Owner owner)
+ public SSTableZeroCopyWriter
createZeroCopyWriter(ILifecycleTransaction txn, Owner owner)
{
- return new SSTableZeroCopyWriter(this, lifecycleNewTracker, owner);
+ return new SSTableZeroCopyWriter(this, txn, owner);
}
}
}
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
index 6fba07ba1e..5ccaf26710 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.db.DeletionPurger;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.guardrails.Threshold;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.PartitionSerializationException;
import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
@@ -88,9 +88,9 @@ public abstract class SortedTableWriter<P extends
SortedTablePartitionWriter, I
private long lastEarlyOpenLength;
private final Supplier<Double> crcCheckChanceSupplier;
- public SortedTableWriter(Builder<P, I, ?, ?> builder, LifecycleNewTracker
lifecycleNewTracker, SSTable.Owner owner)
+ public SortedTableWriter(Builder<P, I, ?, ?> builder,
ILifecycleTransaction txn, SSTable.Owner owner)
{
- super(builder, lifecycleNewTracker, owner);
+ super(builder, txn, owner);
TableMetadataRef ref = builder.getTableMetadataRef();
crcCheckChanceSupplier = () -> ref.getLocal().params.crcCheckChance;
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 1cd63e56b1..3233ca4c06 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
@@ -74,16 +73,15 @@ public class BigTableWriter extends
SortedTableWriter<BigFormatPartitionWriter,
private final Map<DecoratedKey, AbstractRowIndexEntry> cachedKeys = new
HashMap<>();
private final boolean shouldMigrateKeyCache;
- public BigTableWriter(Builder builder, LifecycleNewTracker
lifecycleNewTracker, SSTable.Owner owner)
+ public BigTableWriter(Builder builder, ILifecycleTransaction txn,
SSTable.Owner owner)
{
- super(builder, lifecycleNewTracker, owner);
+ super(builder, txn, owner);
this.rowIndexEntrySerializer = builder.getRowIndexEntrySerializer();
checkNotNull(this.rowIndexEntrySerializer);
this.shouldMigrateKeyCache =
DatabaseDescriptor.shouldMigrateKeycacheOnCompaction()
- && lifecycleNewTracker instanceof
ILifecycleTransaction
- && !((ILifecycleTransaction)
lifecycleNewTracker).isOffline();
+ && !txn.isOffline();
}
@Override
@@ -114,7 +112,7 @@ public class BigTableWriter extends
SortedTableWriter<BigFormatPartitionWriter,
if (shouldMigrateKeyCache)
{
- for (SSTableReader reader : ((ILifecycleTransaction)
lifecycleNewTracker).originals())
+ for (SSTableReader reader : txn.originals())
{
if (reader instanceof KeyCacheSupport<?> &&
((KeyCacheSupport<?>) reader).getCachedPosition(key, false) != null)
{
@@ -431,14 +429,14 @@ public class BigTableWriter extends
SortedTableWriter<BigFormatPartitionWriter,
}
@Override
- protected BigTableWriter buildInternal(LifecycleNewTracker
lifecycleNewTracker, Owner owner)
+ protected BigTableWriter buildInternal(ILifecycleTransaction txn,
Owner owner)
{
try
{
- this.operationType = lifecycleNewTracker.opType();
+ this.operationType = txn.opType();
this.mmappedRegionsCache = new MmappedRegionsCache();
this.rowIndexEntrySerializer = new
RowIndexEntry.Serializer(descriptor.version, getSerializationHeader(), owner !=
null ? owner.getMetrics() : null);
- return new BigTableWriter(this, lifecycleNewTracker, owner);
+ return new BigTableWriter(this, txn, owner);
}
catch (RuntimeException | Error ex)
{
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java
index 7aad38511f..074c5c1708 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@ -65,9 +65,9 @@ public class BtiTableWriter extends
SortedTableWriter<BtiFormatPartitionWriter,
{
private static final Logger logger =
LoggerFactory.getLogger(BtiTableWriter.class);
- public BtiTableWriter(Builder builder, LifecycleNewTracker
lifecycleNewTracker, SSTable.Owner owner)
+ public BtiTableWriter(Builder builder, ILifecycleTransaction txn,
SSTable.Owner owner)
{
- super(builder, lifecycleNewTracker, owner);
+ super(builder, txn, owner);
}
@Override
@@ -371,14 +371,14 @@ public class BtiTableWriter extends
SortedTableWriter<BtiFormatPartitionWriter,
}
@Override
- protected BtiTableWriter buildInternal(LifecycleNewTracker
lifecycleNewTracker, Owner owner)
+ protected BtiTableWriter buildInternal(ILifecycleTransaction txn,
Owner owner)
{
try
{
this.mmappedRegionsCache = new MmappedRegionsCache();
- this.operationType = lifecycleNewTracker.opType();
+ this.operationType = txn.opType();
- return new BtiTableWriter(this, lifecycleNewTracker, owner);
+ return new BtiTableWriter(this, txn, owner);
}
catch (RuntimeException | Error ex)
{
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java
b/src/java/org/apache/cassandra/streaming/IncomingStream.java
index 25ab62630c..22141ef85f 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStream.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -36,7 +36,7 @@ public interface IncomingStream
* Read in the stream data.
*/
void read(DataInputPlus inputPlus, int version) throws Throwable;
-
+ default Throwable abort(Throwable t) { return t; }
String getName();
long getSize();
int getNumFiles();
diff --git
a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
index 2d05322543..a9ddabb689 100644
--- a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.guardrails.GuardrailViolatedException;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
import org.apache.cassandra.streaming.messages.KeepAliveMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -54,9 +55,9 @@ public class StreamDeserializingTask implements Runnable
public void run()
{
StreamingDataInputPlus input = channel.in();
+ StreamMessage message = null;
try
{
- StreamMessage message;
while (null != (message = StreamMessage.deserialize(input,
messagingVersion)))
{
// keep-alives don't necessarily need to be tied to a session
(they could be arrive before or after
@@ -93,6 +94,8 @@ public class StreamDeserializingTask implements Runnable
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
+ if ((session == null || session.isFailedOrAborted()) && message
instanceof IncomingStreamMessage)
+ t = ((IncomingStreamMessage) message).stream.abort(t);
if (session != null)
{
session.onError(t);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/IndexBuildFailsAfterStreamingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/IndexBuildFailsAfterStreamingTest.java
new file mode 100644
index 0000000000..767689f8cc
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/IndexBuildFailsAfterStreamingTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.index.SecondaryIndexManager;
+
+import static net.bytebuddy.implementation.MethodDelegation.to;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IndexBuildFailsAfterStreamingTest extends TestBaseImpl
+{
+ @Test
+ public void test() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+
.withInstanceInitializer(BBHelper::install)
+ .withConfig(c ->
c.with(Feature.values())
+
.set("stream_entire_sstables", false)
+
.set("disk_failure_policy", "die"))
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (p int, c
int, v int, PRIMARY KEY(p, c))"));
+ cluster.schemaChange(withKeyspace("CREATE INDEX idx ON
%s.tbl(v)"));
+
+ for (int i = 0; i < 100; i++)
+ cluster.get(1).executeInternal(withKeyspace("insert into
%s.tbl (p, c, v) values (?, ?, ?)"), i, i, i);
+ cluster.get(1).flush(KEYSPACE);
+ cluster.get(2).runOnInstance(() -> BBHelper.enabled.set(true));
+
+            // pre-existing weird behaviour - nodetool repair fails, but the
sstables are actually streamed & live on node2:
+ cluster.get(2).runOnInstance(() ->
assertTrue(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty()));
+ cluster.get(1).nodetoolResult("repair",
KEYSPACE).asserts().failure();
+ cluster.get(2).runOnInstance(() ->
assertFalse(Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().isEmpty()));
+ for (int i = 0; i < 100; i++)
+ assertRows(cluster.get(2).executeInternal(withKeyspace("select
* from %s.tbl where p = ? and c = ?"), i, i), row(i, i, i));
+
+ assertRows(cluster.get(1).executeInternal("select * from
system.\"IndexInfo\" where table_name=? and index_name=?", KEYSPACE, "idx"),
row(KEYSPACE, "idx", null));
+ // index not built:
+ assertEquals(0, cluster.get(2).executeInternal("select * from
system.\"IndexInfo\" where table_name=? and index_name=?", KEYSPACE,
"idx").length);
+ }
+ }
+
+ public static class BBHelper
+ {
+ public static void install(ClassLoader classLoader, int num)
+ {
+ if (num == 2)
+ {
+ new ByteBuddy().rebase(SecondaryIndexManager.class)
+
.method(named("calculateIndexingPageSize").and(takesNoArguments()))
+ .intercept(to(BBHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+ public static AtomicBoolean enabled = new AtomicBoolean();
+ public static int calculateIndexingPageSize(@SuperCall
Callable<Integer> zuper) throws Exception
+ {
+ if (enabled.get())
+ throw new RuntimeException("On purpose fail 2i build");
+ return zuper.call();
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterReceivingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterReceivingTest.java
new file mode 100644
index 0000000000..acffd1eee1
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterReceivingTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+
+import static net.bytebuddy.implementation.MethodDelegation.to;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class StreamFailedAfterReceivingTest extends TestBaseImpl
+{
+ @Test
+ public void zcsTest() throws IOException, ExecutionException,
InterruptedException
+ {
+ leftoverFilesTest(true);
+ }
+
+ @Test
+ public void nozcsTest() throws IOException, ExecutionException,
InterruptedException
+ {
+ leftoverFilesTest(false);
+ }
+
+ public void leftoverFilesTest(boolean zcs) throws IOException,
ExecutionException, InterruptedException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withInstanceInitializer(BBHelper::install)
+ .withConfig(c -> c.with(Feature.values())
+
.set("stream_entire_sstables", zcs)
+
.set("autocompaction_on_startup_enabled", false)
+
.set("disk_failure_policy", "die"))
+ .start())
+ {
+ init(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY, x int)"));
+ cluster.forEach(i ->
i.nodetoolResult("disableautocompaction").asserts().success());
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+ for (int i = 1; i <= 1000000; i++)
+ {
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, x)
VALUES (?,?)"), i, i);
+ if (i % 100000 == 0)
+ node1.flush(KEYSPACE);
+ }
+ node1.flush(KEYSPACE);
+ node2.runOnInstance(() -> BBHelper.enabled.set(true));
+ cluster.setUncaughtExceptionsFilter((e) ->
e.getClass().getName().contains("TransactionAlreadyCompletedException"));
+ node1.nodetoolResult("repair", "-pr", "-full", KEYSPACE,
"tbl").asserts().failure();
+ node2.runOnInstance(() -> BBHelper.cdl.awaitUninterruptibly());
+ node2.runOnInstance(() -> BBHelper.enabled.set(false));
+ node2.shutdown().get();
+ node2.startup();
+ }
+ }
+
+
+ public static class BBHelper
+ {
+ public static void install(ClassLoader classLoader, Integer num)
+ {
+ if (num == 2)
+ {
+ // in this case we need to throw after trackNew:ing the
sstable, but before it is finished
+ new ByteBuddy().rebase(LifecycleTransaction.class)
+
.method(named("trackNew").and(takesArguments(1)))
+
.intercept(to(StreamFailedAfterReceivingTest.BBHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+
+ static AtomicInteger waiting = new AtomicInteger();
+ static AtomicBoolean enabled = new AtomicBoolean();
+ static CountDownLatch cdl = CountDownLatch.newCountDownLatch(1);
+
+ public static void trackNew(SSTable sstable, @SuperCall Callable<Void>
zuper) throws Exception
+ {
+ zuper.call();
+ if (enabled.get())
+ {
+ if (waiting.incrementAndGet() > 4)
+ throw new RuntimeException();
+
+ // using a sleep instead of a horrible nesting of latches -
this should
+ // not make the test flaky, just might flakily pass without
hitting the
+ // right condition
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ cdl.decrement();
+ }
+ }
+ }
+
+ @Test
+ public void basicStreamTest() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(c -> c.with(Feature.values())
+
.set("stream_entire_sstables", false)
+
.set("autocompaction_on_startup_enabled", false)
+
.set("disk_failure_policy", "die"))
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY, x int)"));
+ cluster.forEach(i ->
i.nodetoolResult("disableautocompaction").asserts().success());
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+ for (int i = 1; i <= 1000; i++)
+ {
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, x)
VALUES (?,?)"), i, i);
+ if (i % 100 == 0)
+ node1.flush(KEYSPACE);
+ }
+ node1.flush(KEYSPACE);
+
+ node1.nodetoolResult("repair", "-pr", "-full", KEYSPACE,
"tbl").asserts().success();
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterTransferTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterTransferTest.java
new file mode 100644
index 0000000000..b91cbf60a9
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedAfterTransferTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.util.File;
+
+import static net.bytebuddy.implementation.MethodDelegation.to;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.junit.Assert.assertEquals;
+
+public class StreamFailedAfterTransferTest extends TestBaseImpl
+{
+ @Test
+ public void throwAfterTransferOwnership() throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withInstanceInitializer(BBHelper::install)
+ .withConfig(c -> c.with(Feature.values())
+
.set("stream_entire_sstables", false)
+
.set("disk_failure_policy", "die"))
+ .start())
+ {
+ init(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY, x int) with compaction = {'class':'SizeTieredCompactionStrategy',
'enabled':'false'}"));
+
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+ for (int i = 1; i <= 1000; i++)
+ {
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk, x)
VALUES (?,?)"), i, i);
+ if (i % 100 == 0)
+ node1.flush(KEYSPACE);
+ }
+ node1.flush(KEYSPACE);
+ cluster.setUncaughtExceptionsFilter((e) ->
e.getClass().getName().contains("TransactionAlreadyCompletedException"));
+ node1.nodetoolResult("repair", "-pr", "-full", KEYSPACE,
"tbl").asserts().failure();
+
+ node2.runOnInstance(() -> {
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ LifecycleTransaction.waitForDeletions();
+ for (File f : cfs.getDirectories().getCFDirectories())
+ {
+ try
+ {
+ int i = 0;
+ while (f.list().length > 0 && i++ < 20)
+ {
+ Uninterruptibles.sleepUninterruptibly(1,
TimeUnit.SECONDS);
+ LifecycleTransaction.waitForDeletions();
+ }
+ File [] files = f.list();
+ assertEquals(Arrays.toString(files), 0, files.length);
+
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+ }
+
+
+
+ public static class BBHelper
+ {
+ public static void install(ClassLoader classLoader, Integer num)
+ {
+ if (num == 2)
+ {
+ // in this case we need to throw after taking ownership of the
streamed sstable's transaction, but before the stream session is finished
+ new ByteBuddy().rebase(LifecycleTransaction.class)
+
.method(named("takeOwnership").and(takesArguments(1)))
+ .intercept(to(BBHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+
+ static AtomicInteger cnt = new AtomicInteger();
+ public static void takeOwnership(ILifecycleTransaction txn, @SuperCall
Callable<Void> zuper) throws Exception
+ {
+ zuper.call();
+ if (cnt.incrementAndGet() > 3)
+ throw new RuntimeException();
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
index fb3cccace2..53459a4af6 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db.lifecycle;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -33,6 +35,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState;
import
org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.utils.Pair;
@@ -200,6 +203,54 @@ public class LifecycleTransactionTest extends
AbstractTransactionalTest
Assert.assertTrue(failed);
}
+ @Test
+ public void testTransferAbort()
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ List<SSTableReader> readers = readers(0, 4, cfs);
+ LifecycleTransaction sharedTxn =
LifecycleTransaction.offline(OperationType.UNKNOWN);
+ LifecycleTransaction txn =
LifecycleTransaction.offline(OperationType.UNKNOWN);
+ readers.forEach(txn::trackNew);
+ txn.prepareToCommit();
+ sharedTxn.takeOwnership(txn);
+ txn.commit();
+ sharedTxn.abort();
+ assertFilesGone(readers);
+ }
+
+ private void assertFilesGone(List<SSTableReader> readers)
+ {
+ readers.forEach(s -> {
+ int i = 0;
+ while
(s.descriptor.fileFor(SSTableFormat.Components.DATA).exists() && i++ < 20)
+ {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ LifecycleTransaction.waitForDeletions();
+ }
+
Assert.assertFalse(s.descriptor.fileFor(SSTableFormat.Components.DATA).exists());
+ });
+ }
+
+ @Test
+ public void testTransferAbortEarly()
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ List<SSTableReader> readers = readers(0, 4, cfs);
+ LifecycleTransaction sharedTxn =
LifecycleTransaction.offline(OperationType.UNKNOWN);
+ LifecycleTransaction txn =
LifecycleTransaction.offline(OperationType.UNKNOWN);
+ readers.forEach(txn::trackNew);
+ txn.prepareToCommit();
+ txn.abort();
+ try
+ {
+ sharedTxn.takeOwnership(txn);
+ Assert.fail("child txn is aborted, we should not take ownership");
+ }
+ catch (Exception ignored) {}
+
+ assertFilesGone(readers);
+ }
+
private static void testBadUpdate(LifecycleTransaction txn, SSTableReader
update, boolean original)
{
boolean failed = false;
diff --git
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index abb6c2dc1b..a87878c70b 100644
---
a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++
b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -40,8 +40,9 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -163,10 +164,11 @@ public class CassandraEntireSSTableStreamWriterTest
CassandraEntireSSTableStreamReader reader = new
CassandraEntireSSTableStreamReader(new
StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0,
0, null), header, session);
- SSTableMultiWriter sstableWriter = reader.read(new
DataInputBuffer(serializedFile.nioBuffer(), false));
- Collection<SSTableReader> newSstables = sstableWriter.finished();
-
+ SSTableTxnSingleStreamWriter sstableWriter =
(SSTableTxnSingleStreamWriter) reader.read(new
DataInputBuffer(serializedFile.nioBuffer(), false));
+ StreamingLifecycleTransaction stt = new
StreamingLifecycleTransaction();
+ Collection<SSTableReader> newSstables =
sstableWriter.transferOwnershipTo(stt);
assertEquals(1, newSstables.size());
+ stt.abort();
}
}
diff --git
a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
index 55793c9cac..3f19da0ad7 100644
---
a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
+++
b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
@@ -52,10 +52,12 @@ import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.StreamingLifecycleTransaction;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
import org.apache.cassandra.io.sstable.SSTableUtils;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.indexsummary.IndexSummaryManager;
@@ -236,9 +238,11 @@ public class
EntireSSTableStreamConcurrentComponentMutationTest
{
CassandraStreamHeader header =
CassandraStreamHeader.serializer.deserialize(in,
MessagingService.current_version);
CassandraEntireSSTableStreamReader reader = new
CassandraEntireSSTableStreamReader(messageHeader, header, session);
- SSTableReader streamedSSTable =
Iterables.getOnlyElement(reader.read(in).finished());
-
+ StreamingLifecycleTransaction stt = new
StreamingLifecycleTransaction();
+ SSTableTxnSingleStreamWriter writer =
(SSTableTxnSingleStreamWriter) reader.read(in);
+ SSTableReader streamedSSTable =
Iterables.getOnlyElement(writer.transferOwnershipTo(stt));
SSTableUtils.assertContentEquals(sstable, streamedSSTable);
+ stt.abort();
}
}
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 483d109ec5..d6ce95a90e 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -56,7 +56,6 @@ import
org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.db.ColumnFamilyStore.FlushReason;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.cql3.statements.schema.IndexTarget;
import org.apache.cassandra.db.CassandraWriteContext;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -68,6 +67,7 @@ import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.WriteContext;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
@@ -1287,7 +1287,7 @@ public class CustomIndexTest extends CQLTester
}
@Override
- public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
LifecycleNewTracker tracker)
+ public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
ILifecycleTransaction txn)
{
return new SSTableFlushObserver() {
@@ -1672,11 +1672,11 @@ public class CustomIndexTest extends CQLTester
}
@Override
- public SSTableFlushObserver getFlushObserver(Descriptor
descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata)
+ public SSTableFlushObserver getFlushObserver(Descriptor
descriptor, ILifecycleTransaction txn, TableMetadata tableMetadata)
{
Set<SSTableFlushObserver> observers = indexes.values()
.stream()
- .map(i ->
i.getFlushObserver(descriptor, tracker))
+ .map(i ->
i.getFlushObserver(descriptor, txn))
.filter(Objects::nonNull)
.collect(Collectors.toSet());
diff --git a/test/unit/org/apache/cassandra/index/StubIndexGroup.java
b/test/unit/org/apache/cassandra/index/StubIndexGroup.java
index 22dfbe262b..ec7f457d34 100644
--- a/test/unit/org/apache/cassandra/index/StubIndexGroup.java
+++ b/test/unit/org/apache/cassandra/index/StubIndexGroup.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.WriteContext;
import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.io.sstable.Component;
@@ -93,7 +93,7 @@ public class StubIndexGroup implements Index.Group
}
@Override
- public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
LifecycleNewTracker tracker, TableMetadata tableMetadata)
+ public SSTableFlushObserver getFlushObserver(Descriptor descriptor,
ILifecycleTransaction txn, TableMetadata tableMetadata)
{
return null;
}
diff --git a/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java
b/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java
index 323cd01fbb..bd8f846572 100644
--- a/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/ScrubTest.java
@@ -69,7 +69,7 @@ import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
@@ -847,9 +847,9 @@ public class ScrubTest
private static class TestMultiWriter extends SimpleSSTableMultiWriter
{
- TestMultiWriter(SSTableWriter writer, LifecycleNewTracker
lifecycleNewTracker)
+ TestMultiWriter(SSTableWriter writer, ILifecycleTransaction txn)
{
- super(writer, lifecycleNewTracker);
+ super(writer, txn);
}
}
diff --git a/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java
b/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java
index 0a854c0744..c156048276 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
@@ -499,7 +500,7 @@ public class StreamReaderTest
super(header, streamHeader, session);
}
- protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long
totalSize, long repairedAt, TimeUUID pendingRepair, SSTableFormat<?,?> format)
throws IOException
+ protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore
cfs, long totalSize, long repairedAt, TimeUUID pendingRepair,
SSTableFormat<?,?> format) throws IOException
{
return super.createWriter(cfs, totalSize, repairedAt,
pendingRepair, format);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]