yifan-c commented on code in PR #131:
URL: 
https://github.com/apache/cassandra-analytics/pull/131#discussion_r2220918707


##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.cdc.FourZeroMutation;
+import org.apache.cassandra.cdc.api.CassandraSource;
+import org.apache.cassandra.cdc.api.CommitLog;
+import org.apache.cassandra.cdc.api.CommitLogInstance;
+import org.apache.cassandra.cdc.api.CommitLogMarkers;
+import org.apache.cassandra.cdc.api.CommitLogReader;
+import org.apache.cassandra.cdc.api.Marker;
+import org.apache.cassandra.cdc.api.RangeTombstoneData;
+import org.apache.cassandra.cdc.api.Row;
+import org.apache.cassandra.cdc.api.TableIdLookup;
+import org.apache.cassandra.cdc.scanner.CdcSortedStreamScanner;
+import org.apache.cassandra.cdc.scanner.CdcStreamScanner;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.BufferingCommitLogReader;
+import org.apache.cassandra.db.commitlog.CommitLogSegmentManagerCDC;
+import org.apache.cassandra.db.commitlog.FourZeroPartitionUpdateWrapper;
+import org.apache.cassandra.db.commitlog.PartitionUpdateWrapper;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.CqlType;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.TimeProvider;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+public class CdcBridgeImplementation extends CdcBridge
+{
+    public static volatile boolean setup = false;
+
+    /**
+     * One-time static initialization for this bridge: initializes the Cassandra type
+     * implementation, then configures CDC/commit-log directories via {@link #setCDC}.
+     *
+     * @param path                 base directory under which commitlog/hints/saved_caches/cdc dirs are created
+     * @param commitLogSegmentSize commit log segment size to configure
+     * @param enableCompression    whether commit log compression should be enabled
+     */
+    public static void setup(Path path, int commitLogSegmentSize, boolean enableCompression)
+    {
+        // NOTE(review): assumes type setup must precede DatabaseDescriptor-based CDC config — confirm ordering requirement.
+        CassandraTypesImplementation.setup();
+        setCDC(path, commitLogSegmentSize, enableCompression);
+    }
+
+    /** No per-instance state; one-time initialization is done via the static {@link #setup} method. */
+    public CdcBridgeImplementation()
+    {
+    }
+
+    protected static synchronized void setCDC(Path path, int 
commitLogSegmentSize, boolean enableCompression)
+    {
+        if (setup)
+        {
+            return;
+        }
+        Path commitLogPath = path.resolve("commitlog");
+        DatabaseDescriptor.getRawConfig().commitlog_directory = 
commitLogPath.toString();
+        DatabaseDescriptor.getRawConfig().hints_directory = 
path.resolve("hints").toString();
+        DatabaseDescriptor.getRawConfig().saved_caches_directory = 
path.resolve("saved_caches").toString();
+        DatabaseDescriptor.getRawConfig().cdc_raw_directory = 
path.resolve("cdc").toString();
+        DatabaseDescriptor.setCDCEnabled(true);
+        DatabaseDescriptor.setCDCTotalSpaceInMiB(1024); // Cassandra 4.x vs 5.x

Review Comment:
   Similar to the commit log disk access mode, this config is irrelevant when
running in production, since no commit log is produced there.



##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.msg;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.spark.utils.ByteBufferUtils.split;
+
+import org.apache.cassandra.cdc.api.CassandraSource;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.reader.ComplexTypeBuffer;
+
+public class FourZeroCdcEventBuilder extends CdcEventBuilder

Review Comment:
   We should drop the version string `FourZero` from the class name; the
Cassandra version is already expressed by the subproject name.
   The refactoring does not need to be addressed in this patch, as it is not
closely related.



##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.msg;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.spark.utils.ByteBufferUtils.split;
+
+import org.apache.cassandra.cdc.api.CassandraSource;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.reader.ComplexTypeBuffer;
+
+public class FourZeroCdcEventBuilder extends CdcEventBuilder
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FourZeroCdcEventBuilder.class);
+
+    private TableMetadata tableMetadata;
+    private UnfilteredRowIterator partition = null;
+
+    /**
+     * Builds a CDC event builder from a partition: derives keyspace/table from the
+     * partition's metadata, then captures the partition keys and static columns.
+     */
+    FourZeroCdcEventBuilder(CdcEvent.Kind kind, UnfilteredRowIterator partition, String trackingId, CassandraSource cassandraSource)
+    {
+        this(kind, partition.metadata().keyspace, partition.metadata().name, trackingId, cassandraSource);
+        this.tableMetadata = partition.metadata();
+        this.partition = partition;
+        setPartitionKeys(partition);
+        setStaticColumns(partition);
+    }
+
+    /**
+     * Base constructor used when no partition is available; partition-derived state
+     * ({@code tableMetadata}, {@code partition}) stays null in that case.
+     */
+    FourZeroCdcEventBuilder(CdcEvent.Kind kind, String keyspace, String table, String trackingId, CassandraSource cassandraSource)
+    {
+        super(kind, keyspace, table, trackingId, cassandraSource);
+        // NOTE(review): the assignments below look redundant if the super constructor
+        // already stores these same arguments — confirm against CdcEventBuilder and drop if so.
+        this.kind = kind;
+        this.keyspace = keyspace;
+        this.table = table;
+        this.trackingId = trackingId;
+        this.track = trackingId != null;
+        this.cassandraSource = cassandraSource;
+    }
+
+    public static FourZeroCdcEventBuilder of(CdcEvent.Kind kind,
+                                             UnfilteredRowIterator partition,
+                                             String trackingId,
+                                             CassandraSource cassandraSource)
+    {
+        return new FourZeroCdcEventBuilder(kind, partition, trackingId, 
cassandraSource);
+    }
+
+    public static CdcEvent build(CdcEvent.Kind kind,
+                                 UnfilteredRowIterator partition,
+                                 String trackingId,
+                                 CassandraSource cassandraSource)
+    {
+        return of(kind, partition, trackingId, cassandraSource).build();
+    }
+
+    public static CdcEvent build(CdcEvent.Kind kind,
+                                 UnfilteredRowIterator partition,
+                                 Row row,
+                                 String trackingId,
+                                 CassandraSource cassandraSource)
+    {
+        return of(kind, partition, trackingId, cassandraSource)
+               .withRow(row)
+               .build();
+    }
+
+    /**
+     * Adds a row's clustering keys and value columns to the event being built.
+     * Only valid on a builder created from a partition.
+     *
+     * @throws NullPointerException if this builder was not created with a partition
+     */
+    public FourZeroCdcEventBuilder withRow(Row row)
+    {
+        Preconditions.checkNotNull(partition, "Cannot build with an empty builder.");
+        setClusteringKeys(row, partition);
+        setValueColumns(row);
+        return this;
+    }
+
+    /**
+     * Extracts the partition key value(s) from the partition and records them on the event.
+     * For a partition-level delete, also folds the deletion timestamp into the max timestamp.
+     */
+    void setPartitionKeys(UnfilteredRowIterator partition)
+    {
+        if (kind == CdcEvent.Kind.PARTITION_DELETE)
+        {
+            updateMaxTimestamp(partition.partitionLevelDeletion().markedForDeleteAt());
+        }
+
+        ImmutableList<ColumnMetadata> keyColumns = partition.metadata().partitionKeyColumns();
+        int count = keyColumns.size();
+        List<Value> keys = new ArrayList<>(count);
+        ByteBuffer keyBuffer = partition.partitionKey().getKey();
+        if (count == 1)
+        {
+            // single partition key: the buffer is the key value itself
+            keys.add(makeValue(keyBuffer, keyColumns.get(0)));
+        }
+        else
+        {
+            // composite partition key: split the serialized buffer into its components
+            ByteBuffer[] components = split(keyBuffer, count);
+            for (int i = 0; i < count; i++)
+            {
+                keys.add(makeValue(components[i], keyColumns.get(i)));
+            }
+        }
+        this.partitionKeys = keys;
+    }
+
+    /** Records the values of any static columns present on the partition's static row. */
+    void setStaticColumns(UnfilteredRowIterator partition)
+    {
+        Row staticRow = partition.staticRow();
+        if (staticRow.isEmpty())
+        {
+            return; // no static columns on this partition
+        }
+
+        List<Value> values = new ArrayList<>(staticRow.columnCount());
+        staticRow.forEach(columnData -> addColumn(values, columnData));
+        this.staticColumns = values;
+    }
+
+    /**
+     * Records the clustering key values for the given unfiltered row/marker.
+     * No-op for tables without clustering columns.
+     */
+    void setClusteringKeys(Unfiltered unfiltered, UnfilteredRowIterator partition)
+    {
+        ImmutableList<ColumnMetadata> clusteringColumns = partition.metadata().clusteringColumns();
+        if (clusteringColumns.isEmpty())
+        {
+            return; // table has no clustering keys
+        }
+
+        List<Value> keys = new ArrayList<>(clusteringColumns.size());
+        for (ColumnMetadata column : clusteringColumns)
+        {
+            keys.add(makeValue(unfiltered.clustering().bufferAt(column.position()), column));
+        }
+        this.clusteringKeys = keys;
+    }
+
+    /**
+     * Records the regular (value) column data for the row. For a row-level delete only the
+     * deletion timestamp is tracked; an unexpected empty row is logged and skipped.
+     */
+    void setValueColumns(Row row)
+    {
+        if (kind == CdcEvent.Kind.ROW_DELETE)
+        {
+            updateMaxTimestamp(row.deletion().time().markedForDeleteAt());
+            return;
+        }
+
+        // Sanity check: an empty row is not added to the PartitionUpdate/cdc, so this is unexpected.
+        if (row.isEmpty())
+        {
+            LOGGER.warn("Encountered an unexpected empty row in CDC. keyspace={}, table={}", keyspace, table);
+            return;
+        }
+
+        List<Value> values = new ArrayList<>(row.columnCount());
+        row.forEach(columnData -> addColumn(values, columnData));
+        this.valueColumns = values;
+    }
+
+    private void addColumn(List<Value> holder, ColumnData cd)
+    {
+        ColumnMetadata columnMetadata = cd.column();
+        String columnName = columnMetadata.name.toCQLString();
+        if (columnMetadata.isComplex()) // multi-cell column
+        {
+            ComplexColumnData complex = (ComplexColumnData) cd;
+            DeletionTime deletionTime = complex.complexDeletion();
+            if (deletionTime.isLive())
+            {
+                // the complex data is live, but there could be element 
deletion inside.
+                if (complex.column().type instanceof ListType)
+                {
+                    // In the case of unfrozen lists, it reads the value from 
C*
+                    readFromCassandra(holder, complex);
+                }
+                else
+                {
+                    processComplexData(holder, complex);
+                }
+            }
+            else if (complex.cellsCount() > 0)
+            {
+                // The condition, complex data is not live && cellCount > 0, 
indicates that a new value is set to the column.
+                // The CQL operation could be either insert or update the 
column.
+                // Since the new value is in the mutation already, reading 
from C* can be skipped
+                processComplexData(holder, complex);
+            }
+            else // the entire multi-cell collection/UDT is deleted.
+            {
+                kind = CdcEvent.Kind.DELETE;
+                updateMaxTimestamp(deletionTime.markedForDeleteAt());
+                holder.add(makeValue(null, complex.column()));
+            }
+        }
+        else // simple column
+        {
+            Cell<?> cell = (Cell<?>) cd;
+            updateMaxTimestamp(cell.timestamp());
+            if (cell.isTombstone())
+            {
+                holder.add(makeValue(null, cell.column()));
+            }
+            else
+            {
+                holder.add(makeValue(cell.buffer(), cell.column()));
+                if (cell.isExpiring())
+                {
+                    setTTL(cell.ttl(), 
Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime())); // Cassandra 
4.x vs 5.x

Review Comment:
   This change is concerning.
   
   In CASSANDRA-14227, the local deletion time was widened from `int` to
`uint`, which roughly doubles the representable deletion-time range by using
one more bit.
   However, the `uint` interpretation is implemented only inside Cassandra.
Downstream CDC consumers won't understand the field as `uint`: they will read
the raw bits as signed values, start seeing negative timestamps, and break.
   
   Could you please leave a `TODO` comment here? It is to be fixed by a 
separate patch.



##########
cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraSchema.java:
##########
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.antlr.runtime.RecognitionException;
+import org.apache.cassandra.cql3.CQLFragmentParser;
+import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Types;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.reader.SchemaBuilder;
+import org.jetbrains.annotations.NotNull;
+
+import org.apache.cassandra.cql3.CqlParser;
+
+import org.apache.cassandra.cdc.api.TableIdLookup;
+import org.jetbrains.annotations.Nullable;
+
+public final class CassandraSchema
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraSchema.class);
+
+    // Static utility holder — not instantiable.
+    private CassandraSchema()
+    {
+        throw new IllegalStateException("Do not instantiate!");
+    }
+
+    /**
+     * Applies a schema mutation while holding the shared schema lock.
+     *
+     * @param updater consumer that mutates {@code Schema.instance}
+     */
+    public static void update(Consumer<Schema> updater)
+    {
+        // Delegate to apply(...) so both entry points share the same synchronization.
+        apply(schema -> {
+            updater.accept(schema);
+            return null;
+        });
+    }
+
+    /**
+     * Update cassandra schema and return a result with synchronization.
+     *
+     * @param <T> type of the returned value
+     * @param updater updates schema and returns a result
+     * @return the value produced by {@code updater}
+     */
+    public static <T> T apply(Function<Schema, T> updater)
+    {
+        // Synchronize on the shared Schema.instance so concurrent updaters from this
+        // bridge do not interleave their schema reads/writes.
+        synchronized (Schema.instance)
+        {
+            return updater.apply(Schema.instance);
+        }
+    }
+
+    /**
+     * Parses a set of {@code CREATE TYPE} statements and builds the resulting
+     * user-defined types for the given keyspace.
+     *
+     * @param keyspace keyspace the types belong to
+     * @param udtStmts raw CREATE TYPE statements
+     * @return the parsed {@link Types}
+     * @throws IllegalStateException if any statement fails to parse
+     */
+    public static Types buildTypes(String keyspace,
+                                   Set<String> udtStmts)
+    {
+        List<CreateTypeStatement.Raw> typeStatements = new ArrayList<>(udtStmts.size());
+        for (String udt : udtStmts)
+        {
+            try
+            {
+                typeStatements.add((CreateTypeStatement.Raw) CQLFragmentParser.parseAnyUnhandled(CqlParser::query, udt));
+            }
+            catch (RecognitionException e)
+            {
+                // Fix: pass the exception to the logger so the parse failure's stack
+                // trace reaches the logs (previously only the expression was logged).
+                LOGGER.error("Failed to parse type expression '{}'", udt, e);
+                throw new IllegalStateException(e);
+            }
+        }
+        Types.RawBuilder typesBuilder = Types.rawBuilder(keyspace);
+        for (CreateTypeStatement.Raw statement : typeStatements)
+        {
+            statement.addToRawBuilder(typesBuilder);
+        }
+        return typesBuilder.build();
+    }
+
+    /**
+     * Builds a {@link TableMetadata} from a CREATE TABLE statement, forcing the
+     * {@code cdc} table param to the requested value.
+     *
+     * @param tableId explicit table id to use, or null to let Cassandra assign one
+     */
+    public static TableMetadata buildTableMetadata(String keyspace,
+                                                   String createStmt,
+                                                   Types types,
+                                                   Partitioner partitioner,
+                                                   @Nullable UUID tableId,
+                                                   boolean enableCdc)
+    {
+        TableMetadata.Builder builder = CQLFragmentParser.parseAny(CqlParser::createTableStatement, createStmt, "CREATE TABLE")
+                                                         .keyspace(keyspace)
+                                                         .prepare(null)
+                                                         .builder(types)
+                                                         .partitioner(CassandraTypesImplementation.getPartitioner(partitioner));
+        if (tableId != null)
+        {
+            builder.id(TableId.fromUUID(tableId));
+        }
+
+        TableMetadata tableMetadata = builder.build();
+        if (tableMetadata.params.cdc != enableCdc)
+        {
+            // Rebuild with the cdc flag flipped to the requested value.
+            tableMetadata = tableMetadata.unbuild()
+                                         .params(tableMetadata.params.unbuild().cdc(enableCdc).build())
+                                         .build();
+        }
+        return tableMetadata;
+    }
+
+    /** @return true if the keyspace is present in the given schema */
+    public static boolean keyspaceExists(Schema schema, String keyspace)
+    {
+        return schema.getKeyspaceInstance(keyspace) != null;
+    }
+
+    /** @return true if the table is present in the given schema */
+    public static boolean tableExists(Schema schema, String keyspace, String table)
+    {
+        return schema.getTableMetadata(keyspace, table) != null;
+    }
+
+    /** @return the keyspace instance, or empty if it does not exist */
+    public static Optional<Keyspace> getKeyspace(Schema schema, String keyspace)
+    {
+        Keyspace instance = schema.getKeyspaceInstance(keyspace);
+        return Optional.ofNullable(instance);
+    }
+
+    /** @return the keyspace metadata, or empty if the keyspace does not exist */
+    public static Optional<KeyspaceMetadata> getKeyspaceMetadata(Schema schema, String keyspace)
+    {
+        Optional<Keyspace> ks = getKeyspace(schema, keyspace);
+        return ks.map(Keyspace::getMetadata);
+    }
+
+    /** Looks up a table in the global schema instance. */
+    public static Optional<TableMetadata> getTable(String keyspace, String table)
+    {
+        Schema schema = Schema.instance;
+        return getTable(schema, keyspace, table);
+    }
+
+    /** @return the table metadata, or empty if the keyspace or table does not exist */
+    public static Optional<TableMetadata> getTable(Schema schema, String keyspace, String table)
+    {
+        TableMetadata metadata = schema.getTableMetadata(keyspace, table);
+        return Optional.ofNullable(metadata);
+    }
+
+    /** @return true if the schema contains the keyspace and table of {@code cqlTable} */
+    public static boolean has(Schema schema, CqlTable cqlTable)
+    {
+        return has(schema, cqlTable.keyspace(), cqlTable.table());
+    }
+
+    public static boolean has(Schema schema, String keyspace, String table)
+    {
+        return keyspaceExists(schema, keyspace) && tableExists(schema, 
keyspace, table);
+    }
+
+    // cdc
+
+    /** @return true if CDC is enabled on the given table */
+    public static boolean isCdcEnabled(Schema schema, CqlTable cqlTable)
+    {
+        return isCdcEnabled(schema, cqlTable.keyspace(), cqlTable.table());
+    }
+
+    /** @return true if CDC is enabled on the table, checked against the global schema instance */
+    public static boolean isCdcEnabled(String keyspace, String table)
+    {
+        return isCdcEnabled(Schema.instance, keyspace, table);
+    }
+
+    /**
+     * @return true if the table (or materialized view) exists in the schema and has
+     *         the {@code cdc} table param set; false if keyspace or table is missing
+     */
+    public static boolean isCdcEnabled(Schema schema, String keyspace, String table)
+    {
+        return Optional.ofNullable(schema.getKeyspaceMetadata(keyspace))
+                       .map(ks -> ks.getTableOrViewNullable(table))
+                       .map(tb -> tb.params.cdc)
+                       .orElse(false);
+    }
+
+    /**
+     * Maps every keyspace in the given schema to the names of its CDC-enabled tables.
+     *
+     * @param schema schema snapshot to inspect
+     * @return map of keyspace name to set of CDC-enabled table names
+     */
+    public static Map<String, Set<String>> cdcEnabledTables(Schema schema)
+    {
+        // Fix: previously iterated Schema.instance.getKeyspaces(), silently ignoring
+        // the schema argument; use the supplied schema consistently.
+        return schema.getKeyspaces()
+                     .stream()
+                     .collect(Collectors.toMap(Function.identity(),
+                                               keyspace -> cdcEnabledTables(schema, keyspace)));
+    }
+
+    /**
+     * @return the names of CDC-enabled tables in the given keyspace
+     * @throws NullPointerException if the keyspace does not exist in the schema
+     */
+    public static Set<String> cdcEnabledTables(Schema schema, String keyspace)
+    {
+        KeyspaceMetadata ks = Objects.requireNonNull(schema.getKeyspaceMetadata(keyspace));
+        return ks.tables.stream()
+                        .filter(table -> table.params.cdc)
+                        .map(table -> table.name)
+                        .collect(Collectors.toSet());
+    }
+
+    /** Updates the CDC schema against the global schema instance. */
+    public static void updateCdcSchema(@NotNull Set<CqlTable> cdcTables,
+                                       @NotNull Partitioner partitioner,
+                                       @NotNull TableIdLookup tableIdLookup)
+    {
+        updateCdcSchema(Schema.instance, cdcTables, partitioner, tableIdLookup);
+    }
+
+    public static void maybeUpdateSchema(Schema schema,
+                                         Partitioner partitioner,
+                                         CqlTable cqlTable,
+                                         @Nullable UUID tableId,
+                                         boolean enableCdc)
+    {
+        String keyspace = cqlTable.keyspace();
+        String table = cqlTable.table();
+        Optional<TableMetadata> currTable = getTable(schema, keyspace, table);
+        if (!currTable.isPresent())
+        {
+            throw notExistThrowable(keyspace, table);
+        }
+
+        Set<String> udts = cqlTable.udts()
+                                   .stream()
+                                   .map(f -> 
f.createStatement(CassandraTypesImplementation.INSTANCE, keyspace))
+                                   .collect(Collectors.toSet());
+        TableMetadata updatedTable = buildTableMetadata(keyspace,
+                                                        
cqlTable.createStatement(),
+                                                        buildTypes(keyspace, 
udts),
+                                                        partitioner,
+                                                        tableId != null ? 
tableId : currTable.get().id.asUUID(),
+                                                        enableCdc);
+        if (updatedTable.equals(currTable.get()))
+        {
+            // no changes
+            return;
+        }
+
+        update(s -> {
+            Optional<KeyspaceMetadata> ks = getKeyspaceMetadata(s, keyspace);
+            Optional<TableMetadata> tableOpt = getTable(s, keyspace, table);
+            if (!ks.isPresent() || !tableOpt.isPresent())
+            {
+                throw notExistThrowable(keyspace, table);
+            }
+            if (updatedTable.equals(tableOpt.get()))
+            {
+                // no changes
+                return;
+            }
+
+            LOGGER.info("Schema change detected, updating new table schema 
keyspace={} table={}", keyspace, cqlTable.table());
+            // Cassandra 4.x vs 5.x START
+            schema.transform(st -> 
st.withAddedOrUpdated(ks.get().withSwapped(ks.get().tables.withSwapped(updatedTable))));
+            // Cassandra 4.x vs 5.x END

Review Comment:
   This uses the new schema-update API introduced in Cassandra 5 (vs. the 4.x API).



##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.cdc.FourZeroMutation;
+import org.apache.cassandra.cdc.api.CassandraSource;
+import org.apache.cassandra.cdc.api.CommitLog;
+import org.apache.cassandra.cdc.api.CommitLogInstance;
+import org.apache.cassandra.cdc.api.CommitLogMarkers;
+import org.apache.cassandra.cdc.api.CommitLogReader;
+import org.apache.cassandra.cdc.api.Marker;
+import org.apache.cassandra.cdc.api.RangeTombstoneData;
+import org.apache.cassandra.cdc.api.Row;
+import org.apache.cassandra.cdc.api.TableIdLookup;
+import org.apache.cassandra.cdc.scanner.CdcSortedStreamScanner;
+import org.apache.cassandra.cdc.scanner.CdcStreamScanner;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.BufferingCommitLogReader;
+import org.apache.cassandra.db.commitlog.CommitLogSegmentManagerCDC;
+import org.apache.cassandra.db.commitlog.FourZeroPartitionUpdateWrapper;
+import org.apache.cassandra.db.commitlog.PartitionUpdateWrapper;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.CqlType;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.TimeProvider;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+public class CdcBridgeImplementation extends CdcBridge
+{
+    public static volatile boolean setup = false;
+
    /**
     * One-time initialization of the CDC bridge for this JVM.
     *
     * Initializes the Cassandra type system and then configures CDC-related
     * commit-log settings rooted at the given path via {@link #setCDC}.
     *
     * @param path                 base directory under which commitlog/hints/saved_caches/cdc dirs are created
     * @param commitLogSegmentSize commit log segment size to configure
     * @param enableCompression    whether to enable LZ4 commit log compression
     */
    public static void setup(Path path, int commitLogSegmentSize, boolean enableCompression)
    {
        CassandraTypesImplementation.setup();
        setCDC(path, commitLogSegmentSize, enableCompression);
    }
+
    // Default constructor; all configuration happens through the static setup(...) method.
    public CdcBridgeImplementation()
    {
    }
+
+    protected static synchronized void setCDC(Path path, int 
commitLogSegmentSize, boolean enableCompression)
+    {
+        if (setup)
+        {
+            return;
+        }
+        Path commitLogPath = path.resolve("commitlog");
+        DatabaseDescriptor.getRawConfig().commitlog_directory = 
commitLogPath.toString();
+        DatabaseDescriptor.getRawConfig().hints_directory = 
path.resolve("hints").toString();
+        DatabaseDescriptor.getRawConfig().saved_caches_directory = 
path.resolve("saved_caches").toString();
+        DatabaseDescriptor.getRawConfig().cdc_raw_directory = 
path.resolve("cdc").toString();
+        DatabaseDescriptor.setCDCEnabled(true);
+        DatabaseDescriptor.setCDCTotalSpaceInMiB(1024); // Cassandra 4.x vs 5.x
+        DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+        if (enableCompression)
+        {
+            DatabaseDescriptor.setCommitLogCompression(new 
ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
+        }
+        DatabaseDescriptor.setEncryptionContext(new EncryptionContext());
+        DatabaseDescriptor.setCommitLogSyncPeriod(30);
+        DatabaseDescriptor.setCommitLogMaxCompressionBuffersPerPool(3);
+        DatabaseDescriptor.setCommitLogSyncGroupWindow(30);
+        DatabaseDescriptor.setCommitLogSegmentSize(commitLogSegmentSize);
+        DatabaseDescriptor.getRawConfig().commitlog_total_space = new 
DataStorageSpec.IntMebibytesBound(1024);
+        
DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.direct);
 // Cassandra 4.x vs 5.x

Review Comment:
   The `CommitLogWriteDiskAccessMode` option only applies when writing commit 
log segments. The Analytics CDC code reads the commit logs via streaming, so 
this option should not be relevant here.



##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.bridge.CassandraSchema;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UpdateParameters;
+import org.apache.cassandra.cql3.functions.types.TypeCodec;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.cql3.statements.DeleteStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SchemaTransformations;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.schema.Types;
+import org.apache.cassandra.schema.UserFunctions;
+import org.apache.cassandra.schema.Views;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JavaDriverUtils;
+
+/**
+ * Re-write of CQLSSTableWriter for writing tombstones to an SSTable for 
testing
+ * Used for testing purpose only
+ */
+@VisibleForTesting
+public final class SSTableTombstoneWriter implements Closeable
+{
+    private static final ByteBuffer UNSET_VALUE = 
ByteBufferUtil.UNSET_BYTE_BUFFER;
+
    static
    {
        // Initialize DatabaseDescriptor in client mode (no daemon services started).
        DatabaseDescriptor.clientInitialization(false);
        // Partitioner is not set in client mode; default to Murmur3 so token
        // computation works when writing SSTables offline.
        if (DatabaseDescriptor.getPartitioner() == null)
        {
            DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
        }
    }
+
+    private final AbstractSSTableSimpleWriter writer;
+    private final DeleteStatement delete;
+    private final List<ColumnSpecification> boundNames;
+    private final List<TypeCodec> typeCodecs;
+    private final ClusteringComparator comparator;
+
+    private SSTableTombstoneWriter(AbstractSSTableSimpleWriter writer,
+                                   DeleteStatement delete,
+                                   List<ColumnSpecification> boundNames,
+                                   ClusteringComparator comparator)
+    {
+        this.writer = writer;
+        this.delete = delete;
+        this.boundNames = boundNames;
+        this.typeCodecs = boundNames.stream().map(bn -> 
JavaDriverUtils.codecFor(JavaDriverUtils.driverType(bn.type)))
+                                    .collect(Collectors.toList());
+        this.comparator = comparator;
+    }
+
    /**
     * Returns a new builder for an {@code SSTableTombstoneWriter}.
     *
     * @return the new builder
     */
    public static Builder builder()
    {
        return new Builder();
    }
+
    /**
     * Adds a new row to the writer.
     *
     * This is a shortcut for {@code addRow(Arrays.asList(values))}.
     *
     * @param values the row values (corresponding to the bind variables of the
     *               deletion statement used when creating this writer)
     * @throws InvalidRequestException if the number or types of values do not match the statement
     * @throws IOException when adding a row with the given {@code values} fails
     */
    public void addRow(Object... values) throws InvalidRequestException, IOException
    {
        addRow(Arrays.asList(values));
    }
+
    /**
     * Adds a new row to the writer.
     * <p>
     * Each provided value type should correspond to the type of the CQL column the value is for.
     * The correspondence between Java type and CQL type is the same as the one documented at
     * www.datastax.com/drivers/java/2.0/apidocs/com/datastax/driver/core/DataType.Name.html#asJavaClass().
     *
     * @param values the row values (corresponding to the bind variables of the
     *               deletion statement used when creating this writer)
     */
    private void addRow(List<Object> values) throws InvalidRequestException, IOException
    {
        // NOTE(review): if values.size() > boundNames.size(), the min truncates the list and
        // rawAddRow then throws with a "got %d" count equal to the truncated size rather than
        // the caller-supplied size — presumably harmless, but confirm the intended behavior.
        int size = Math.min(values.size(), boundNames.size());
        List<ByteBuffer> rawValues = new ArrayList<>(size);

        // Serialize each value with the codec resolved for the matching bind variable.
        for (int index = 0; index < size; index++)
        {
            Object value = values.get(index);
            rawValues.add(serialize(value, typeCodecs.get(index)));
        }

        rawAddRow(rawValues);
    }
+
    /**
     * Adds a new row to the writer given already serialized values.
     *
     * Binds the values to the DELETE statement and emits either range tombstones
     * (when the statement restricts clustering columns with slices) or row/cell
     * tombstones (when it names specific clusterings) into the underlying writer.
     *
     * @param values the row values (corresponding to the bind variables of the
     *               deletion statement used when creating this writer) as binary
     * @throws InvalidRequestException if the number of values does not match the bind variables
     * @throws IOException if the underlying buffered writer fails to sync to disk
     */
    private void rawAddRow(List<ByteBuffer> values) throws InvalidRequestException, IOException
    {
        if (values.size() != boundNames.size())
        {
            throw new InvalidRequestException(
                    String.format("Invalid number of arguments, expecting %d values but got %d",
                                  boundNames.size(), values.size()));
        }

        QueryOptions options = QueryOptions.forInternalCalls(null, values);
        List<ByteBuffer> keys = delete.buildPartitionKeyNames(options, ClientState.forInternalCalls());

        long now = System.currentTimeMillis();
        // NOTE: We ask indexes to not validate values (the last 'false' arg below) because that
        //       triggers a 'Keyspace.open' and that forces a lot of initialization that we don't want
        UpdateParameters params = new UpdateParameters(delete.metadata,
                                                       delete.updatedColumns(),
                                                       ClientState.forInternalCalls(),
                                                       options,
                                                       delete.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
                                                       (int) TimeUnit.MILLISECONDS.toSeconds(now),
                                                       delete.getTimeToLive(options),
                                                       Collections.emptyMap());

        if (delete.hasSlices())
        {
            // Write out range tombstones: pair up the START/END clustering bounds of the
            // statement's restrictions and apply each resulting slice to every partition key.
            SortedSet<ClusteringBound<?>> startBounds = delete.getRestrictions().getClusteringColumnsBounds(Bound.START, options);
            SortedSet<ClusteringBound<?>> endBounds = delete.getRestrictions().getClusteringColumnsBounds(Bound.END, options);
            Slices slices = toSlices(startBounds, endBounds);

            try
            {
                for (ByteBuffer key : keys)
                {
                    for (Slice slice : slices)
                    {
                        delete.addUpdateForKey(writer.getUpdateFor(key), slice, params);
                    }
                }
                return;
            }
            catch (SSTableSimpleUnsortedWriter.SyncException exception)
            {
                // If we use a BufferedWriter and had a problem writing to disk, the IOException has been
                // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOException.
                throw (IOException) exception.getCause();
            }
        }

        // No slices: the statement names concrete clusterings, so write row-level tombstones.
        SortedSet<Clustering<?>> clusterings = delete.createClustering(options, ClientState.forInternalCalls());
        try
        {
            for (ByteBuffer key : keys)
            {
                for (Clustering<?> clustering : clusterings)
                {
                    delete.addUpdateForKey(writer.getUpdateFor(key), clustering, params);
                }
            }
        }
        catch (SSTableSimpleUnsortedWriter.SyncException exception)
        {
            // If we use a BufferedWriter and had a problem writing to disk, the IOException has been
            // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOException.
            throw (IOException) exception.getCause();
        }
    }
+
+    private Slices toSlices(SortedSet<ClusteringBound<?>> startBounds, 
SortedSet<ClusteringBound<?>> endBounds)
+    {
+        assert startBounds.size() == endBounds.size();
+
+        Slices.Builder builder = new Slices.Builder(comparator);
+
+        Iterator<ClusteringBound<?>> starts = startBounds.iterator();
+        Iterator<ClusteringBound<?>> ends = endBounds.iterator();
+
+        while (starts.hasNext())
+        {
+            Slice slice = Slice.make(starts.next(), ends.next());
+            if (!slice.isEmpty(comparator))
+            {
+                builder.add(slice);
+            }
+        }
+
+        return builder.build();
+    }
+
    /**
     * Close this writer.
     * <p>
     * This method should be called, otherwise the produced SSTables are not
     * guaranteed to be complete (and won't be in practice).
     *
     * @throws IOException if closing the underlying writer fails
     */
    public void close() throws IOException
    {
        writer.close();
    }
+
+    @SuppressWarnings("unchecked")
+    private ByteBuffer serialize(Object value, TypeCodec codec)
+    {
+        if (value == null || value == UNSET_VALUE)
+        {
+            return (ByteBuffer) value;
+        }
+
+        return codec.serialize(value, ProtocolVersion.CURRENT);
+    }
+
+    /**
+     * A Builder for a SSTableTombstoneWriter object
+     */
+    public static class Builder
+    {
+        private File directory;
+
+        SSTableFormat<?, ?> formatType = null;
+
+        private CreateTableStatement.Raw schemaStatement;
+        private final List<CreateTypeStatement.Raw> typeStatements;
+        private ModificationStatement.Parsed deleteStatement;
+        private IPartitioner partitioner;
+
+        private long bufferSizeInMB = 128;
+
        // Package-private: obtain instances via SSTableTombstoneWriter.builder().
        Builder()
        {
            this.typeStatements = new ArrayList<>();
        }
+
+        /**
+         * The directory where to write the SSTables (mandatory option).
+         *
+         * This is a mandatory option.
+         *
+         * @param directory the directory to use, which should exists and be 
writable
+         * @return this builder
+         * @throws IllegalArgumentException if {@code directory} doesn't exist 
or is not writable
+         */
+        public Builder inDirectory(File directory)
+        {
+            if (!directory.exists())
+            {
+                throw new IllegalArgumentException(directory + " doesn't 
exists");
+            }
+            if (!directory.isWritable())
+            {
+                throw new IllegalArgumentException(directory + " exists but is 
not writable");
+            }
+
+            this.directory = directory;
+            return this;
+        }
+
        /**
         * The schema (CREATE TABLE statement) for the table for which SSTables are to be created.
         * <p>
         * Please note that the provided CREATE TABLE statement <b>must</b> use a fully-qualified
         * table name, one that includes the keyspace name.
         * <p>
         * This is a mandatory option.
         *
         * @param schema the schema of the table for which SSTables are to be created
         * @return this builder
         * @throws IllegalArgumentException if {@code schema} is not a valid CREATE TABLE statement
         *                                  or does not have a fully-qualified table name
         */
        public Builder forTable(String schema)
        {
            schemaStatement = QueryProcessor.parseStatement(schema, CreateTableStatement.Raw.class, "CREATE TABLE");
            return this;
        }
+
        /**
         * The partitioner to use.
         * <p>
         * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used
         * by the cluster for which the SSTables are created, you need to use this method to
         * provide the correct partitioner.
         *
         * @param partitioner the partitioner to use
         * @return this builder
         */
        public Builder withPartitioner(IPartitioner partitioner)
        {
            this.partitioner = partitioner;
            return this;
        }
+
        /**
         * The DELETE statement defining the values to remove for a given CQL row.
         * <p>
         * Please note that the provided DELETE statement <b>must</b> use a fully-qualified
         * table name, one that includes the keyspace name. Moreover, said statement must use
         * bind variables since these variables will be bound to values by the resulting writer.
         * <p>
         * This is a mandatory option.
         *
         * @param delete a delete statement that defines the order of column values to use
         * @return this builder
         * @throws IllegalArgumentException if {@code delete} is not a valid deletion statement,
         *                                  does not have a fully-qualified table name, or has no bind variables
         */
        public Builder using(String delete)
        {
            deleteStatement = QueryProcessor.parseStatement(delete, ModificationStatement.Parsed.class, "DELETE");
            return this;
        }
+
        /**
         * The size of the buffer to use.
         * <p>
         * This defines how much data will be buffered before being written as
         * a new SSTable. This corresponds roughly to the data size that the created
         * SSTable will have.
         * <p>
         * The default is 128MB, which should be reasonable for a 1GB heap. If you experience
         * OOM while using the writer, you should lower this value.
         *
         * @param size the size to use in MB
         * @return this builder
         */
        public Builder withBufferSizeInMB(int size)
        {
            bufferSizeInMB = size;
            return this;
        }
+
+        public SSTableTombstoneWriter build()
+        {
+            if (directory == null)
+            {
+                throw new IllegalStateException("No ouptut directory 
specified, you should provide a directory with inDirectory()");
+            }
+            if (schemaStatement == null)
+            {
+                throw new IllegalStateException("Missing schema, you should 
provide the schema for the SSTable to create with forTable()");
+            }
+            if (deleteStatement == null)
+            {
+                throw new IllegalStateException("No delete statement 
specified, you should provide a delete statement through using()");
+            }
+
+            TableMetadata tableMetadata = CassandraSchema.apply(schema -> {
+                if 
(schema.getKeyspaceMetadata(SchemaConstants.SYSTEM_KEYSPACE_NAME) == null)
+                {
+                    // Cassandra 4.x vs 5.x START
+                    
schema.transform(SchemaTransformations.addKeyspace(SystemKeyspace.metadata(), 
false));
+                    // Cassandra 4.x vs 5.x END

Review Comment:
   This and similar changes elsewhere in the file are simply the Cassandra 5.0 
way to apply schema updates. Looks fine.



##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.msg;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.spark.utils.ByteBufferUtils.split;
+
+import org.apache.cassandra.cdc.api.CassandraSource;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.reader.ComplexTypeBuffer;
+
+public class FourZeroCdcEventBuilder extends CdcEventBuilder
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FourZeroCdcEventBuilder.class);
+
+    private TableMetadata tableMetadata;
+    private UnfilteredRowIterator partition = null;
+
    // Delegates to the keyspace/table constructor using the partition's metadata, then
    // eagerly captures the partition keys and static columns from the partition iterator.
    FourZeroCdcEventBuilder(CdcEvent.Kind kind, UnfilteredRowIterator partition, String trackingId, CassandraSource cassandraSource)
    {
        this(kind, partition.metadata().keyspace, partition.metadata().name, trackingId, cassandraSource);
        this.tableMetadata = partition.metadata();
        this.partition = partition;
        setPartitionKeys(partition);
        setStaticColumns(partition);
    }
+
    // NOTE(review): the field assignments below repeat the exact arguments passed to the super
    // constructor; presumably CdcEventBuilder assigns the same fields, which would make these
    // re-assignments redundant — verify against CdcEventBuilder before removing them.
    FourZeroCdcEventBuilder(CdcEvent.Kind kind, String keyspace, String table, String trackingId, CassandraSource cassandraSource)
    {
        super(kind, keyspace, table, trackingId, cassandraSource);
        this.kind = kind;
        this.keyspace = keyspace;
        this.table = table;
        this.trackingId = trackingId;
        // Tracking is enabled exactly when a tracking id is supplied.
        this.track = trackingId != null;
        this.cassandraSource = cassandraSource;
    }
+
    /**
     * Static factory: creates a builder primed with the partition's keys and static columns.
     *
     * @param kind            the kind of CDC event being built
     * @param partition       the partition the event is derived from
     * @param trackingId      optional tracking id; non-null enables tracking
     * @param cassandraSource source used to read back values not present in the mutation
     * @return a new builder for the given partition
     */
    public static FourZeroCdcEventBuilder of(CdcEvent.Kind kind,
                                             UnfilteredRowIterator partition,
                                             String trackingId,
                                             CassandraSource cassandraSource)
    {
        return new FourZeroCdcEventBuilder(kind, partition, trackingId, cassandraSource);
    }
+
    /**
     * Convenience factory: builds a partition-level CDC event (no row data) in one call.
     *
     * @param kind            the kind of CDC event being built
     * @param partition       the partition the event is derived from
     * @param trackingId      optional tracking id; non-null enables tracking
     * @param cassandraSource source used to read back values not present in the mutation
     * @return the built CDC event
     */
    public static CdcEvent build(CdcEvent.Kind kind,
                                 UnfilteredRowIterator partition,
                                 String trackingId,
                                 CassandraSource cassandraSource)
    {
        return of(kind, partition, trackingId, cassandraSource).build();
    }
+
+    public static CdcEvent build(CdcEvent.Kind kind,
+                                 UnfilteredRowIterator partition,
+                                 Row row,
+                                 String trackingId,
+                                 CassandraSource cassandraSource)
+    {
+        return of(kind, partition, trackingId, cassandraSource)
+               .withRow(row)
+               .build();
+    }
+
    /**
     * Populates the builder's clustering keys and value columns from the given row.
     * Requires the builder to have been created from a partition (partition-based constructor).
     *
     * @param row the row supplying clustering keys and value columns
     * @return this builder, for chaining
     */
    public FourZeroCdcEventBuilder withRow(Row row)
    {
        Preconditions.checkNotNull(partition, "Cannot build with an empty builder.");
        setClusteringKeys(row, partition);
        setValueColumns(row);
        return this;
    }
+
+    void setPartitionKeys(UnfilteredRowIterator partition)
+    {
+        if (kind == CdcEvent.Kind.PARTITION_DELETE)
+        {
+            
updateMaxTimestamp(partition.partitionLevelDeletion().markedForDeleteAt());
+        }
+
+        ImmutableList<ColumnMetadata> columnMetadatas = 
partition.metadata().partitionKeyColumns();
+        List<Value> pk = new ArrayList<>(columnMetadatas.size());
+
+        ByteBuffer pkbb = partition.partitionKey().getKey();
+        // single partition key
+        if (columnMetadatas.size() == 1)
+        {
+            pk.add(makeValue(pkbb, columnMetadatas.get(0)));
+        }
+        else // composite partition key
+        {
+            ByteBuffer[] pkbbs = split(pkbb, columnMetadatas.size());
+            for (int i = 0; i < columnMetadatas.size(); i++)
+            {
+                pk.add(makeValue(pkbbs[i], columnMetadatas.get(i)));
+            }
+        }
+        this.partitionKeys = pk;
+    }
+
+    void setStaticColumns(UnfilteredRowIterator partition)
+    {
+        Row staticRow = partition.staticRow();
+
+        if (staticRow.isEmpty())
+        {
+            return;
+        }
+
+        List<Value> sc = new ArrayList<>(staticRow.columnCount());
+        for (ColumnData cd : staticRow)
+        {
+            addColumn(sc, cd);
+        }
+        this.staticColumns = sc;
+    }
+
+    void setClusteringKeys(Unfiltered unfiltered, UnfilteredRowIterator 
partition)
+    {
+        ImmutableList<ColumnMetadata> columnMetadatas = 
partition.metadata().clusteringColumns();
+        if (columnMetadatas.isEmpty()) // the table has no clustering keys
+        {
+            return;
+        }
+
+        List<Value> ck = new ArrayList<>(columnMetadatas.size());
+        for (ColumnMetadata cm : columnMetadatas)
+        {
+            ByteBuffer ckbb = unfiltered.clustering().bufferAt(cm.position());
+            ck.add(makeValue(ckbb, cm));
+        }
+        this.clusteringKeys = ck;
+    }
+
+    void setValueColumns(Row row)
+    {
+        if (kind == CdcEvent.Kind.ROW_DELETE)
+        {
+            updateMaxTimestamp(row.deletion().time().markedForDeleteAt());
+            return;
+        }
+
+        // Just a sanity check. An empty row will not be added to the 
PartitionUpdate/cdc, so not really expect the case
+        if (row.isEmpty())
+        {
+            LOGGER.warn("Encountered an unexpected empty row in CDC. 
keyspace={}, table={}", keyspace, table);
+            return;
+        }
+
+        List<Value> vc = new ArrayList<>(row.columnCount());
+        for (ColumnData cd : row)
+        {
+            addColumn(vc, cd);
+        }
+        this.valueColumns = vc;
+    }
+
+    /**
+     * Translates a single {@link ColumnData} from the mutation row into a {@link Value} appended to
+     * {@code holder}, updating the event {@code kind}, max timestamp and TTL bookkeeping as side effects.
+     * Complex (multi-cell) columns are delegated to {@link #processComplexData} or, for unfrozen lists,
+     * read back from Cassandra.
+     *
+     * @param holder accumulator for the extracted column values
+     * @param cd     the column data to process: a simple {@link Cell} or a {@link ComplexColumnData}
+     */
+    private void addColumn(List<Value> holder, ColumnData cd)
+    {
+        ColumnMetadata columnMetadata = cd.column();
+        // NOTE: the previously computed local 'columnName' was unused here and has been removed;
+        // processComplexData derives the column name itself when it needs it.
+        if (columnMetadata.isComplex()) // multi-cell column
+        {
+            ComplexColumnData complex = (ComplexColumnData) cd;
+            DeletionTime deletionTime = complex.complexDeletion();
+            if (deletionTime.isLive())
+            {
+                // the complex data is live, but there could be element deletion inside.
+                if (complex.column().type instanceof ListType)
+                {
+                    // In the case of unfrozen lists, it reads the value from C*
+                    readFromCassandra(holder, complex);
+                }
+                else
+                {
+                    processComplexData(holder, complex);
+                }
+            }
+            else if (complex.cellsCount() > 0)
+            {
+                // The condition, complex data is not live && cellCount > 0, indicates that a new value is set to the column.
+                // The CQL operation could be either insert or update the column.
+                // Since the new value is in the mutation already, reading from C* can be skipped
+                processComplexData(holder, complex);
+            }
+            else // the entire multi-cell collection/UDT is deleted.
+            {
+                kind = CdcEvent.Kind.DELETE;
+                updateMaxTimestamp(deletionTime.markedForDeleteAt());
+                // A null buffer marks the column as deleted.
+                holder.add(makeValue(null, complex.column()));
+            }
+        }
+        else // simple column
+        {
+            Cell<?> cell = (Cell<?>) cd;
+            updateMaxTimestamp(cell.timestamp());
+            if (cell.isTombstone())
+            {
+                // A tombstoned simple cell maps to a null value for the column.
+                holder.add(makeValue(null, cell.column()));
+            }
+            else
+            {
+                holder.add(makeValue(cell.buffer(), cell.column()));
+                if (cell.isExpiring())
+                {
+                    // 5.x exposes localDeletionTime as a long; convert to the 4.x unsigned-int encoding. // Cassandra 4.x vs 5.x
+                    setTTL(cell.ttl(), Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime()));
+                }
+            }
+        }
+    }
+
+    private void processComplexData(List<Value> holder, ComplexColumnData 
complex)
+    {
+        ComplexTypeBuffer buffer = 
ComplexTypeBuffer.newBuffer(complex.column().type, complex.cellsCount());
+        boolean allTombstone = true;
+        String columnName = complex.column().name.toCQLString();
+        for (Cell<?> cell : complex)
+        {
+            updateMaxTimestamp(cell.timestamp());
+            if (cell.isTombstone())
+            {
+                kind = CdcEvent.Kind.COMPLEX_ELEMENT_DELETE;
+
+                CellPath path = cell.path();
+                if (path.size() > 0) // size can either be 0 (EmptyCellPath) 
or 1 (SingleItemCellPath).
+                {
+                    addCellTombstoneInComplex(columnName, path.get(0));
+                }
+            }
+            else // cell is alive
+            {
+                allTombstone = false;
+                buffer.addCell(cell);
+                if (cell.isExpiring())
+                {
+                    setTTL(cell.ttl(), 
Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime())); // Cassandra 
4.x vs 5.x

Review Comment:
   Same as line#245



##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.LongPredicate;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.AbstractCompactionController;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionIterator;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.IOUtils;
+import org.apache.cassandra.spark.utils.TimeProvider;
+import org.apache.cassandra.utils.TimeUUID;
+import org.jetbrains.annotations.NotNull;
+
+public class CompactionStreamScanner extends AbstractStreamScanner
+{
+    private final Collection<? extends Scannable> toCompact;
+    private final TimeUUID taskId;
+
+    private PurgingCompactionController controller;
+    private AbstractCompactionStrategy.ScannerList scanners;
+    private CompactionIterator ci;
+
+    /**
+     * Test-only convenience constructor that defaults the time source to {@code TimeProvider.DEFAULT}.
+     */
+    @VisibleForTesting
+    CompactionStreamScanner(@NotNull TableMetadata cfMetaData,
+                            @NotNull Partitioner partitionerType,
+                            @NotNull Collection<? extends Scannable> toCompact)
+    {
+        this(cfMetaData, partitionerType, TimeProvider.DEFAULT, toCompact);
+    }
+
+    public CompactionStreamScanner(@NotNull TableMetadata cfMetaData,
+                                   @NotNull Partitioner partitionerType,
+                                   @NotNull TimeProvider timeProvider,
+                                   @NotNull Collection<? extends Scannable> 
toCompact)
+    {
+        super(cfMetaData, partitionerType, timeProvider);
+        this.toCompact = toCompact;
+        this.taskId = TimeUUID.Generator.nextTimeUUID(); // Cassandra 4.x vs 
5.x

Review Comment:
   It is a trivial change from `UUID` to `TimeUUID`. The change has actually been available since 4.1. No specific logic depends on the time-based UUID; it is essentially used as a plain UUID.



##########
cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.esotericsoftware.kryo.io.Input;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.commitlog.CommitLogSegmentManagerStandard;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.spark.data.CassandraTypes;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.complex.CqlCollection;
+import org.apache.cassandra.spark.data.complex.CqlFrozen;
+import org.apache.cassandra.spark.data.complex.CqlUdt;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.data.types.Ascii;
+import org.apache.cassandra.spark.data.types.BigInt;
+import org.apache.cassandra.spark.data.types.Blob;
+import org.apache.cassandra.spark.data.types.Boolean;
+import org.apache.cassandra.spark.data.types.Counter;
+import org.apache.cassandra.spark.data.types.Date;
+import org.apache.cassandra.spark.data.types.Decimal;
+import org.apache.cassandra.spark.data.types.Double;
+import org.apache.cassandra.spark.data.types.Duration;
+import org.apache.cassandra.spark.data.types.Empty;
+import org.apache.cassandra.spark.data.types.Float;
+import org.apache.cassandra.spark.data.types.Inet;
+import org.apache.cassandra.spark.data.types.Int;
+import org.apache.cassandra.spark.data.types.SmallInt;
+import org.apache.cassandra.spark.data.types.Text;
+import org.apache.cassandra.spark.data.types.Time;
+import org.apache.cassandra.spark.data.types.TimeUUID;
+import org.apache.cassandra.spark.data.types.Timestamp;
+import org.apache.cassandra.spark.data.types.TinyInt;
+import org.apache.cassandra.spark.data.types.VarChar;
+import org.apache.cassandra.spark.data.types.VarInt;
+
+public class CassandraTypesImplementation extends CassandraTypes
+{
+    private static volatile boolean setup = false;
+
+    public static final CassandraTypesImplementation INSTANCE = new 
CassandraTypesImplementation();
+
+    public static synchronized void setup()
+    {
+        if (!CassandraTypesImplementation.setup)
+        {
+            // We never want to enable mbean registration in the Cassandra 
code we use so disable it here
+            
System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+            System.setProperty("cassandra.schema.force_load_local_keyspaces", 
"true"); // Cassandra 4.x vs 5.x
+            Config.setClientMode(true);
+            // When we create a TableStreamScanner, we will set the 
partitioner directly on the table metadata
+            // using the supplied IIndexStreamScanner.Partitioner. 
CFMetaData::compile requires a partitioner to
+            // be set in DatabaseDescriptor before we can do that though, so 
we set one here in preparation.
+            
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+            Config config = new Config();
+            config.memtable_flush_writers = 8;
+            config.memtable_heap_space = new 
DataStorageSpec.IntMebibytesBound(1024); // Cassandra 4.x vs 5.x
+            config.memtable_offheap_space = new 
DataStorageSpec.IntMebibytesBound(1024); // Cassandra 4.x vs 5.x

Review Comment:
   I do not think they are relevant. The values could be smaller. 



##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java:
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Streams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.UnfilteredDeserializer;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.indexsummary.IndexSummary;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.DroppedColumn;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.analytics.reader.common.RawInputStream;
+import org.apache.cassandra.spark.reader.common.SSTableStreamException;
+import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
+import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
+import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
+import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.Pair;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+@SuppressWarnings("unused")
+public class SSTableReader implements SparkSSTableReader, Scannable
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SSTableReader.class);
+
+    private final TableMetadata metadata;
+    @NotNull
+    private final SSTable ssTable;
+    private final StatsMetadata statsMetadata;
+    @NotNull
+    private final Version version;
+    @NotNull
+    private final DecoratedKey first;
+    @NotNull
+    private final DecoratedKey last;
+    @NotNull
+    private final BigInteger firstToken;
+    @NotNull
+    private final BigInteger lastToken;
+    private final SerializationHeader header;
+    private final DeserializationHelper helper;
+    @NotNull
+    private final AtomicReference<SSTableStreamReader> reader = new 
AtomicReference<>(null);
+    @Nullable
+    private final SparkRangeFilter sparkRangeFilter;
+    @NotNull
+    private final List<PartitionKeyFilter> partitionKeyFilters;
+    @NotNull
+    private final Stats stats;
+    @Nullable
+    private Long startOffset = null;
+    private Long openedNanos = null;
+    @NotNull
+    private final Function<StatsMetadata, Boolean> isRepaired;
+
+    /**
+     * Fluent builder for {@link SSTableReader}. All {@code with*} options are optional;
+     * {@link #build()} constructs the reader, which may perform I/O (e.g. reading the
+     * SSTable Summary.db/Index.db files) and can therefore throw {@link IOException}.
+     */
+    public static class Builder
+    {
+        @NotNull
+        final TableMetadata metadata;
+        @NotNull
+        final SSTable ssTable;
+        @Nullable
+        PruneColumnFilter columnFilter = null;
+        boolean readIndexOffset = true;
+        @NotNull
+        Stats stats = Stats.DoNothingStats.INSTANCE;
+        boolean useIncrementalRepair = true;
+        boolean isRepairPrimary = false;
+        // Default: an SSTable counts as repaired when repairedAt is set to anything other than UNREPAIRED_SSTABLE
+        Function<StatsMetadata, Boolean> isRepaired = stats -> stats.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
+        @Nullable
+        SparkRangeFilter sparkRangeFilter = null;
+        @NotNull
+        final List<PartitionKeyFilter> partitionKeyFilters = new ArrayList<>();
+
+        Builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
+        {
+            this.metadata = metadata;
+            this.ssTable = ssTable;
+        }
+
+        /** Sets the Spark worker token-range filter; {@code null} disables range filtering. */
+        public Builder withSparkRangeFilter(@Nullable SparkRangeFilter sparkRangeFilter)
+        {
+            this.sparkRangeFilter = sparkRangeFilter;
+            return this;
+        }
+
+        /** Adds all given partition key filters; a {@code null} collection is silently ignored. */
+        public Builder withPartitionKeyFilters(@Nullable Collection<PartitionKeyFilter> partitionKeyFilters)
+        {
+            if (partitionKeyFilters != null)
+            {
+                this.partitionKeyFilters.addAll(partitionKeyFilters);
+            }
+            return this;
+        }
+
+        /** Adds a single partition key filter. */
+        public Builder withPartitionKeyFilter(@NotNull PartitionKeyFilter partitionKeyFilter)
+        {
+            partitionKeyFilters.add(partitionKeyFilter);
+            return this;
+        }
+
+        /** Restricts reads to the columns selected by the filter; {@code null} reads all columns. */
+        public Builder withColumnFilter(@Nullable PruneColumnFilter columnFilter)
+        {
+            this.columnFilter = columnFilter;
+            return this;
+        }
+
+        /** Whether to read the start offset from the index (default {@code true}). */
+        public Builder withReadIndexOffset(boolean readIndexOffset)
+        {
+            this.readIndexOffset = readIndexOffset;
+            return this;
+        }
+
+        /** Sets the stats sink used to record reader metrics (default: no-op). */
+        public Builder withStats(@NotNull Stats stats)
+        {
+            this.stats = stats;
+            return this;
+        }
+
+        /** Whether repaired SSTables may be skipped on non-primary repair replicas (default {@code true}). */
+        public Builder useIncrementalRepair(boolean useIncrementalRepair)
+        {
+            this.useIncrementalRepair = useIncrementalRepair;
+            return this;
+        }
+
+        /** Marks this reader as running on the primary repair replica (default {@code false}). */
+        public Builder isRepairPrimary(boolean isRepairPrimary)
+        {
+            this.isRepairPrimary = isRepairPrimary;
+            return this;
+        }
+
+        /** Overrides the predicate deciding whether an SSTable is considered repaired. */
+        public Builder withIsRepairedFunction(Function<StatsMetadata, Boolean> isRepaired)
+        {
+            this.isRepaired = isRepaired;
+            return this;
+        }
+
+        /**
+         * Builds the reader with the accumulated options.
+         *
+         * @throws IOException if the SSTable summary/index or metadata cannot be read
+         */
+        public SSTableReader build() throws IOException
+        {
+            return new SSTableReader(metadata,
+                                     ssTable,
+                                     sparkRangeFilter,
+                                     partitionKeyFilters,
+                                     columnFilter,
+                                     readIndexOffset,
+                                     stats,
+                                     useIncrementalRepair,
+                                     isRepairPrimary,
+                                     isRepaired);
+        }
+    }
+
+    /** Creates a new {@link Builder} for reading the given SSTable with the given table metadata. */
+    public static Builder builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
+    {
+        return new Builder(metadata, ssTable);
+    }
+
+    // CHECKSTYLE IGNORE: Constructor with many parameters
+    public SSTableReader(@NotNull TableMetadata metadata,
+                         @NotNull SSTable ssTable,
+                         @Nullable SparkRangeFilter sparkRangeFilter,
+                         @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                         @Nullable PruneColumnFilter columnFilter,
+                         boolean readIndexOffset,
+                         @NotNull Stats stats,
+                         boolean useIncrementalRepair,
+                         boolean isRepairPrimary,
+                         @NotNull Function<StatsMetadata, Boolean> isRepaired) 
throws IOException
+    {
+        long startTimeNanos = System.nanoTime();
+        long now;
+        this.ssTable = ssTable;
+        this.stats = stats;
+        this.isRepaired = isRepaired;
+        this.sparkRangeFilter = sparkRangeFilter;
+
+        Descriptor descriptor = 
ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable);
+        this.version = descriptor.version;
+
+        SummaryDbUtils.Summary summary = null;
+        Pair<DecoratedKey, DecoratedKey> keys = null;
+        try
+        {
+            now = System.nanoTime();
+            summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
+            stats.readSummaryDb(ssTable, System.nanoTime() - now);
+            keys = Pair.of(summary.first(), summary.last());
+        }
+        catch (IOException exception)
+        {
+            LOGGER.warn("Failed to read Summary.db file ssTable='{}'", 
ssTable, exception);
+        }
+
+        if (keys == null)
+        {
+            LOGGER.warn("Could not load first and last key from Summary.db 
file, so attempting Index.db fileName={}",
+                        ssTable.getDataFileName());
+            now = System.nanoTime();
+            keys = SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable);
+            stats.readIndexDb(ssTable, System.nanoTime() - now);
+        }
+
+        if (keys == null)
+        {
+            throw new IOException("Could not load SSTable first or last 
tokens");
+        }
+
+        this.first = keys.left;
+        this.last = keys.right;
+        this.firstToken = ReaderUtils.tokenToBigInteger(first.getToken());
+        this.lastToken = ReaderUtils.tokenToBigInteger(last.getToken());
+        TokenRange readerRange = range();
+
+        List<PartitionKeyFilter> matchingKeyFilters = 
partitionKeyFilters.stream()
+                .filter(filter -> readerRange.contains(filter.token()))
+                .collect(Collectors.toList());
+        boolean overlapsSparkRange = sparkRangeFilter == null || 
SparkSSTableReader.overlaps(this, sparkRangeFilter.tokenRange());
+        if (!overlapsSparkRange  // SSTable doesn't overlap with Spark worker 
token range
+                || (matchingKeyFilters.isEmpty() && 
!partitionKeyFilters.isEmpty()))  // No matching partition key filters overlap 
with SSTable
+        {
+            this.partitionKeyFilters = Collections.emptyList();
+            stats.skippedSSTable(sparkRangeFilter, partitionKeyFilters, 
firstToken, lastToken);
+            LOGGER.info("Ignoring SSTableReader with firstToken={} 
lastToken={}, does not overlap with any filter",
+                        firstToken, lastToken);
+            statsMetadata = null;
+            header = null;
+            helper = null;
+            this.metadata = null;
+            return;
+        }
+
+        if (!matchingKeyFilters.isEmpty())
+        {
+            List<PartitionKeyFilter> matchInBloomFilter =
+                    ReaderUtils.filterKeyInBloomFilter(ssTable, 
metadata.partitioner, descriptor, matchingKeyFilters);
+            this.partitionKeyFilters = 
ImmutableList.copyOf(matchInBloomFilter);
+
+            // Check if required keys are actually present
+            if (matchInBloomFilter.isEmpty() || 
!ReaderUtils.anyFilterKeyInIndex(ssTable, matchInBloomFilter))
+            {
+                if (matchInBloomFilter.isEmpty())
+                {
+                    stats.missingInBloomFilter();
+                }
+                else
+                {
+                    stats.missingInIndex();
+                }
+                LOGGER.info("Ignoring SSTable {}, no match found in index file 
for key filters",
+                            this.ssTable.getDataFileName());
+                statsMetadata = null;
+                header = null;
+                helper = null;
+                this.metadata = null;
+                return;
+            }
+        }
+        else
+        {
+            this.partitionKeyFilters = 
ImmutableList.copyOf(partitionKeyFilters);
+        }
+
+        Map<MetadataType, MetadataComponent> componentMap = 
SSTableCache.INSTANCE.componentMapFromStats(ssTable, descriptor);
+
+        ValidationMetadata validation = (ValidationMetadata) 
componentMap.get(MetadataType.VALIDATION);
+        if (validation != null && 
!validation.partitioner.equals(metadata.partitioner.getClass().getName()))
+        {
+            throw new IllegalStateException("Partitioner in ValidationMetadata 
does not match TableMetaData: "
+                                          + validation.partitioner + " vs. " + 
metadata.partitioner.getClass().getName());
+        }
+
+        this.statsMetadata = (StatsMetadata) 
componentMap.get(MetadataType.STATS);
+        SerializationHeader.Component headerComp = 
(SerializationHeader.Component) componentMap.get(MetadataType.HEADER);
+        if (headerComp == null)
+        {
+            throw new IOException("Cannot read SSTable if cannot deserialize 
stats header info");
+        }
+
+        if (useIncrementalRepair && !isRepairPrimary && isRepaired())
+        {
+            stats.skippedRepairedSSTable(ssTable, statsMetadata.repairedAt);
+            LOGGER.info("Ignoring repaired SSTable on non-primary repair 
replica ssTable='{}' repairedAt={}",
+                        ssTable, statsMetadata.repairedAt);
+            header = null;
+            helper = null;
+            this.metadata = null;
+            return;
+        }
+
+        Set<String> columnNames = Streams.concat(metadata.columns().stream(),
+                                                 
metadata.staticColumns().stream())
+                                         .map(column -> column.name.toString())
+                                         .collect(Collectors.toSet());
+        Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
+        droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
+                                                  metadata.name,
+                                                  ssTable,
+                                                  
headerComp.getRegularColumns(),
+                                                  columnNames,
+                                                  
ColumnMetadata.Kind.REGULAR));
+        droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
+                                                  metadata.name,
+                                                  ssTable,
+                                                  
headerComp.getStaticColumns(),
+                                                  columnNames,
+                                                  ColumnMetadata.Kind.STATIC));
+        if (!droppedColumns.isEmpty())
+        {
+            LOGGER.info("Rebuilding table metadata with dropped columns 
numDroppedColumns={} ssTable='{}'",
+                        droppedColumns.size(), ssTable);
+            metadata = 
metadata.unbuild().droppedColumns(droppedColumns).build();
+        }
+
+        this.header = headerComp.toHeader(metadata);
+        this.helper = new DeserializationHelper(metadata,
+                                                MessagingService.VERSION_30,

Review Comment:
   Maybe pick the messaging version based on the SSTable version? The version here affects the deserialization of the deletion-time field: when lower than `MessagingService.VERSION_50`, deletion time is a signed int; when it is `MessagingService.VERSION_50`, deletion time is an unsigned int.



##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java:
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Streams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.UnfilteredDeserializer;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.indexsummary.IndexSummary;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.DroppedColumn;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.analytics.reader.common.RawInputStream;
+import org.apache.cassandra.spark.reader.common.SSTableStreamException;
+import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
+import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
+import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
+import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.Pair;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+@SuppressWarnings("unused")
+public class SSTableReader implements SparkSSTableReader, Scannable
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SSTableReader.class);
+
    // Table schema used to deserialize this SSTable; null when the reader decided to skip the SSTable
    private final TableMetadata metadata;
    @NotNull
    private final SSTable ssTable;
    // Stats component from the SSTable's Statistics.db; null when the SSTable is skipped
    private final StatsMetadata statsMetadata;
    // SSTable format version parsed from the Data.db file name
    @NotNull
    private final Version version;
    // First and last partition keys, resolved from Summary.db or Index.db
    @NotNull
    private final DecoratedKey first;
    @NotNull
    private final DecoratedKey last;
    // Token bounds corresponding to 'first'/'last', used for range-overlap checks
    @NotNull
    private final BigInteger firstToken;
    @NotNull
    private final BigInteger lastToken;
    // Serialization header and helper for deserializing rows; null when the SSTable is skipped
    private final SerializationHeader header;
    private final DeserializationHelper helper;
    // Stream reader opened eagerly in the constructor so buffering starts before iteration begins
    @NotNull
    private final AtomicReference<SSTableStreamReader> reader = new AtomicReference<>(null);
    @Nullable
    private final SparkRangeFilter sparkRangeFilter;
    // Partition key filters that survived range/bloom-filter pruning; may be empty
    @NotNull
    private final List<PartitionKeyFilter> partitionKeyFilters;
    @NotNull
    private final Stats stats;
    // Data.db offset to start reading from; null when no offset lookup was performed
    @Nullable
    private Long startOffset = null;
    // Nano timestamp when the reader finished opening; null when the SSTable was skipped
    private Long openedNanos = null;
    // Predicate deciding whether an SSTable counts as repaired, based on its stats metadata
    @NotNull
    private final Function<StatsMetadata, Boolean> isRepaired;
+
+    public static class Builder
+    {
+        @NotNull
+        final TableMetadata metadata;
+        @NotNull
+        final SSTable ssTable;
+        @Nullable
+        PruneColumnFilter columnFilter = null;
+        boolean readIndexOffset = true;
+        @NotNull
+        Stats stats = Stats.DoNothingStats.INSTANCE;
+        boolean useIncrementalRepair = true;
+        boolean isRepairPrimary = false;
+        Function<StatsMetadata, Boolean> isRepaired = stats -> 
stats.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
+        @Nullable
+        SparkRangeFilter sparkRangeFilter = null;
+        @NotNull
+        final List<PartitionKeyFilter> partitionKeyFilters = new ArrayList<>();
+
+        Builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
+        {
+            this.metadata = metadata;
+            this.ssTable = ssTable;
+        }
+
+        public Builder withSparkRangeFilter(@Nullable SparkRangeFilter 
sparkRangeFilter)
+        {
+            this.sparkRangeFilter = sparkRangeFilter;
+            return this;
+        }
+
+        public Builder withPartitionKeyFilters(@Nullable 
Collection<PartitionKeyFilter> partitionKeyFilters)
+        {
+            if (partitionKeyFilters != null)
+            {
+                this.partitionKeyFilters.addAll(partitionKeyFilters);
+            }
+            return this;
+        }
+
+        public Builder withPartitionKeyFilter(@NotNull PartitionKeyFilter 
partitionKeyFilter)
+        {
+            partitionKeyFilters.add(partitionKeyFilter);
+            return this;
+        }
+
+        public Builder withColumnFilter(@Nullable PruneColumnFilter 
columnFilter)
+        {
+            this.columnFilter = columnFilter;
+            return this;
+        }
+
+        public Builder withReadIndexOffset(boolean readIndexOffset)
+        {
+            this.readIndexOffset = readIndexOffset;
+            return this;
+        }
+
+        public Builder withStats(@NotNull Stats stats)
+        {
+            this.stats = stats;
+            return this;
+        }
+
+        public Builder useIncrementalRepair(boolean useIncrementalRepair)
+        {
+            this.useIncrementalRepair = useIncrementalRepair;
+            return this;
+        }
+
+        public Builder isRepairPrimary(boolean isRepairPrimary)
+        {
+            this.isRepairPrimary = isRepairPrimary;
+            return this;
+        }
+
+        public Builder withIsRepairedFunction(Function<StatsMetadata, Boolean> 
isRepaired)
+        {
+            this.isRepaired = isRepaired;
+            return this;
+        }
+
+        public SSTableReader build() throws IOException
+        {
+            return new SSTableReader(metadata,
+                                     ssTable,
+                                     sparkRangeFilter,
+                                     partitionKeyFilters,
+                                     columnFilter,
+                                     readIndexOffset,
+                                     stats,
+                                     useIncrementalRepair,
+                                     isRepairPrimary,
+                                     isRepaired);
+        }
+    }
+
    /**
     * Creates a {@link Builder} for reading the given SSTable with the provided table schema.
     *
     * @param metadata table schema used to deserialize the SSTable
     * @param ssTable  the SSTable to read
     * @return a new builder with default options
     */
    public static Builder builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
    {
        return new Builder(metadata, ssTable);
    }
+
    /**
     * Opens an SSTable for reading: resolves first/last partition keys, prunes against the Spark
     * token range and partition key filters, validates the partitioner, applies dropped-column
     * metadata, and eagerly opens the stream reader.
     *
     * If the SSTable does not overlap any filter (or is a repaired SSTable on a non-primary repair
     * replica), the constructor returns early with {@code metadata}/{@code header}/{@code helper}
     * left {@code null} and no stream reader opened.
     *
     * @throws IOException if neither Summary.db nor Index.db yields first/last tokens, or the
     *                     serialization header component cannot be read
     */
    // CHECKSTYLE IGNORE: Constructor with many parameters
    public SSTableReader(@NotNull TableMetadata metadata,
                         @NotNull SSTable ssTable,
                         @Nullable SparkRangeFilter sparkRangeFilter,
                         @NotNull List<PartitionKeyFilter> partitionKeyFilters,
                         @Nullable PruneColumnFilter columnFilter,
                         boolean readIndexOffset,
                         @NotNull Stats stats,
                         boolean useIncrementalRepair,
                         boolean isRepairPrimary,
                         @NotNull Function<StatsMetadata, Boolean> isRepaired) throws IOException
    {
        long startTimeNanos = System.nanoTime();
        long now;
        this.ssTable = ssTable;
        this.stats = stats;
        this.isRepaired = isRepaired;
        this.sparkRangeFilter = sparkRangeFilter;

        Descriptor descriptor = ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable);
        this.version = descriptor.version;

        // Prefer the cheaper Summary.db for first/last keys; fall back to Index.db on failure
        SummaryDbUtils.Summary summary = null;
        Pair<DecoratedKey, DecoratedKey> keys = null;
        try
        {
            now = System.nanoTime();
            summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
            stats.readSummaryDb(ssTable, System.nanoTime() - now);
            keys = Pair.of(summary.first(), summary.last());
        }
        catch (IOException exception)
        {
            LOGGER.warn("Failed to read Summary.db file ssTable='{}'", ssTable, exception);
        }

        if (keys == null)
        {
            LOGGER.warn("Could not load first and last key from Summary.db file, so attempting Index.db fileName={}",
                        ssTable.getDataFileName());
            now = System.nanoTime();
            keys = SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable);
            stats.readIndexDb(ssTable, System.nanoTime() - now);
        }

        if (keys == null)
        {
            throw new IOException("Could not load SSTable first or last tokens");
        }

        this.first = keys.left;
        this.last = keys.right;
        this.firstToken = ReaderUtils.tokenToBigInteger(first.getToken());
        this.lastToken = ReaderUtils.tokenToBigInteger(last.getToken());
        TokenRange readerRange = range();

        // Keep only the key filters whose token falls inside this SSTable's token range
        List<PartitionKeyFilter> matchingKeyFilters = partitionKeyFilters.stream()
                .filter(filter -> readerRange.contains(filter.token()))
                .collect(Collectors.toList());
        boolean overlapsSparkRange = sparkRangeFilter == null || SparkSSTableReader.overlaps(this, sparkRangeFilter.tokenRange());
        if (!overlapsSparkRange  // SSTable doesn't overlap with Spark worker token range
                || (matchingKeyFilters.isEmpty() && !partitionKeyFilters.isEmpty()))  // No matching partition key filters overlap with SSTable
        {
            // Early-out: leave metadata/header/helper null and open no reader
            this.partitionKeyFilters = Collections.emptyList();
            stats.skippedSSTable(sparkRangeFilter, partitionKeyFilters, firstToken, lastToken);
            LOGGER.info("Ignoring SSTableReader with firstToken={} lastToken={}, does not overlap with any filter",
                        firstToken, lastToken);
            statsMetadata = null;
            header = null;
            helper = null;
            this.metadata = null;
            return;
        }

        if (!matchingKeyFilters.isEmpty())
        {
            // Narrow further using the bloom filter, then verify at least one key exists in the index
            List<PartitionKeyFilter> matchInBloomFilter =
                    ReaderUtils.filterKeyInBloomFilter(ssTable, metadata.partitioner, descriptor, matchingKeyFilters);
            this.partitionKeyFilters = ImmutableList.copyOf(matchInBloomFilter);

            // Check if required keys are actually present
            if (matchInBloomFilter.isEmpty() || !ReaderUtils.anyFilterKeyInIndex(ssTable, matchInBloomFilter))
            {
                if (matchInBloomFilter.isEmpty())
                {
                    stats.missingInBloomFilter();
                }
                else
                {
                    stats.missingInIndex();
                }
                LOGGER.info("Ignoring SSTable {}, no match found in index file for key filters",
                            this.ssTable.getDataFileName());
                statsMetadata = null;
                header = null;
                helper = null;
                this.metadata = null;
                return;
            }
        }
        else
        {
            this.partitionKeyFilters = ImmutableList.copyOf(partitionKeyFilters);
        }

        Map<MetadataType, MetadataComponent> componentMap = SSTableCache.INSTANCE.componentMapFromStats(ssTable, descriptor);

        // The SSTable must have been written with the same partitioner as the table schema
        ValidationMetadata validation = (ValidationMetadata) componentMap.get(MetadataType.VALIDATION);
        if (validation != null && !validation.partitioner.equals(metadata.partitioner.getClass().getName()))
        {
            throw new IllegalStateException("Partitioner in ValidationMetadata does not match TableMetaData: "
                                          + validation.partitioner + " vs. " + metadata.partitioner.getClass().getName());
        }

        this.statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS);
        SerializationHeader.Component headerComp = (SerializationHeader.Component) componentMap.get(MetadataType.HEADER);
        if (headerComp == null)
        {
            throw new IOException("Cannot read SSTable if cannot deserialize stats header info");
        }

        // Under incremental repair only the primary replica reads repaired SSTables, avoiding duplicates
        if (useIncrementalRepair && !isRepairPrimary && isRepaired())
        {
            stats.skippedRepairedSSTable(ssTable, statsMetadata.repairedAt);
            LOGGER.info("Ignoring repaired SSTable on non-primary repair replica ssTable='{}' repairedAt={}",
                        ssTable, statsMetadata.repairedAt);
            header = null;
            helper = null;
            this.metadata = null;
            return;
        }

        // Columns present in the serialization header but absent from the schema were dropped;
        // rebuild the table metadata with that dropped-column info so deserialization can skip them
        Set<String> columnNames = Streams.concat(metadata.columns().stream(),
                                                 metadata.staticColumns().stream())
                                         .map(column -> column.name.toString())
                                         .collect(Collectors.toSet());
        Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
        droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
                                                  metadata.name,
                                                  ssTable,
                                                  headerComp.getRegularColumns(),
                                                  columnNames,
                                                  ColumnMetadata.Kind.REGULAR));
        droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
                                                  metadata.name,
                                                  ssTable,
                                                  headerComp.getStaticColumns(),
                                                  columnNames,
                                                  ColumnMetadata.Kind.STATIC));
        if (!droppedColumns.isEmpty())
        {
            LOGGER.info("Rebuilding table metadata with dropped columns numDroppedColumns={} ssTable='{}'",
                        droppedColumns.size(), ssTable);
            metadata = metadata.unbuild().droppedColumns(droppedColumns).build();
        }

        this.header = headerComp.toHeader(metadata);
        // NOTE(review): messaging version is pinned to VERSION_30; per reviewer feedback the version
        // affects deletion-time deserialization (signed int below VERSION_50, unsigned at VERSION_50) —
        // consider deriving it from the sstable version instead. TODO confirm before changing.
        this.helper = new DeserializationHelper(metadata,
                                                MessagingService.VERSION_30,
                                                DeserializationHelper.Flag.FROM_REMOTE,
                                                buildColumnFilter(metadata, columnFilter));
        this.metadata = metadata;

        if (readIndexOffset && summary != null)
        {
            SummaryDbUtils.Summary finalSummary = summary;
            extractRange(sparkRangeFilter, partitionKeyFilters)
                    .ifPresent(range -> readOffsets(finalSummary.summary(), range));
        }
        else
        {
            LOGGER.warn("Reading SSTable without looking up start/end offset, performance will potentially be degraded");
        }

        // Open SSTableStreamReader so opened in parallel inside thread pool
        // and buffered + ready to go when CompactionIterator starts reading
        reader.set(new SSTableStreamReader());
        stats.openedSSTable(ssTable, System.nanoTime() - startTimeNanos);
        this.openedNanos = System.nanoTime();
    }
+
+    private static Map<ByteBuffer, DroppedColumn> buildDroppedColumns(String 
keyspace,
+                                                                      String 
table,
+                                                                      SSTable 
ssTable,
+                                                                      
Map<ByteBuffer, AbstractType<?>> columns,
+                                                                      
Set<String> columnNames,
+                                                                      
ColumnMetadata.Kind kind)
+    {
+        Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
+        for (Map.Entry<ByteBuffer, AbstractType<?>> entry : columns.entrySet())
+        {
+            String colName = UTF8Type.instance.getString((entry.getKey()));
+            if (!columnNames.contains(colName))
+            {
+                AbstractType<?> type = entry.getValue();
+                LOGGER.warn("Dropped column found colName={} sstable='{}'", 
colName, ssTable);
+                ColumnMetadata column = new ColumnMetadata(keyspace,
+                                                           table,
+                                                           
ColumnIdentifier.getInterned(colName, true),
+                                                           type,
+                                                           
ColumnMetadata.NO_POSITION,
+                                                           kind,
+                                                           null); // Cassandra 
4.x vs 5.x

Review Comment:
   Could have a `TODO` on honoring server-side column masking. 



##########
cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java:
##########
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.zip.CRC32;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.RebufferingChannelInputStream;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.spark.utils.Pair;
+import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.BloomFilterSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.TokenUtils;
+import org.apache.cassandra.utils.vint.VIntCoding;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+@SuppressWarnings("WeakerAccess")
+public final class ReaderUtils extends TokenUtils
+{
    // Length in bytes of a CRC32 checksum trailer
    private static final int CHECKSUM_LENGTH = 4;  // CRC32
    // The 5-argument SerializationHeader.Component constructor, looked up reflectively because it
    // is not accessible from this package; made accessible in the static initializer below.
    private static final Constructor<?> SERIALIZATION_HEADER =
    Arrays.stream(SerializationHeader.Component.class.getDeclaredConstructors())
          .filter(constructor -> constructor.getParameterCount() == 5)
          .findFirst()
          .orElseThrow(() -> new RuntimeException("Could not find SerializationHeader.Component constructor"));
    // Empty-buffer sentinel used by encodeCellName to identify the super-column internal map column
    public static final ByteBuffer SUPER_COLUMN_MAP_COLUMN = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+    public static Descriptor constructDescriptor(@NotNull String keyspace, 
@NotNull String table, @NotNull SSTable ssTable)
+    {
+        File file = ReaderUtils.constructFilename(keyspace, table, 
ssTable.getDataFileName());
+        return Descriptor.fromFile(file);
+    }
+
+    /**
+     * Constructs full file path for a given combination of keyspace, table, 
and data file name,
+     * while adjusting for data files with non-standard names prefixed with 
keyspace and table
+     *
+     * @param keyspace Name of the keyspace
+     * @param table    Name of the table
+     * @param filename Name of the data file
+     * @return A full file path, adjusted for non-standard file names
+     */
+    @VisibleForTesting
+    @NotNull
+    public static File constructFilename(@NotNull String keyspace, @NotNull 
String table, @NotNull String filename)
+    {
+        String[] components = filename.split("-");
+        if (components.length == 6
+            && components[0].equals(keyspace)
+            && components[1].equals(table))
+        {
+            filename = filename.substring(keyspace.length() + table.length() + 
2);
+        }
+
+        return new File(String.format("./%s/%s", keyspace, table), filename);
+    }
+
    static
    {
        // Unlock the package-private SerializationHeader.Component constructor resolved above
        SERIALIZATION_HEADER.setAccessible(true);
    }
+
    /**
     * Static utility class; instantiation (even via reflection) is a programming error.
     *
     * @throws IllegalStateException always
     */
    private ReaderUtils()
    {
        super();
        throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
    }
+
    /**
     * Encodes a cell name into its serialized (possibly composite) buffer form, covering
     * the legacy table flavours distinguished by the schema flags: non-compound, compound,
     * dense and "super" tables.
     *
     * @param metadata          table schema; its flags select the encoding layout
     * @param clustering        clustering prefix of the row holding the cell
     * @param columnName        serialized column name component; the dense path does not use it
     *                          — presumably nullable there, TODO confirm against callers
     * @param collectionElement cell path element for collection cells, or null when absent
     * @return the encoded cell name
     */
    static ByteBuffer encodeCellName(TableMetadata metadata,
                                     ClusteringPrefix clustering,
                                     ByteBuffer columnName,
                                     ByteBuffer collectionElement)
    {
        boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;

        // Non-compound tables encode a single value, not a composite
        if (!TableMetadata.Flag.isCompound(metadata.flags))
        {
            if (isStatic)
            {
                return columnName;
            }

            assert clustering.size() == 1 : "Expected clustering size to be 1, but was " + clustering.size();
            return clustering.bufferAt(0);
        }

        // We use comparator.size() rather than clustering.size() because of static clusterings
        int clusteringSize = metadata.comparator.size();
        // Dense tables carry no column-name component; a collection element adds one extra component
        int size = clusteringSize + (TableMetadata.Flag.isDense(metadata.flags) ? 0 : 1)
                   + (collectionElement == null ? 0 : 1);
        if (TableMetadata.Flag.isSuper(metadata.flags))
        {
            // Super tables always carry exactly one component after the clustering values
            size = clusteringSize + 1;
        }

        ByteBuffer[] values = new ByteBuffer[size];
        for (int index = 0; index < clusteringSize; index++)
        {
            // Static cells use an empty buffer for every clustering component
            if (isStatic)
            {
                values[index] = ByteBufferUtil.EMPTY_BYTE_BUFFER;
                continue;
            }

            ByteBuffer value = clustering.bufferAt(index);
            // We can have null (only for dense compound tables for backward compatibility reasons),
            // but that means we're done and should stop there as far as building the composite is concerned:
            // return a composite of only the components gathered so far
            if (value == null)
            {
                return CompositeType.build(ByteBufferAccessor.instance, Arrays.copyOfRange(values, 0, index));
            }

            values[index] = value;
        }

        if (TableMetadata.Flag.isSuper(metadata.flags))
        {
            // We need to set the "column" (in thrift terms) name, i.e. the value corresponding to the subcomparator.
            // What it is depends on whether this is a cell for a declared "static" column
            // or a "dynamic" column part of the super-column internal map.
            assert columnName != null;  // This should never be null for supercolumns, see decodeForSuperColumn() above
            values[clusteringSize] = columnName.equals(SUPER_COLUMN_MAP_COLUMN)
                                     ? collectionElement
                                     : columnName;
        }
        else
        {
            if (!TableMetadata.Flag.isDense(metadata.flags))
            {
                values[clusteringSize] = columnName;
            }
            if (collectionElement != null)
            {
                values[clusteringSize + 1] = collectionElement;
            }
        }

        return CompositeType.build(ByteBufferAccessor.instance, isStatic, values);
    }
+
+    @Nullable
+    public static Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull 
TableMetadata metadata,
+                                                                 @NotNull 
SSTable ssTable) throws IOException
+    {
+        return keysFromIndex(metadata.partitioner, ssTable);
+    }
+
+    @Nullable
+    public static Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull 
IPartitioner partitioner,
+                                                                 @NotNull 
SSTable ssTable) throws IOException
+    {
+        try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
+        {
+            if (primaryIndex != null)
+            {
+                Pair<ByteBuffer, ByteBuffer> keys = 
primaryIndexReadFirstAndLastKey(primaryIndex);
+                return Pair.of(partitioner.decorateKey(keys.left), 
partitioner.decorateKey(keys.right));
+            }
+        }
+        return null;
+    }
+
+    public static boolean anyFilterKeyInIndex(@NotNull SSTable ssTable,
+                                              @NotNull 
List<PartitionKeyFilter> filters) throws IOException
+    {
+        if (filters.isEmpty())
+        {
+            return false;
+        }
+
+        try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
+        {
+            if (primaryIndex != null)
+            {
+                return primaryIndexContainsAnyKey(primaryIndex, filters);
+            }
+        }
+
+        return true; // could not read primary index, so to be safe assume it 
contains the keys
+    }
+
+    public static Map<MetadataType, MetadataComponent> 
deserializeStatsMetadata(String keyspace,
+                                                                               
 String table,
+                                                                               
 SSTable ssTable,
+                                                                               
 EnumSet<MetadataType> selectedTypes) throws IOException
+    {
+        return deserializeStatsMetadata(ssTable, selectedTypes, 
constructDescriptor(keyspace, table, ssTable));
+    }
+
+    public static Map<MetadataType, MetadataComponent> 
deserializeStatsMetadata(SSTable ssTable,
+                                                                               
 Descriptor descriptor) throws IOException
+    {
+        return deserializeStatsMetadata(ssTable,
+                                        EnumSet.of(MetadataType.VALIDATION, 
MetadataType.STATS, MetadataType.HEADER),
+                                        descriptor);
+    }
+
+    public static Map<MetadataType, MetadataComponent> 
deserializeStatsMetadata(SSTable ssTable,
+                                                                               
 EnumSet<MetadataType> selectedTypes,
+                                                                               
 Descriptor descriptor) throws IOException
+    {
+        try (InputStream statsStream = ssTable.openStatsStream())
+        {
+            return deserializeStatsMetadata(statsStream,
+                                            selectedTypes,
+                                            descriptor);
+        }
+    }
+
+    /**
+     * Deserialize Statistics.db file to pull out metadata components needed 
for SSTable deserialization
+     *
+     * @param is            input stream for Statistics.db file
+     * @param selectedTypes enum of MetadataType to deserialize
+     * @param descriptor    SSTable file descriptor
+     * @return map of MetadataComponent for each requested MetadataType
+     * @throws IOException
+     */
+    static Map<MetadataType, MetadataComponent> 
deserializeStatsMetadata(InputStream is,
+                                                                         
EnumSet<MetadataType> selectedTypes,
+                                                                         
Descriptor descriptor) throws IOException
+    {
+        DataInputStream in = new DataInputStream(is); // Cassandra 4.x vs 5.x

Review Comment:
   I think it should use `org.apache.cassandra.spark.reader.SSTableReader.DataInputStreamPlus`.
   The `DataInputStreamPlus` can be moved out of the `SSTableReader` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to