This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 58ea38d0 CASSANALYTICS-122: Use long for absolute times and support C* 5.0 extended localDeletionTime (#183)
58ea38d0 is described below

commit 58ea38d0dc808f5357cabb46bcc182136e95524f
Author: Shailaja Koppu <[email protected]>
AuthorDate: Thu Mar 26 18:45:30 2026 +0000

    CASSANALYTICS-122: Use long for absolute times and support C* 5.0 extended localDeletionTime (#183)
    
    Patch by Shailaja Koppu; Reviewed by Jyothsna Konisa, Yifan Cai for CASSANALYTICS-122
---
 .circleci/config.yml                               |  10 +-
 CHANGES.txt                                        |   2 +-
 .../apache/cassandra/cdc/avro/CdcEventUtils.java   |   4 +-
 .../src/main/resources/cdc_bytes.avsc              |   2 +-
 .../src/main/resources/cdc_generic_record.avsc     |   2 +-
 .../cassandra/cdc/json/JsonSerializerTests.java    |  27 +-
 cassandra-analytics-cdc/build.gradle               |   3 +
 .../cdc/AvroByteRecordTransformerTest.java         | 178 ++++++++
 .../cdc/AvroGenericRecordTransformerTest.java      | 502 +++++++++++++++++++++
 .../apache/cassandra/cdc/avro/TestSchemaStore.java |  65 +++
 .../org/apache/cassandra/cdc/msg/CdcEvent.java     |   4 +-
 .../apache/cassandra/cdc/msg/CdcEventBuilder.java  |   2 +-
 .../cassandra/spark/utils/ReaderTimeProvider.java  |   6 +-
 .../apache/cassandra/spark/utils/TimeProvider.java |  10 +-
 cassandra-analytics-core/build.gradle              |   7 +-
 .../cassandra/spark/data/CassandraDataLayer.java   |   8 +-
 .../apache/cassandra/spark/SSTableReaderTests.java |   6 +-
 .../analytics/BulkWriteComplexTypeTtlTest.java     | 195 ++++++++
 .../cassandra/cdc/msg/FourZeroCdcEventBuilder.java |   6 +-
 .../main/java/org/apache/cassandra/db/DbUtils.java |   8 +-
 .../io/sstable/SSTableTombstoneWriter.java         |   2 +-
 .../spark/reader/CompactionStreamScanner.java      |   2 +-
 .../org/apache/cassandra/spark/data/CqlType.java   |   2 +-
 .../main/java/org/apache/cassandra/db/DbUtils.java |  14 +-
 .../io/sstable/SSTableTombstoneWriter.java         |   3 +-
 .../spark/reader/AbstractStreamScanner.java        |   5 +-
 .../spark/reader/CompactionStreamScanner.java      |   6 +-
 .../cassandra/spark/data/AbstractCqlType.java      |   8 +-
 .../org/apache/cassandra/spark/data/CqlType.java   |   2 +
 .../spark/data/complex/AbstractCqlList.java        |   6 +-
 .../cassandra/spark/data/complex/CqlMap.java       |   6 +-
 .../cassandra/spark/data/complex/CqlSet.java       |   6 +-
 32 files changed, 1053 insertions(+), 56 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index ff7eb500..0e9f576f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -64,10 +64,13 @@ commands:
             JDK_VERSION: "<<parameters.jdk>>"
             INTEGRATION_MAX_PARALLEL_FORKS: 1
             INTEGRATION_MAX_HEAP_SIZE: "1500M"
+            CORE_MAX_PARALLEL_FORKS: 2
+            CORE_TEST_MAX_HEAP_SIZE: "2048m"
             CASSANDRA_USE_JDK11: <<parameters.use_jdk11>>
           command: |
+            export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g"
             # Run compile/unit tests, skipping integration tests
-            ./gradlew --stacktrace clean assemble check -x 
cassandra-analytics-integration-tests:test 
-Dcassandra.analytics.bridges.sstable_format=<<parameters.sstable_format>>
+            ./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble 
check -x cassandra-analytics-integration-tests:test 
-Dcassandra.analytics.bridges.sstable_format=<<parameters.sstable_format>>
 
   run_integration:
     parameters:
@@ -93,10 +96,11 @@ commands:
             INTEGRATION_MAX_HEAP_SIZE: "2500M"
             CASSANDRA_USE_JDK11: <<parameters.use_jdk11>>
           command: |
+            export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g"
             export DTEST_JAR="dtest-<< parameters.cassandra >>.jar"
             export CASSANDRA_VERSION=$(echo << parameters.cassandra >> | cut 
-d'.' -f 1,2)
             # Run compile but not unit tests (which are run in run_build)
-            ./gradlew --stacktrace clean assemble
+            ./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble
             # Run integration tests in parallel
             cd cassandra-analytics-integration-tests/src/test/java
             # Get list of classnames of tests that should run on this node
@@ -132,7 +136,7 @@ jobs:
           name: Build dependencies for jdk11 builds
           command: |
             CASSANDRA_USE_JDK11=true ./scripts/build-dependencies.sh
-            ./gradlew codeCheckTasks
+            ./gradlew --no-daemon --max-workers=2 codeCheckTasks
       - persist_to_workspace:
           root: dependencies
           paths:
diff --git a/CHANGES.txt b/CHANGES.txt
index b2e71b6e..35afa6cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 0.4.0
 -----
- * Setup CI Pipeline with GitHub Actions (CASSANALYTICS-106)
+ * Support extended deletion time in CDC for Cassandra 5.0 (CASSANALYTICS-122)
  * Flush event consumer before persisting CDC state to prevent data loss on 
failure (CASSANALYTICS-126)
  * Fix ReadStatusTracker to distinguish clean completion from error 
termination in BufferingCommitLogReader (CASSANALYTICS-129)
  * Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60)
diff --git 
a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
 
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
index ddb1b48a..fef91ab6 100644
--- 
a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
+++ 
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
@@ -281,14 +281,14 @@ public final class CdcEventUtils
         return ttlRecord;
     }
 
-    public static Map<String, Integer> getTTL(CdcEvent event)
+    public static Map<String, Long> getTTL(CdcEvent event)
     {
         CdcEvent.TimeToLive ttl = event.getTtl();
         if (ttl == null)
         {
             return null;
         }
-        return mapOf(AvroConstants.TTL_KEY, ttl.ttlInSec, 
AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec);
+        return mapOf(AvroConstants.TTL_KEY, (long) ttl.ttlInSec, 
AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec);
     }
 
     public static UpdatedEvent getUpdatedEvent(CdcEvent event,
diff --git a/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc 
b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
index 8609b7bf..55104298 100644
--- a/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
+++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
@@ -145,7 +145,7 @@
             },
             {
               "name": "deletedAt",
-              "type": "int",
+              "type": "long",
               "doc": "Future timestamp in seconds"
             }
           ]
diff --git 
a/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc 
b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
index 9f4a3a47..4535fe1d 100644
--- a/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
+++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
@@ -145,7 +145,7 @@
             },
             {
               "name": "deletedAt",
-              "type": "int",
+              "type": "long",
               "doc": "Future timestamp in seconds"
             }
           ]
diff --git 
a/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java
 
b/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java
index 8a582f29..ee2aca02 100644
--- 
a/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java
+++ 
b/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java
@@ -101,7 +101,7 @@ public class JsonSerializerTests
         assertThat(root.has(AvroConstants.TTL_KEY)).isTrue();
         JsonNode ttl = root.get(AvroConstants.TTL_KEY);
         assertThat(ttl.get(AvroConstants.TTL_KEY).asInt()).isEqualTo(10);
-        
assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asInt()).isEqualTo(1658269);
+        
assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asLong()).isEqualTo(1658269);
     }
 
     @Test
@@ -138,4 +138,29 @@ public class JsonSerializerTests
         InetAddress address = 
InetAddress.getByAddress(Base64.getDecoder().decode(base64Str));
         assertThat(address).isEqualTo(InetAddress.getByName("127.0.0.1"));
     }
+
+    @Test
+    public void testJsonSerializerWithLongExpirationTime() throws IOException
+    {
+        long expirationTimePastIntMax = 2_147_483_648L;
+        CdcEventBuilder eventBuilder = 
CdcEventBuilder.of(CdcEvent.Kind.INSERT, TEST_KS, TEST_TBL_BASIC);
+        eventBuilder.setPartitionKeys(listOf(Value.of(TEST_KS, "a", "int", 
TYPES.aInt().serialize(1))));
+        eventBuilder.setValueColumns(listOf(
+        Value.of(TEST_KS, "b", "int", TYPES.aInt().serialize(2))
+        ));
+        
eventBuilder.setMaxTimestampMicros(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+        eventBuilder.setTimeToLive(new CdcEvent.TimeToLive(10, 
expirationTimePastIntMax));
+
+        byte[] ar;
+        try (JsonSerializer serializer = new JsonSerializer(TYPE_LOOKUP))
+        {
+            ar = serializer.serialize("topic", eventBuilder.build());
+        }
+        assertThat(ar).isNotNull();
+
+        JsonNode root = MAPPER.readTree(ar);
+        JsonNode ttl = root.get(AvroConstants.TTL_KEY);
+        assertThat(ttl.get(AvroConstants.TTL_KEY).asInt()).isEqualTo(10);
+        
assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asLong()).isEqualTo(expirationTimePastIntMax);
+    }
 }
diff --git a/cassandra-analytics-cdc/build.gradle 
b/cassandra-analytics-cdc/build.gradle
index 6fdd7910..38c28406 100644
--- a/cassandra-analytics-cdc/build.gradle
+++ b/cassandra-analytics-cdc/build.gradle
@@ -89,6 +89,9 @@ dependencies {
     implementation 
"com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
 
     testImplementation project(":cassandra-analytics-common")
+    testImplementation project(":cassandra-analytics-cdc-codec")
+    testImplementation "org.apache.avro:avro:${avroVersion}"
+    testImplementation "org.apache.kafka:kafka-clients:${kafkaClientVersion}"
 
     // pull in cassandra-bridge so we can re-use TestSchema to generate 
arbitrary schemas for the cdc tests
     testImplementation project(":cassandra-bridge")
diff --git 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java
 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java
new file mode 100644
index 00000000..985b3b1b
--- /dev/null
+++ 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.avro.AvroByteRecordTransformer;
+import org.apache.cassandra.cdc.avro.AvroConstants;
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.avro.TestSchemaStore;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.test.CdcTestBase;
+import org.apache.cassandra.cdc.test.CdcTester;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.test.TestSchema;
+
+import static org.apache.cassandra.cdc.test.CdcTester.testWith;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that exercise the CDC-to-Avro byte-serialization pipeline ({@code cdc_bytes.avsc}).
+ */
+public class AvroByteRecordTransformerTest extends CdcTestBase
+{
+    private static final int NUM_ROWS = 50;
+
+    private CqlToAvroSchemaConverter getConverter(CassandraVersion version)
+    {
+        CqlToAvroSchemaConverter converter = 
CdcBridgeFactory.getCqlToAvroSchemaConverter(version);
+        assertThat(converter).isNotNull();
+        return converter;
+    }
+
+    /**
+     * Build the Avro byte transformer by converting the CQL table schema
+     * into an Avro schema and registering it in the test schema store.
+     */
+    private AvroByteRecordTransformer 
buildTransformer(CqlToAvroSchemaConverter converter,
+                                                        CqlTable cqlTable,
+                                                        TestSchemaStore 
schemaStore)
+    {
+        Schema avroSchema = converter.convert(cqlTable);
+        String namespace = cqlTable.keyspace() + "." + cqlTable.table();
+        schemaStore.registerSchema(namespace, avroSchema);
+
+        Function<KeyspaceTypeKey, CqlField.CqlType> typeLookup = key -> 
bridge.parseType(key.type);
+        return new AvroByteRecordTransformer(schemaStore, typeLookup);
+    }
+
+    /**
+     * Deserialize a byte payload using the table's Avro schema from the 
schema store.
+     */
+    private GenericRecord deserializePayload(ByteBuffer payloadBytes, 
GenericDatumReader<GenericRecord> reader) throws IOException
+    {
+        byte[] bytes = new byte[payloadBytes.remaining()];
+        payloadBytes.get(bytes);
+        return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, 
null));
+    }
+
+    @ParameterizedTest
+    
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+    public void testTtlDeletedAtByteAvroEncoding(CassandraVersion version)
+    {
+        AvroSchemas.registerLogicalTypes();
+        CqlToAvroSchemaConverter converter = getConverter(version);
+
+        int ttlSeconds = 3600;
+        long beforeTestEpochSec = 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+
+        TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+                                                     .withPartitionKey("pk", 
bridge.uuid())
+                                                     .withColumn("c1", 
bridge.bigint())
+                                                     .withColumn("c2", 
bridge.text());
+
+        AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+        testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+        .withNumRows(NUM_ROWS)
+        .clearWriters()
+        .withWriter((tester, rows, writer) -> {
+            tableRef.set(tester.cqlTable);
+            for (int i = 0; i < tester.numRows; i++)
+            {
+                TestSchema.TestRow testRow = 
CdcTester.newUniqueRow(tester.schema, rows);
+                testRow.setTTL(ttlSeconds);
+                writer.accept(testRow, 
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+            }
+        })
+        .withCdcEventChecker((testRows, events) -> {
+            long afterTestEpochSec = 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+            assertThat(events).hasSize(NUM_ROWS);
+
+            TestSchemaStore schemaStore = new TestSchemaStore();
+            AvroByteRecordTransformer transformer = 
buildTransformer(converter, tableRef.get(), schemaStore);
+            String namespace = tableRef.get().keyspace() + "." + 
tableRef.get().table();
+
+            for (CdcEvent event : events)
+            {
+                GenericData.Record record = transformer.transform(event);
+                assertThat(record).isNotNull();
+
+                // Validate header-level fields
+                
assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
+                
assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
+                
assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT");
+                
assertThat(record.get(AvroConstants.TIMESTAMP_KEY)).isNotNull();
+                
assertThat(record.get(AvroConstants.VERSION_KEY).toString()).isEqualTo(AvroConstants.CURRENT_VERSION);
+
+                // TTL record should be present
+                Object ttlField = record.get(AvroConstants.TTL_KEY);
+                assertThat(ttlField).isNotNull();
+                GenericRecord ttlRecord = (GenericRecord) ttlField;
+
+                // Validate TTL value
+                
assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(ttlSeconds);
+
+                // Validate deletedAt is a Long (confirms long type in 
cdc_bytes.avsc)
+                Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY);
+                assertThat(deletedAt).isInstanceOf(Long.class);
+
+                // Validate deletedAt is approximately nowInSeconds + TTL
+                long deletedAtValue = (Long) deletedAt;
+                assertThat(deletedAtValue)
+                    .as("deletedAt should be approximately nowInSeconds + TTL")
+                    .isBetween(beforeTestEpochSec + ttlSeconds, 
afterTestEpochSec + ttlSeconds);
+
+                // Validate payload: bytes in the byte schema need 
deserialization to verify content
+                Object payloadObj = record.get(AvroConstants.PAYLOAD_KEY);
+                assertThat(payloadObj).isInstanceOf(ByteBuffer.class);
+                GenericRecord payloadRecord;
+                try
+                {
+                    payloadRecord = deserializePayload((ByteBuffer) payloadObj,
+                                                        
schemaStore.getReader(namespace, null));
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException("Failed to deserialize 
payload", e);
+                }
+                assertThat(payloadRecord.get("pk")).isNotNull();
+            }
+        })
+        .run();
+    }
+}
diff --git 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java
 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java
new file mode 100644
index 00000000..2672e59d
--- /dev/null
+++ 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.avro.AvroConstants;
+import org.apache.cassandra.cdc.avro.AvroGenericRecordTransformer;
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CdcEventUtils;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.avro.TestSchemaStore;
+import org.apache.cassandra.cdc.kafka.AvroGenericRecordSerializer;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.test.CdcTestBase;
+import org.apache.cassandra.cdc.test.CdcTester;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.test.TestSchema;
+
+import static 
org.apache.cassandra.cdc.test.CdcTester.newUniquePartitionDeletion;
+import static org.apache.cassandra.cdc.test.CdcTester.testWith;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that exercise the full CDC-to-Avro pipeline:
+ * write mutations -> read CDC events from commit logs -> convert to Avro 
GenericRecord -> validate.
+ */
+@SuppressWarnings("DataFlowIssue")
+public class AvroGenericRecordTransformerTest extends CdcTestBase
+{
+    private static final int NUM_ROWS = 50;
+
+    private CqlToAvroSchemaConverter getConverter(CassandraVersion version)
+    {
+        CqlToAvroSchemaConverter converter = 
CdcBridgeFactory.getCqlToAvroSchemaConverter(version);
+        assertThat(converter).isNotNull();
+        return converter;
+    }
+
+    /**
+     * Build the Avro transformer by converting the CQL table schema (already 
registered by CdcTester)
+     * into an Avro schema and registering it in the test schema store.
+     */
+    private AvroGenericRecordTransformer 
buildTransformer(CqlToAvroSchemaConverter converter,
+                                                          CqlTable cqlTable)
+    {
+        TestSchemaStore schemaStore = new TestSchemaStore();
+        Schema avroSchema = converter.convert(cqlTable);
+        String namespace = cqlTable.keyspace() + "." + cqlTable.table();
+        schemaStore.registerSchema(namespace, avroSchema);
+
+        Function<KeyspaceTypeKey, CqlField.CqlType> typeLookup = key -> 
bridge.parseType(key.type);
+        return new AvroGenericRecordTransformer(schemaStore, typeLookup, "");
+    }
+
+    @ParameterizedTest
+    
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+    public void testBasicInsertAvroEncoding(CassandraVersion version)
+    {
+        AvroSchemas.registerLogicalTypes();
+        CqlToAvroSchemaConverter converter = getConverter(version);
+
+        TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+                                                     .withPartitionKey("pk", 
bridge.uuid())
+                                                     .withClusteringKey("ck", 
bridge.bigint())
+                                                     .withColumn("c1", 
bridge.bigint())
+                                                     .withColumn("c2", 
bridge.text());
+
+        // Capture CqlTable from the tester via the writer callback
+        AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+        testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+        .withNumRows(NUM_ROWS)
+        .clearWriters()
+        .withWriter((tester, rows, writer) -> {
+            tableRef.set(tester.cqlTable);
+            long timestampMicros = 
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+            IntStream.range(0, tester.numRows)
+                     .forEach(i -> 
writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros));
+        })
+        .withCdcEventChecker((testRows, events) -> {
+            assertThat(events).hasSize(NUM_ROWS);
+            AvroGenericRecordTransformer transformer = 
buildTransformer(converter, tableRef.get());
+
+            for (CdcEvent event : events)
+            {
+                GenericData.Record record = transformer.transform(event);
+                assertThat(record).isNotNull();
+
+                // Validate header fields
+                
assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
+                
assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
+                
assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT");
+                
assertThat(record.get(AvroConstants.TIMESTAMP_KEY)).isNotNull();
+                
assertThat(record.get(AvroConstants.VERSION_KEY).toString()).isEqualTo(AvroConstants.CURRENT_VERSION);
+
+                // Validate payload contains expected fields including 
clustering key
+                GenericRecord payload = (GenericRecord) 
record.get(AvroConstants.PAYLOAD_KEY);
+                assertThat(payload).isNotNull();
+                assertThat(payload.getSchema().getField("pk")).isNotNull();
+                assertThat(payload.getSchema().getField("ck")).isNotNull();
+                assertThat(payload.getSchema().getField("c1")).isNotNull();
+                assertThat(payload.getSchema().getField("c2")).isNotNull();
+                // Payload fields should have data
+                assertThat(payload.get("pk")).isNotNull();
+                assertThat(payload.get("ck")).isNotNull();
+                assertThat(payload.get("c1")).isNotNull();
+
+                // Validate updateFields lists all updated column names
+                List<String> updateFields = 
CdcEventUtils.updatedFieldNames(event);
+                assertThat(updateFields).contains("pk", "ck", "c1", "c2");
+            }
+        })
+        .run();
+    }
+
+    @ParameterizedTest
+    
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+    public void testCollectionTypesAvroEncoding(CassandraVersion version)
+    {
+        AvroSchemas.registerLogicalTypes();
+        CqlToAvroSchemaConverter converter = getConverter(version);
+
+        TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+                                                     .withPartitionKey("pk", 
bridge.uuid())
+                                                     .withColumn("m", 
bridge.map(bridge.text(), bridge.text()))
+                                                     .withColumn("s", 
bridge.set(bridge.aInt()))
+                                                     .withColumn("l", 
bridge.list(bridge.text()));
+
+        AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+        testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+        .withNumRows(NUM_ROWS)
+        .clearWriters()
+        .withWriter((tester, rows, writer) -> {
+            tableRef.set(tester.cqlTable);
+            long timestampMicros = 
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+            IntStream.range(0, tester.numRows)
+                     .forEach(i -> 
writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros));
+        })
+        .withCdcEventChecker((testRows, events) -> {
+            assertThat(events).hasSize(NUM_ROWS);
+            AvroGenericRecordTransformer transformer = 
buildTransformer(converter, tableRef.get());
+
+            for (CdcEvent event : events)
+            {
+                GenericData.Record record = transformer.transform(event);
+                assertThat(record).isNotNull();
+
+                GenericRecord payload = (GenericRecord) 
record.get(AvroConstants.PAYLOAD_KEY);
+                assertThat(payload).isNotNull();
+                assertThat(payload.getSchema().getField("m")).isNotNull();
+                assertThat(payload.getSchema().getField("s")).isNotNull();
+                assertThat(payload.getSchema().getField("l")).isNotNull();
+            }
+        })
+        .run();
+    }
+
+    @ParameterizedTest
+    
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+    public void testAvroSerializeDeserializeRoundTrip(CassandraVersion version)
+    {
+        AvroSchemas.registerLogicalTypes();
+        CqlToAvroSchemaConverter converter = getConverter(version);
+
+        TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+                                                     .withPartitionKey("pk", 
bridge.uuid())
+                                                     .withColumn("c1", 
bridge.bigint())
+                                                     .withColumn("c2", 
bridge.text());
+
+        AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+        testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+        .withNumRows(NUM_ROWS)
+        .clearWriters()
+        .withWriter((tester, rows, writer) -> {
+            tableRef.set(tester.cqlTable);
+            long timestampMicros = 
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+            IntStream.range(0, tester.numRows)
+                     .forEach(i -> 
writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros));
+        })
+        .withCdcEventChecker((testRows, events) -> {
+            assertThat(events).hasSize(NUM_ROWS);
+            TestSchemaStore schemaStore = new TestSchemaStore();
+            CqlTable cqlTable = tableRef.get();
+            Schema avroSchema = converter.convert(cqlTable);
+            String namespace = cqlTable.keyspace() + "." + cqlTable.table();
+            schemaStore.registerSchema(namespace, avroSchema);
+
+            Function<KeyspaceTypeKey, CqlField.CqlType> typeLookup = key -> 
bridge.parseType(key.type);
+            AvroGenericRecordSerializer serializer = new 
AvroGenericRecordSerializer(schemaStore, typeLookup, "");
+
+            for (CdcEvent event : events)
+            {
+                // Serialize: CdcEvent -> bytes
+                byte[] bytes = serializer.serialize("test-topic", event);
+                assertThat(bytes).isNotNull();
+                assertThat(bytes.length).isGreaterThan(0);
+
+                // Get the Avro record's schema for deserialization
+                GenericData.Record record = 
serializer.getTransformer().transform(event);
+                Schema recordSchema = record.getSchema();
+
+                // Deserialize: bytes -> CdcEnvelope
+                org.apache.cassandra.cdc.avro.msg.CdcEnvelope envelope =
+                    serializer.deserializer().deserialize(event.keyspace, 
event.table, bytes, recordSchema);
+                assertThat(envelope).isNotNull();
+                assertThat(envelope.header).isNotNull();
+                assertThat(envelope.payload).isNotNull();
+
+                // Validate round-trip preserves operation type and table info
+                
assertThat(envelope.header.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
+                
assertThat(envelope.header.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
+                
assertThat(envelope.header.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT");
+
+                // Validate payload fields survived round-trip
+                assertThat(envelope.payload.get("pk")).isNotNull();
+                assertThat(envelope.payload.get("c1")).isNotNull();
+            }
+        })
+        .run();
+    }
+
+    @ParameterizedTest
+    
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+    public void testDeleteEventAvroEncoding(CassandraVersion version)
+    {
+        AvroSchemas.registerLogicalTypes();
+        CqlToAvroSchemaConverter converter = getConverter(version);
+
+        TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+                                                     .withPartitionKey("pk", 
bridge.uuid())
+                                                     .withColumn("c1", 
bridge.bigint())
+                                                     .withColumn("c2", 
bridge.text());
+
+        AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+        testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+        .withNumRows(NUM_ROWS)
+        .clearWriters()
+        .withWriter((tester, rows, writer) -> {
+            tableRef.set(tester.cqlTable);
+            for (int i = 0; i < tester.numRows; i++)
+            {
+                TestSchema.TestRow testRow = 
CdcTester.newUniqueRow(tester.schema, rows);
+                testRow = testRow.copy("c1", 
org.apache.cassandra.bridge.CdcBridge.UNSET_MARKER);
+                testRow = testRow.copy("c2", null);
+                writer.accept(testRow, 
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+            }
+        })
+        .withCdcEventChecker((testRows, events) -> {
+            assertThat(events).hasSize(NUM_ROWS);
+            AvroGenericRecordTransformer transformer = 
buildTransformer(converter, tableRef.get());
+
+            for (CdcEvent event : events)
+            {
+                GenericData.Record record = transformer.transform(event);
+                assertThat(record).isNotNull();
+
+                // Verify the Avro record correctly encodes the operation type 
from the event
+                CdcEventUtils.OperationType opType = 
CdcEventUtils.getOperationType(event);
+                
assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo(opType.name());
+                
assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
+                
assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
+
+                GenericRecord payload = (GenericRecord) 
record.get(AvroConstants.PAYLOAD_KEY);
+                assertThat(payload).isNotNull();
+                // pk should still be present
+                assertThat(payload.get("pk")).isNotNull();
+                // c2 is deleted (null in the payload)
+                assertThat(payload.get("c2")).isNull();
+            }
+        })
+        .run();
+    }
+
+    /**
+     * Writes rows with a TTL of {@code 20 * 365 * 24 * 3600} seconds and verifies the TTL carried by
+     * each {@code CdcEvent} and by the Avro record produced from it.
+     * <p>
+     * The expiration time must lie within {@code [before + ttl, after + ttl]}. The exception is when
+     * it fits in an {@code int} while that lower bound does not: it is then expected to be
+     * {@code Integer.MAX_VALUE - 1} or {@code Integer.MAX_VALUE}.
+     *
+     * @param version Cassandra version used to select the CQL-to-Avro schema converter
+     */
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+    public void testMaxSupportedTtlAvroEncoding(CassandraVersion version)
+    {
+        AvroSchemas.registerLogicalTypes();
+        CqlToAvroSchemaConverter converter = getConverter(version);
+
+        // Use a large TTL value (~20 years). Safe for both 4.0 and 5.0.
+        int largeTtl = 20 * 365 * 24 * 3600;
+        long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+
+        TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+                                                     .withPartitionKey("pk", bridge.uuid())
+                                                     .withColumn("c1", bridge.aInt());
+
+        // Captured inside the writer so the checker can build a transformer for the same table
+        AtomicReference<CqlTable> capturedTable = new AtomicReference<>();
+        testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+        .withNumRows(NUM_ROWS)
+        .clearWriters()
+        .withWriter((tester, rows, writer) -> {
+            capturedTable.set(tester.cqlTable);
+            for (int written = 0; written < tester.numRows; written++)
+            {
+                TestSchema.TestRow row = CdcTester.newUniqueRow(tester.schema, rows);
+                row.setTTL(largeTtl);
+                long writeTimeMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+                writer.accept(row, writeTimeMicros);
+            }
+        })
+        .withCdcEventChecker((testRows, events) -> {
+            long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+            assertThat(events).hasSize(NUM_ROWS);
+            AvroGenericRecordTransformer transformer = buildTransformer(converter, capturedTable.get());
+
+            // Window in which an uncapped expiration time has to fall
+            long earliestExpiry = beforeTestEpochSec + largeTtl;
+            long latestExpiry = afterTestEpochSec + largeTtl;
+
+            for (CdcEvent cdcEvent : events)
+            {
+                // Validate CdcEvent-level TTL
+                assertThat(cdcEvent.getTtl()).isNotNull();
+                assertThat(cdcEvent.getTtl().ttlInSec).isEqualTo(largeTtl);
+                long expirationTime = cdcEvent.getTtl().expirationTimeInSec;
+
+                boolean cappedToInt = expirationTime <= Integer.MAX_VALUE && earliestExpiry > Integer.MAX_VALUE;
+                if (!cappedToInt)
+                {
+                    assertThat(expirationTime)
+                        .as("expirationTimeInSec should be approximately nowInSeconds + largeTtl")
+                        .isBetween(earliestExpiry, latestExpiry);
+                }
+                else
+                {
+                    // Cassandra 4.0 caps the value at Integer.MAX_VALUE
+                    assertThat(expirationTime)
+                        .as("expirationTimeInSec should be capped near Integer.MAX_VALUE on Cassandra 4.0")
+                        .isBetween((long) Integer.MAX_VALUE - 1, (long) Integer.MAX_VALUE);
+                }
+
+                // Validate Avro-level TTL encoding
+                GenericData.Record avroRecord = transformer.transform(cdcEvent);
+                assertThat(avroRecord).isNotNull();
+
+                Object ttlField = avroRecord.get(AvroConstants.TTL_KEY);
+                assertThat(ttlField).isNotNull();
+                GenericRecord ttlRecord = (GenericRecord) ttlField;
+                assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(largeTtl);
+                // Only the type of deletedAt is checked here, not its value
+                assertThat(ttlRecord.get(AvroConstants.DELETED_AT_KEY)).isInstanceOf(Long.class);
+            }
+        })
+        .run();
+    }
+
+    /**
+     * Writes partition deletions and verifies both the resulting {@code CdcEvent}s and their Avro
+     * records.
+     * <p>
+     * Each event must be of kind {@code PARTITION_DELETE} with a timestamp inside the window spanned
+     * by the test. Each Avro record must report the {@code DELETE_PARTITION} operation type together
+     * with the event's table and keyspace.
+     *
+     * @param version Cassandra version used to select the CQL-to-Avro schema converter
+     */
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+    public void testPartitionDeleteAvroEncoding(CassandraVersion version)
+    {
+        AvroSchemas.registerLogicalTypes();
+        CqlToAvroSchemaConverter converter = getConverter(version);
+
+        long beforeTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+
+        TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+                                                     .withPartitionKey("pk", bridge.uuid())
+                                                     .withColumn("c1", bridge.aInt());
+
+        // Captured inside the writer so the checker can build a transformer for the same table
+        AtomicReference<CqlTable> capturedTable = new AtomicReference<>();
+        testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+        .withNumRows(NUM_ROWS)
+        .clearWriters()
+        .withWriter((tester, rows, writer) -> {
+            capturedTable.set(tester.cqlTable);
+            for (int written = 0; written < tester.numRows; written++)
+            {
+                TestSchema.TestRow deletion = newUniquePartitionDeletion(tester.schema, rows);
+                long writeTimeMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+                writer.accept(deletion, writeTimeMicros);
+            }
+        })
+        .withCdcEventChecker((testRows, events) -> {
+            long afterTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+            assertThat(events).hasSize(NUM_ROWS);
+            AvroGenericRecordTransformer transformer = buildTransformer(converter, capturedTable.get());
+
+            for (CdcEvent cdcEvent : events)
+            {
+                // Validate CdcEvent-level partition delete
+                assertThat(cdcEvent.getKind()).isEqualTo(CdcEvent.Kind.PARTITION_DELETE);
+                assertThat(cdcEvent.getTimestamp(TimeUnit.MICROSECONDS))
+                    .as("deletion timestamp should be within the test time window")
+                    .isBetween(beforeTestMicros, afterTestMicros);
+
+                // Validate Avro-level encoding
+                GenericData.Record avroRecord = transformer.transform(cdcEvent);
+                assertThat(avroRecord).isNotNull();
+                String actualOperation = avroRecord.get(AvroConstants.OPERATION_TYPE_KEY).toString();
+                assertThat(actualOperation).isEqualTo("DELETE_PARTITION");
+                String actualTable = avroRecord.get(AvroConstants.SOURCE_TABLE_KEY).toString();
+                assertThat(actualTable).isEqualTo(cdcEvent.table);
+                String actualKeyspace = avroRecord.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString();
+                assertThat(actualKeyspace).isEqualTo(cdcEvent.keyspace);
+            }
+        })
+        .run();
+    }
+
+    /**
+     * Writes a mix of rows with and without a TTL and verifies that the Avro record of each event
+     * carries a TTL sub-record exactly when the event itself has a TTL.
+     * <p>
+     * Rows at even indices are written with a TTL of {@code ttlSeconds}. For those rows the
+     * {@code deletedAt} value must be a {@code Long} within {@code [before + ttl, after + ttl]}.
+     * For the others the Avro TTL field must be null.
+     *
+     * @param version Cassandra version used to select the CQL-to-Avro schema converter
+     */
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+    public void testMixedTtlAndNonTtlAvroEncoding(CassandraVersion version)
+    {
+        AvroSchemas.registerLogicalTypes();
+        CqlToAvroSchemaConverter converter = getConverter(version);
+
+        int ttlSeconds = 3600;
+        long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+
+        TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+                                                     .withPartitionKey("pk", bridge.uuid())
+                                                     .withColumn("c1", bridge.aInt());
+
+        // Captured inside the writer so the checker can build a transformer for the same table
+        AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+        testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+        .withNumRows(NUM_ROWS)
+        .clearWriters()
+        .withWriter((tester, rows, writer) -> {
+            tableRef.set(tester.cqlTable);
+            for (int i = 0; i < tester.numRows; i++)
+            {
+                TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows);
+                // Only even-indexed rows get a TTL
+                if (i % 2 == 0)
+                {
+                    testRow.setTTL(ttlSeconds);
+                }
+                writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+            }
+        })
+        .withCdcEventChecker((testRows, events) -> {
+            long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+            assertThat(events).hasSize(NUM_ROWS);
+            AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get());
+
+            int withTtl = 0;
+            int withoutTtl = 0;
+            for (CdcEvent event : events)
+            {
+                GenericData.Record record = transformer.transform(event);
+                assertThat(record).isNotNull();
+
+                if (event.getTtl() != null)
+                {
+                    withTtl++;
+                    assertThat(event.getTtl().ttlInSec).isEqualTo(ttlSeconds);
+                    assertThat(event.getTtl().expirationTimeInSec).isGreaterThan(0L);
+
+                    // Avro TTL field should be present
+                    Object ttlField = record.get(AvroConstants.TTL_KEY);
+                    assertThat(ttlField).as("Avro TTL record should be present for TTL row").isNotNull();
+                    GenericRecord ttlRecord = (GenericRecord) ttlField;
+                    assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(ttlSeconds);
+
+                    // Validate deletedAt is a Long with expected value
+                    Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY);
+                    assertThat(deletedAt).isInstanceOf(Long.class);
+                    long deletedAtValue = (Long) deletedAt;
+                    assertThat(deletedAtValue)
+                        .as("deletedAt should be approximately nowInSeconds + TTL")
+                        .isBetween(beforeTestEpochSec + ttlSeconds, afterTestEpochSec + ttlSeconds);
+                }
+                else
+                {
+                    withoutTtl++;
+                    // Avro TTL field should be absent
+                    assertThat(record.get(AvroConstants.TTL_KEY))
+                        .as("Avro TTL record should be null for non-TTL row")
+                        .isNull();
+                }
+            }
+            // Even indices (0, 2, 4, ...) carry the TTL, so an odd NUM_ROWS yields one more TTL row
+            // than non-TTL rows; round the TTL count up instead of assuming an even split.
+            assertThat(withTtl).isEqualTo((NUM_ROWS + 1) / 2);
+            assertThat(withoutTtl).isEqualTo(NUM_ROWS / 2);
+        })
+        .run();
+    }
+}
diff --git 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java
 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java
new file mode 100644
index 00000000..b9a5249a
--- /dev/null
+++ 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.avro;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+
+/**
+ * {@link SchemaStore} for tests that keeps everything in memory.
+ * <p>
+ * Nothing is discovered automatically: a namespace only resolves after
+ * {@link #registerSchema(String, Schema)} has been called for it. Lookups are keyed by namespace
+ * alone; the {@code name} argument of the lookup methods is ignored. Unknown namespaces yield
+ * {@code null}.
+ */
+public class TestSchemaStore implements SchemaStore
+{
+    // One registration (schema plus its writer and reader) per namespace
+    private final Map<String, Registration> registrations = new HashMap<>();
+
+    /**
+     * Registers {@code schema} under {@code namespace}, replacing any earlier registration, and
+     * creates a fresh datum writer and reader for it.
+     *
+     * @param namespace key under which the schema is looked up later
+     * @param schema    Avro schema to serve for that namespace
+     */
+    public void registerSchema(String namespace, Schema schema)
+    {
+        registrations.put(namespace, new Registration(schema));
+    }
+
+    @Override
+    public Schema getSchema(String namespace, String name)
+    {
+        Registration found = registrations.get(namespace);
+        return found == null ? null : found.schema;
+    }
+
+    @Override
+    public GenericDatumWriter<GenericRecord> getWriter(String namespace, String name)
+    {
+        Registration found = registrations.get(namespace);
+        return found == null ? null : found.writer;
+    }
+
+    @Override
+    public GenericDatumReader<GenericRecord> getReader(String namespace, String name)
+    {
+        Registration found = registrations.get(namespace);
+        return found == null ? null : found.reader;
+    }
+
+    /**
+     * Schema together with the writer and reader built from it.
+     */
+    private static final class Registration
+    {
+        private final Schema schema;
+        private final GenericDatumWriter<GenericRecord> writer;
+        private final GenericDatumReader<GenericRecord> reader;
+
+        private Registration(Schema schema)
+        {
+            this.schema = schema;
+            this.writer = new GenericDatumWriter<>(schema);
+            this.reader = new GenericDatumReader<>(schema);
+        }
+    }
+}
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java
index d3b5647c..ae3687be 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java
@@ -50,9 +50,9 @@ public class CdcEvent
     public static class TimeToLive
     {
         public final int ttlInSec;
-        public final int expirationTimeInSec;
+        public final long expirationTimeInSec;
 
-        public TimeToLive(int ttlInSec, int expirationTimeInSec)
+        public TimeToLive(int ttlInSec, long expirationTimeInSec)
         {
             this.ttlInSec = ttlInSec;
             this.expirationTimeInSec = expirationTimeInSec;
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java
index abf66151..60c760b0 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java
@@ -107,7 +107,7 @@ public class CdcEventBuilder
         this.maxTimestampMicros = maxTimestampMicros;
     }
 
-    public void setTTL(int ttlInSec, int expirationTimeInSec)
+    public void setTTL(int ttlInSec, long expirationTimeInSec)
     {
         // Skip updating TTL if it already has been set.
         // For the same row, the upsert query can only set one TTL value.
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
index cbc17517..c6189bbb 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
@@ -24,7 +24,7 @@ package org.apache.cassandra.spark.utils;
  */
 public class ReaderTimeProvider implements TimeProvider
 {
-    private final int referenceEpochInSeconds;
+    private final long referenceEpochInSeconds;
 
     public ReaderTimeProvider()
     {
@@ -35,13 +35,13 @@ public class ReaderTimeProvider implements TimeProvider
      * Constructor used for deserialization
      * @param referenceEpochInSeconds reference epoch to set
      */
-    public ReaderTimeProvider(int referenceEpochInSeconds)
+    public ReaderTimeProvider(long referenceEpochInSeconds)
     {
         this.referenceEpochInSeconds = referenceEpochInSeconds;
     }
 
     @Override
-    public int referenceEpochInSeconds()
+    public long referenceEpochInSeconds()
     {
         return referenceEpochInSeconds;
     }
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
index 40a25125..1ea9db28 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
@@ -38,10 +38,10 @@ public interface TimeProvider
     @VisibleForTesting
     TimeProvider DEFAULT = new TimeProvider()
     {
-        private final int referenceEpochInSeconds = nowInSeconds();
+        private final long referenceEpochInSeconds = nowInSeconds();
 
         @Override
-        public int referenceEpochInSeconds()
+        public long referenceEpochInSeconds()
         {
             return referenceEpochInSeconds;
         }
@@ -50,9 +50,9 @@ public interface TimeProvider
     /**
      * @return current time in seconds
      */
-    default int nowInSeconds()
+    default long nowInSeconds()
     {
-        return (int) 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
     }
 
     /**
@@ -62,5 +62,5 @@ public interface TimeProvider
      * <p>
      * Note that the actual constant value returned is implementation dependent
      */
-    int referenceEpochInSeconds();
+    long referenceEpochInSeconds();
 }
diff --git a/cassandra-analytics-core/build.gradle 
b/cassandra-analytics-core/build.gradle
index 259c3252..ffce6660 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -182,7 +182,7 @@ tasks.register('testSequential', Test) {
 
     systemProperty "cassandra.analytics.bridges.sstable_format", 
System.getProperty("cassandra.analytics.bridges.sstable_format", "big")
     minHeapSize = '1024m'
-    maxHeapSize = '3072m'
+    maxHeapSize = System.getenv('CORE_TEST_MAX_HEAP_SIZE') ?: '3072m'
     maxParallelForks = 1
     forkEvery = 1  // Enables different end-to-end test classes use Spark 
contexts with different configurations
 
@@ -217,8 +217,9 @@ tasks.register('testSequential', Test) {
 test {
     systemProperty "cassandra.analytics.bridges.sstable_format", 
System.getProperty("cassandra.analytics.bridges.sstable_format", "big")
     minHeapSize = '1024m'
-    maxHeapSize = '3072m'
-    maxParallelForks = Math.max(Runtime.runtime.availableProcessors() * 2, 8)
+    maxHeapSize = System.getenv('CORE_TEST_MAX_HEAP_SIZE') ?: '3072m'
+    maxParallelForks = System.getenv('CORE_MAX_PARALLEL_FORKS')?.toInteger()
+                       ?: Math.max(Runtime.runtime.availableProcessors() * 2, 
8)
     forkEvery = 1  // Enables different end-to-end test classes use Spark 
contexts with different configurations
 
     // Make it so unit tests run on a Jar with Cassandra bridge 
implementations built in
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index b6e84c6f..99cbc682 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -771,7 +771,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
             aliasLastModifiedTimestamp(this.requestedFeatures, 
this.lastModifiedTimestampField);
         }
         this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
-        this.timeProvider = new ReaderTimeProvider(in.readInt());
+        this.timeProvider = new ReaderTimeProvider(in.readLong());
         this.sstableTimeRangeFilter = (SSTableTimeRangeFilter) in.readObject();
         this.maybeQuoteKeyspaceAndTable();
         this.initSidecarClient();
@@ -817,7 +817,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
             out.writeUTF(feature.optionName());
         }
         out.writeObject(this.rfMap);
-        out.writeInt(timeProvider.referenceEpochInSeconds());
+        out.writeLong(timeProvider.referenceEpochInSeconds());
         out.writeObject(this.sstableTimeRangeFilter);
     }
 
@@ -893,7 +893,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
                                                                            
.collect(Collectors.toList());
             kryo.writeObject(out, listWrapper);
             kryo.writeObject(out, dataLayer.rfMap);
-            out.writeInt(dataLayer.timeProvider.referenceEpochInSeconds());
+            out.writeLong(dataLayer.timeProvider.referenceEpochInSeconds());
             kryo.writeObject(out, dataLayer.sstableTimeRangeFilter);
         }
 
@@ -936,7 +936,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
             in.readString(),
             kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
             kryo.readObject(in, HashMap.class),
-            new ReaderTimeProvider(in.readInt()),
+            new ReaderTimeProvider(in.readLong()),
             kryo.readObject(in, SSTableTimeRangeFilter.class));
         }
 
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
index 1bec25ea..53072e17 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -73,7 +73,7 @@ public class SSTableReaderTests
                                                          int rows,
                                                          int expectedValues)
     {
-        AtomicInteger referenceEpoch = new AtomicInteger(0);
+        AtomicLong referenceEpoch = new AtomicLong(0);
         TimeProvider navigatableTimeProvider = referenceEpoch::get;
 
         Set<Integer> expectedColValue = new HashSet<>(Arrays.asList(1, 2, 3));
@@ -90,7 +90,7 @@ public class SSTableReaderTests
                 }
                 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
             });
-            int t1 = navigatableTimeProvider.nowInSeconds();
+            long t1 = navigatableTimeProvider.nowInSeconds();
             assertThat(countSSTables(dir)).isEqualTo(1);
 
             // open CompactionStreamScanner over SSTables
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java
new file mode 100644
index 00000000..3ddb0353
--- /dev/null
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.TTLOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createArrayType;
+import static org.apache.spark.sql.types.DataTypes.createMapType;
+import static org.apache.spark.sql.types.DataTypes.createStructType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Bulk-writes rows with a TTL into tables whose value column is a list, set, map or UDT, waits for
+ * the TTL to elapse, and checks that the bulk reader no longer returns any row.
+ * This ensures the expiring cell path is invoked correctly for collection and composite types.
+ * TODO: add tuple ttl test after adding support for writing tuples
+ */
+class BulkWriteComplexTypeTtlTest extends SharedClusterSparkIntegrationTestBase
+{
+    static final int ROW_COUNT = 100;
+    static final int TTL_SECONDS = 5;
+
+    static final QualifiedName LIST_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_list_ttl");
+    static final QualifiedName SET_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_set_ttl");
+    static final QualifiedName MAP_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_map_ttl");
+    static final QualifiedName UDT_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_udt_ttl");
+
+    static final String SIMPLE_UDT_NAME = "simple_udt";
+
+    @Test
+    void testComplexTypesWithTtl()
+    {
+        SparkSession spark = getOrCreateSparkSession();
+
+        // Write all complex types with TTL
+        writeListData(spark);
+        writeSetData(spark);
+        writeMapData(spark);
+        writeUdtData(spark);
+
+        // Wait for TTL to expire (TTL + 1 second margin)
+        Uninterruptibles.sleepUninterruptibly(TTL_SECONDS + 1, TimeUnit.SECONDS);
+
+        // Verify all types have expired
+        assertExpired(LIST_TTL_TABLE, "list TTL post-expiry");
+        assertExpired(SET_TTL_TABLE, "set TTL post-expiry");
+        assertExpired(MAP_TTL_TABLE, "map TTL post-expiry");
+        assertExpired(UDT_TTL_TABLE, "UDT TTL post-expiry");
+    }
+
+    /**
+     * Asserts that reading {@code table} through the bulk reader returns no rows.
+     */
+    private void assertExpired(QualifiedName table, String description)
+    {
+        assertThat(bulkReaderDataFrame(table).load().collectAsList())
+            .as(description)
+            .isEmpty();
+    }
+
+    /**
+     * Bulk-writes {@code df} into {@code table} with a constant TTL of {@link #TTL_SECONDS}.
+     */
+    private void writeWithTtl(Dataset<Row> df, QualifiedName table)
+    {
+        bulkWriterDataFrameWriter(df, table).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS))
+                                            .save();
+    }
+
+    private void writeListData(SparkSession spark)
+    {
+        StructType listSchema = new StructType()
+                                .add("id", IntegerType, false)
+                                .add("listdata", createArrayType(IntegerType), false);
+
+        // Row i holds the two-element list [i, i + 1]
+        List<Row> listRows = IntStream.range(0, ROW_COUNT)
+                                      .mapToObj(id -> RowFactory.create(id, Arrays.asList(id, id + 1)))
+                                      .collect(Collectors.toList());
+
+        writeWithTtl(spark.createDataFrame(listRows, listSchema), LIST_TTL_TABLE);
+    }
+
+    private void writeSetData(SparkSession spark)
+    {
+        StructType setSchema = new StructType()
+                               .add("id", IntegerType, false)
+                               .add("setdata", createArrayType(StringType), false);
+
+        // Row i holds the single-element set {"item" + i}
+        List<Row> setRows = IntStream.range(0, ROW_COUNT)
+                                     .mapToObj(id -> RowFactory.create(id, ImmutableSet.of("item" + id)))
+                                     .collect(Collectors.toList());
+
+        writeWithTtl(spark.createDataFrame(setRows, setSchema), SET_TTL_TABLE);
+    }
+
+    private void writeMapData(SparkSession spark)
+    {
+        StructType mapSchema = new StructType()
+                               .add("id", IntegerType, false)
+                               .add("mapdata", createMapType(StringType, IntegerType), false);
+
+        // Row i holds the single-entry map {"key" + i -> i}
+        List<Row> mapRows = IntStream.range(0, ROW_COUNT)
+                                     .mapToObj(id -> RowFactory.create(id, ImmutableMap.of("key" + id, id)))
+                                     .collect(Collectors.toList());
+
+        writeWithTtl(spark.createDataFrame(mapRows, mapSchema), MAP_TTL_TABLE);
+    }
+
+    private void writeUdtData(SparkSession spark)
+    {
+        StructField[] udtFields = {
+            new StructField("f1", StringType, true, Metadata.empty()),
+            new StructField("f2", IntegerType, true, Metadata.empty())
+        };
+        StructType udtSchema = new StructType()
+                               .add("id", IntegerType, false)
+                               .add("udtfield", createStructType(udtFields), false);
+
+        // Row i holds the nested row ("course" + i, i)
+        List<Row> udtRows = IntStream.range(0, ROW_COUNT)
+                                     .mapToObj(id -> RowFactory.create(id, RowFactory.create("course" + id, id)))
+                                     .collect(Collectors.toList());
+
+        writeWithTtl(spark.createDataFrame(udtRows, udtSchema), UDT_TTL_TABLE);
+    }
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .nodesPerDc(3);
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(LIST_TTL_TABLE, DC1_RF3);
+
+        // The UDT is created before the table that references it
+        cluster.schemaChangeIgnoringStoppedInstances("CREATE TYPE " + TEST_KEYSPACE + "." + SIMPLE_UDT_NAME
+                                                     + " (f1 text, f2 int)");
+
+        createTable(LIST_TTL_TABLE, "listdata list<int>");
+        createTable(SET_TTL_TABLE, "setdata set<text>");
+        createTable(MAP_TTL_TABLE, "mapdata map<text, int>");
+        createTable(UDT_TTL_TABLE, "udtfield " + SIMPLE_UDT_NAME);
+    }
+
+    /**
+     * Creates {@code table} with an {@code id int} primary key plus the given value column.
+     */
+    private void createTable(QualifiedName table, String columnDefinition)
+    {
+        cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + table + " ("
+                                                     + "id int PRIMARY KEY, "
+                                                     + columnDefinition + ")");
+    }
+}
diff --git 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
index b4e7d8b1..62c9d685 100644
--- 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
+++ 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
@@ -242,9 +242,7 @@ public class FourZeroCdcEventBuilder extends CdcEventBuilder
                 holder.add(makeValue(cell.buffer(), cell.column()));
                 if (cell.isExpiring())
                 {
-                    // TODO: CASSANDRA-14227 Support unit interpretation,
-                    //  so that TTL does not overflow and become negative.
-                    setTTL(cell.ttl(), 
Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime()));
+                    setTTL(cell.ttl(), cell.localDeletionTime());
                 }
             }
         }
@@ -274,7 +272,7 @@ public class FourZeroCdcEventBuilder extends CdcEventBuilder
                 buffer.addCell(cell);
                 if (cell.isExpiring())
                 {
-                    setTTL(cell.ttl(), 
Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime()));
+                    setTTL(cell.ttl(), cell.localDeletionTime());
                 }
             }
         }
diff --git 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
index 963f3454..dd0cc171 100644
--- 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
+++ 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.TableMetadata;
 
@@ -31,21 +33,25 @@ public class DbUtils
         throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
     }
 
-    public static DeletionTime deletionTime(long markedForDeleteAt, int 
localDeletionTime)
+    @VisibleForTesting
+    public static DeletionTime deletionTime(long markedForDeleteAt, long 
localDeletionTime)
     {
         return DeletionTime.build(markedForDeleteAt, localDeletionTime);
     }
 
+    @VisibleForTesting
     public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds)
     {
         return LivenessInfo.create(timestamp, nowInSeconds);
     }
 
+    @VisibleForTesting
     public static PartitionUpdate fullPartitionDeletion(TableMetadata 
metadata, ByteBuffer key, long timestamp, long nowInSec)
     {
         return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, 
nowInSec);
     }
 
+    @VisibleForTesting
     public static PartitionUpdate.SimpleBuilder 
partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long 
nowInSec)
     {
         return PartitionUpdate.simpleBuilder(metadata, key).nowInSec(nowInSec);
diff --git 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
index 63ab2fdb..dfd18bb3 100644
--- 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
+++ 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
@@ -187,7 +187,7 @@ public final class SSTableTombstoneWriter implements 
Closeable
                                                        
ClientState.forInternalCalls(),
                                                        options,
                                                        
delete.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
-                                                       (int) 
TimeUnit.MILLISECONDS.toSeconds(now),
+                                                       
TimeUnit.MILLISECONDS.toSeconds(now),
                                                        
delete.getTimeToLive(options),
                                                        Collections.emptyMap());
 
diff --git 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
index df6af138..34426e63 100644
--- 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
+++ 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
@@ -117,7 +117,7 @@ public class CompactionStreamScanner extends 
AbstractStreamScanner
     @Override
     UnfilteredPartitionIterator initializePartitions()
     {
-        int nowInSec = timeProvider.referenceEpochInSeconds();
+        long nowInSec = timeProvider.referenceEpochInSeconds();
         Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace);
         ColumnFamilyStore cfStore = 
keyspace.getColumnFamilyStore(metadata.name);
         controller = new PurgingCompactionController(cfStore, 
CompactionParams.TombstoneOption.NONE);
diff --git 
a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
 
b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index 8dffb296..72a656a3 100644
--- 
a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++ 
b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -48,7 +48,7 @@ public abstract class CqlType extends AbstractCqlType
                                     long deletionTime)
     {
         Preconditions.checkArgument(cd.isComplex(), "The method only works with complex columns");
-        rowBuilder.addComplexDeletion(cd, DeletionTime.build(deletionTime, 
(int) TimeUnit.MICROSECONDS.toSeconds(deletionTime)));
+        rowBuilder.addComplexDeletion(cd, DeletionTime.build(deletionTime, 
TimeUnit.MICROSECONDS.toSeconds(deletionTime)));
     }
 
     public static BufferCell tombstone(ColumnMetadata column, long timestamp, 
long nowInSec, CellPath path)
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
index 7965272e..4284b0cc 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -33,23 +34,32 @@ public class DbUtils
         throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
     }
 
-    public static DeletionTime deletionTime(long markedForDeleteAt, int 
localDeletionTime)
+
+    @VisibleForTesting
+    public static DeletionTime deletionTime(long markedForDeleteAt, long 
localDeletionTime)
     {
-        return new DeletionTime(markedForDeleteAt, localDeletionTime);
+        // C* 4.0 DeletionTime constructor requires int for localDeletionTime; checked cast will throw after Y2038
+        return new DeletionTime(markedForDeleteAt, 
Ints.checkedCast(localDeletionTime));
     }
 
+    @VisibleForTesting
     public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds)
     {
+        // C* 4.0 LivenessInfo.create requires int for nowInSeconds; checked cast will throw after Y2038
         return LivenessInfo.create(timestamp, Ints.checkedCast(nowInSeconds));
     }
 
+    @VisibleForTesting
     public static PartitionUpdate fullPartitionDeletion(TableMetadata 
metadata, ByteBuffer key, long timestamp, long nowInSec)
     {
+        // C* 4.0 fullPartitionDelete requires int for nowInSec; checked cast will throw after Y2038
         return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, 
Ints.checkedCast(nowInSec));
     }
 
+    @VisibleForTesting
     public static PartitionUpdate.SimpleBuilder 
partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long 
nowInSec)
     {
+        // C* 4.0 simpleBuilder.nowInSec requires int; checked cast will throw after Y2038
         return PartitionUpdate.simpleBuilder(metadata, 
key).nowInSec(Ints.checkedCast(nowInSec));
     }
 }
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
index 32ff7053..57868d39 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.bridge.CassandraSchema;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -185,7 +186,7 @@ public final class SSTableTombstoneWriter implements 
Closeable
                                                        delete.updatedColumns(),
                                                        options,
                                                        
delete.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
-                                                       (int) 
TimeUnit.MILLISECONDS.toSeconds(now),
+                                                       
Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(now)),
                                                        
delete.getTimeToLive(options),
                                                        Collections.emptyMap());
 
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
index 05d9d45e..0853325c 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
@@ -25,6 +25,7 @@ import java.math.BigInteger;
 import java.util.Iterator;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringPrefix;
@@ -438,12 +439,14 @@ public abstract class AbstractStreamScanner implements 
StreamScanner<RowData>, C
             {
                 AbstractComplexTypeBuffer buffer = 
AbstractComplexTypeBuffer.newBuffer(column.type, cellCount);
                 long maxTimestamp = Long.MIN_VALUE;
+                // C* 4.0 Cell.isLive requires int for nowInSec; checked cast will throw after Y2038
+                int referenceEpochInSecondsAsInt = 
Ints.checkedCast(timeProvider.referenceEpochInSeconds());
                 while (cells.hasNext())
                 {
                     Cell<?> cell = cells.next();
                     // Re: isLive vs. isTombstone - isLive considers TTL so that if a cell is expiring soon,
                     // it is handled as tombstone
-                    if (cell.isLive(timeProvider.referenceEpochInSeconds()))
+                    if (cell.isLive(referenceEpochInSecondsAsInt))
                     {
                         buffer.addCell(cell);
                     }
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
index 89a2df3f..2925eb59 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
@@ -28,6 +28,7 @@ import java.util.function.LongPredicate;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.AbstractCompactionController;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -117,7 +118,7 @@ public class CompactionStreamScanner extends 
AbstractStreamScanner
     @Override
     UnfilteredPartitionIterator initializePartitions()
     {
-        int nowInSec = timeProvider.referenceEpochInSeconds();
+        long nowInSec = timeProvider.referenceEpochInSeconds();
         Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace);
         ColumnFamilyStore cfStore = 
keyspace.getColumnFamilyStore(metadata.name);
         controller = new PurgingCompactionController(cfStore, 
CompactionParams.TombstoneOption.NONE);
@@ -125,7 +126,8 @@ public class CompactionStreamScanner extends 
AbstractStreamScanner
                                                      .map(Scannable::scanner)
                                                      
.collect(Collectors.toList());
         scanners = new AbstractCompactionStrategy.ScannerList(scannerList);
-        ci = new CompactionIterator(OperationType.COMPACTION, 
scanners.scanners, controller, nowInSec, taskId);
+        // C* 4.0 CompactionIterator requires int for nowInSec; checked cast will throw after Y2038
+        ci = new CompactionIterator(OperationType.COMPACTION, 
scanners.scanners, controller, Ints.checkedCast(nowInSec), taskId);
         return ci;
     }
 
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java
index e4425d75..008d34cc 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.bridge.CassandraVersion;
 import org.apache.cassandra.cdc.api.Row;
@@ -138,7 +139,7 @@ public abstract class AbstractCqlType implements 
CqlField.CqlType
                         ColumnMetadata cd,
                         long timestamp,
                         int ttl,
-                        int now,
+                        long now,
                         Object value)
     {
         addCell(rowBuilder, cd, timestamp, ttl, now, value, null);
@@ -149,7 +150,7 @@ public abstract class AbstractCqlType implements 
CqlField.CqlType
                         ColumnMetadata cd,
                         long timestamp,
                         int ttl,
-                        int now,
+                        long now,
                         Object value,
                         CellPath cellPath)
     {
@@ -199,6 +200,7 @@ public abstract class AbstractCqlType implements 
CqlField.CqlType
                                     long deletionTime)
     {
         Preconditions.checkArgument(cd.isComplex(), "The method only works with complex columns");
-        rowBuilder.addComplexDeletion(cd, new DeletionTime(deletionTime, (int) 
TimeUnit.MICROSECONDS.toSeconds(deletionTime)));
+        // C* 4.0 DeletionTime constructor requires int for localDeletionTime
+        rowBuilder.addComplexDeletion(cd, new DeletionTime(deletionTime, 
Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(deletionTime))));
     }
 }
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index e91027a6..3912d9a8 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -31,11 +31,13 @@ public abstract class CqlType extends AbstractCqlType
 {
     public static BufferCell tombstone(ColumnMetadata column, long timestamp, 
long nowInSec, CellPath path)
     {
+        // C* 4.0 BufferCell.tombstone requires int for nowInSec; checked cast will throw after Y2038
         return BufferCell.tombstone(column, timestamp, 
Ints.checkedCast(nowInSec), path);
     }
 
     public static BufferCell expiring(ColumnMetadata column, long timestamp, 
int ttl, long nowInSec, ByteBuffer value, CellPath path)
     {
+        // C* 4.0 BufferCell.expiring requires int for nowInSec; checked cast will throw after Y2038
         return BufferCell.expiring(column, timestamp, ttl, 
Ints.checkedCast(nowInSec), value, path);
     }
 }
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
index a7cf8bea..43619e6f 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
@@ -105,15 +105,15 @@ public abstract class AbstractCqlList extends 
CqlCollection implements CqlField.
                         ColumnMetadata cd,
                         long timestamp,
                         int ttl,
-                        int now,
+                        long now,
                         Object value)
     {
         for (Object o : (List<?>) value)
         {
             if (ttl != NO_TTL)
             {
-                rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, 
now, type().serialize(o),
-                                                       
CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))));
+                rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, 
type().serialize(o),
+                                                    
CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))));
             }
             else
             {
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
index 6d0e49d3..e9003b09 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
@@ -120,15 +120,15 @@ public class CqlMap extends CqlCollection implements 
CqlField.CqlMap
                         ColumnMetadata cd,
                         long timestamp,
                         int ttl,
-                        int now,
+                        long now,
                         Object value)
     {
         for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet())
         {
             if (ttl != NO_TTL)
             {
-                rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, 
now, valueType().serialize(entry.getValue()),
-                                                       
CellPath.create(keyType().serialize(entry.getKey()))));
+                rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, 
valueType().serialize(entry.getValue()),
+                                                    
CellPath.create(keyType().serialize(entry.getKey()))));
             }
             else
             {
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
index c50b8059..bbe504ae 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
@@ -104,15 +104,15 @@ public class CqlSet extends CqlList implements 
CqlField.CqlSet
                         ColumnMetadata cd,
                         long timestamp,
                         int ttl,
-                        int now,
+                        long now,
                         Object value)
     {
         for (Object o : (Set<?>) value)
         {
             if (ttl != NO_TTL)
             {
-                rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, 
now, ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                       
CellPath.create(type().serialize(o))));
+                rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, 
ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                    
CellPath.create(type().serialize(o))));
             }
             else
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to