This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 58ea38d0 CASSANALYTICS-122: Use long for absolute times and support C* 5.0 extended localDeletionTime (#183)
58ea38d0 is described below
commit 58ea38d0dc808f5357cabb46bcc182136e95524f
Author: Shailaja Koppu <[email protected]>
AuthorDate: Thu Mar 26 18:45:30 2026 +0000
CASSANALYTICS-122: Use long for absolute times and support C* 5.0 extended localDeletionTime (#183)
Patch by Shailaja Koppu; Reviewed by Jyothsna Konisa, Yifan Cai for CASSANALYTICS-122
---
.circleci/config.yml | 10 +-
CHANGES.txt | 2 +-
.../apache/cassandra/cdc/avro/CdcEventUtils.java | 4 +-
.../src/main/resources/cdc_bytes.avsc | 2 +-
.../src/main/resources/cdc_generic_record.avsc | 2 +-
.../cassandra/cdc/json/JsonSerializerTests.java | 27 +-
cassandra-analytics-cdc/build.gradle | 3 +
.../cdc/AvroByteRecordTransformerTest.java | 178 ++++++++
.../cdc/AvroGenericRecordTransformerTest.java | 502 +++++++++++++++++++++
.../apache/cassandra/cdc/avro/TestSchemaStore.java | 65 +++
.../org/apache/cassandra/cdc/msg/CdcEvent.java | 4 +-
.../apache/cassandra/cdc/msg/CdcEventBuilder.java | 2 +-
.../cassandra/spark/utils/ReaderTimeProvider.java | 6 +-
.../apache/cassandra/spark/utils/TimeProvider.java | 10 +-
cassandra-analytics-core/build.gradle | 7 +-
.../cassandra/spark/data/CassandraDataLayer.java | 8 +-
.../apache/cassandra/spark/SSTableReaderTests.java | 6 +-
.../analytics/BulkWriteComplexTypeTtlTest.java | 195 ++++++++
.../cassandra/cdc/msg/FourZeroCdcEventBuilder.java | 6 +-
.../main/java/org/apache/cassandra/db/DbUtils.java | 8 +-
.../io/sstable/SSTableTombstoneWriter.java | 2 +-
.../spark/reader/CompactionStreamScanner.java | 2 +-
.../org/apache/cassandra/spark/data/CqlType.java | 2 +-
.../main/java/org/apache/cassandra/db/DbUtils.java | 14 +-
.../io/sstable/SSTableTombstoneWriter.java | 3 +-
.../spark/reader/AbstractStreamScanner.java | 5 +-
.../spark/reader/CompactionStreamScanner.java | 6 +-
.../cassandra/spark/data/AbstractCqlType.java | 8 +-
.../org/apache/cassandra/spark/data/CqlType.java | 2 +
.../spark/data/complex/AbstractCqlList.java | 6 +-
.../cassandra/spark/data/complex/CqlMap.java | 6 +-
.../cassandra/spark/data/complex/CqlSet.java | 6 +-
32 files changed, 1053 insertions(+), 56 deletions(-)
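
A minimal sketch (not code from this patch) of what the signature change enables: with the new TimeToLive(int ttlInSec, long expirationTimeInSec) constructor shown in the CdcEvent.java hunk below, an expiration time past the 32-bit epoch limit round-trips without truncation.

    // assumes the updated org.apache.cassandra.cdc.msg.CdcEvent.TimeToLive from this commit
    long pastIntMax = 2_147_483_648L;                      // one second beyond Integer.MAX_VALUE
    CdcEvent.TimeToLive ttl = new CdcEvent.TimeToLive(10, pastIntMax);
    assert ttl.expirationTimeInSec == 2_147_483_648L;      // stored as a long, no narrowing to int
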
diff --git a/.circleci/config.yml b/.circleci/config.yml
index ff7eb500..0e9f576f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -64,10 +64,13 @@ commands:
JDK_VERSION: "<<parameters.jdk>>"
INTEGRATION_MAX_PARALLEL_FORKS: 1
INTEGRATION_MAX_HEAP_SIZE: "1500M"
+ CORE_MAX_PARALLEL_FORKS: 2
+ CORE_TEST_MAX_HEAP_SIZE: "2048m"
CASSANDRA_USE_JDK11: <<parameters.use_jdk11>>
command: |
+ export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g"
# Run compile/unit tests, skipping integration tests
- ./gradlew --stacktrace clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<<parameters.sstable_format>>
+ ./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<<parameters.sstable_format>>
run_integration:
parameters:
@@ -93,10 +96,11 @@ commands:
INTEGRATION_MAX_HEAP_SIZE: "2500M"
CASSANDRA_USE_JDK11: <<parameters.use_jdk11>>
command: |
+ export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g"
export DTEST_JAR="dtest-<< parameters.cassandra >>.jar"
export CASSANDRA_VERSION=$(echo << parameters.cassandra >> | cut -d'.' -f 1,2)
# Run compile but not unit tests (which are run in run_build)
- ./gradlew --stacktrace clean assemble
+ ./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble
# Run integration tests in parallel
cd cassandra-analytics-integration-tests/src/test/java
# Get list of classnames of tests that should run on this node
@@ -132,7 +136,7 @@ jobs:
name: Build dependencies for jdk11 builds
command: |
CASSANDRA_USE_JDK11=true ./scripts/build-dependencies.sh
- ./gradlew codeCheckTasks
+ ./gradlew --no-daemon --max-workers=2 codeCheckTasks
- persist_to_workspace:
root: dependencies
paths:
diff --git a/CHANGES.txt b/CHANGES.txt
index b2e71b6e..35afa6cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
0.4.0
-----
- * Setup CI Pipeline with GitHub Actions (CASSANALYTICS-106)
+ * Support extended deletion time in CDC for Cassandra 5.0 (CASSANALYTICS-122)
* Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126)
* Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
* Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60)
diff --git
a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
index ddb1b48a..fef91ab6 100644
---
a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
+++
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
@@ -281,14 +281,14 @@ public final class CdcEventUtils
return ttlRecord;
}
- public static Map<String, Integer> getTTL(CdcEvent event)
+ public static Map<String, Long> getTTL(CdcEvent event)
{
CdcEvent.TimeToLive ttl = event.getTtl();
if (ttl == null)
{
return null;
}
- return mapOf(AvroConstants.TTL_KEY, ttl.ttlInSec, AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec);
+ return mapOf(AvroConstants.TTL_KEY, (long) ttl.ttlInSec, AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec);
}
public static UpdatedEvent getUpdatedEvent(CdcEvent event,
diff --git a/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
index 8609b7bf..55104298 100644
--- a/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
+++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
@@ -145,7 +145,7 @@
},
{
"name": "deletedAt",
- "type": "int",
+ "type": "long",
"doc": "Future timestamp in seconds"
}
]
diff --git
a/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
index 9f4a3a47..4535fe1d 100644
--- a/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
+++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
@@ -145,7 +145,7 @@
},
{
"name": "deletedAt",
- "type": "int",
+ "type": "long",
"doc": "Future timestamp in seconds"
}
]
diff --git
a/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java
b/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java
index 8a582f29..ee2aca02 100644
---
a/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java
+++
b/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java
@@ -101,7 +101,7 @@ public class JsonSerializerTests
assertThat(root.has(AvroConstants.TTL_KEY)).isTrue();
JsonNode ttl = root.get(AvroConstants.TTL_KEY);
assertThat(ttl.get(AvroConstants.TTL_KEY).asInt()).isEqualTo(10);
- assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asInt()).isEqualTo(1658269);
+ assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asLong()).isEqualTo(1658269);
}
@Test
@@ -138,4 +138,29 @@ public class JsonSerializerTests
InetAddress address =
InetAddress.getByAddress(Base64.getDecoder().decode(base64Str));
assertThat(address).isEqualTo(InetAddress.getByName("127.0.0.1"));
}
+
+ @Test
+ public void testJsonSerializerWithLongExpirationTime() throws IOException
+ {
+ long expirationTimePastIntMax = 2_147_483_648L;
+ CdcEventBuilder eventBuilder = CdcEventBuilder.of(CdcEvent.Kind.INSERT, TEST_KS, TEST_TBL_BASIC);
+ eventBuilder.setPartitionKeys(listOf(Value.of(TEST_KS, "a", "int", TYPES.aInt().serialize(1))));
+ eventBuilder.setValueColumns(listOf(
+ Value.of(TEST_KS, "b", "int", TYPES.aInt().serialize(2))
+ ));
+ eventBuilder.setMaxTimestampMicros(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+ eventBuilder.setTimeToLive(new CdcEvent.TimeToLive(10, expirationTimePastIntMax));
+
+ byte[] ar;
+ try (JsonSerializer serializer = new JsonSerializer(TYPE_LOOKUP))
+ {
+ ar = serializer.serialize("topic", eventBuilder.build());
+ }
+ assertThat(ar).isNotNull();
+
+ JsonNode root = MAPPER.readTree(ar);
+ JsonNode ttl = root.get(AvroConstants.TTL_KEY);
+ assertThat(ttl.get(AvroConstants.TTL_KEY).asInt()).isEqualTo(10);
+ assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asLong()).isEqualTo(expirationTimePastIntMax);
+ }
}
diff --git a/cassandra-analytics-cdc/build.gradle
b/cassandra-analytics-cdc/build.gradle
index 6fdd7910..38c28406 100644
--- a/cassandra-analytics-cdc/build.gradle
+++ b/cassandra-analytics-cdc/build.gradle
@@ -89,6 +89,9 @@ dependencies {
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
testImplementation project(":cassandra-analytics-common")
+ testImplementation project(":cassandra-analytics-cdc-codec")
+ testImplementation "org.apache.avro:avro:${avroVersion}"
+ testImplementation "org.apache.kafka:kafka-clients:${kafkaClientVersion}"
// pull in cassandra-bridge so we can re-use TestSchema to generate
arbitrary schemas for the cdc tests
testImplementation project(":cassandra-bridge")
diff --git
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java
new file mode 100644
index 00000000..985b3b1b
--- /dev/null
+++
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.avro.AvroByteRecordTransformer;
+import org.apache.cassandra.cdc.avro.AvroConstants;
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.avro.TestSchemaStore;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.test.CdcTestBase;
+import org.apache.cassandra.cdc.test.CdcTester;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.test.TestSchema;
+
+import static org.apache.cassandra.cdc.test.CdcTester.testWith;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that exercise the CDC-to-Avro byte-serialization pipeline ({@code cdc_bytes.avsc}).
+ */
+public class AvroByteRecordTransformerTest extends CdcTestBase
+{
+ private static final int NUM_ROWS = 50;
+
+ private CqlToAvroSchemaConverter getConverter(CassandraVersion version)
+ {
+ CqlToAvroSchemaConverter converter =
CdcBridgeFactory.getCqlToAvroSchemaConverter(version);
+ assertThat(converter).isNotNull();
+ return converter;
+ }
+
+ /**
+ * Build the Avro byte transformer by converting the CQL table schema
+ * into an Avro schema and registering it in the test schema store.
+ */
+ private AvroByteRecordTransformer
buildTransformer(CqlToAvroSchemaConverter converter,
+ CqlTable cqlTable,
+ TestSchemaStore
schemaStore)
+ {
+ Schema avroSchema = converter.convert(cqlTable);
+ String namespace = cqlTable.keyspace() + "." + cqlTable.table();
+ schemaStore.registerSchema(namespace, avroSchema);
+
+ Function<KeyspaceTypeKey, CqlField.CqlType> typeLookup = key ->
bridge.parseType(key.type);
+ return new AvroByteRecordTransformer(schemaStore, typeLookup);
+ }
+
+ /**
+ * Deserialize a byte payload using the table's Avro schema from the
schema store.
+ */
+ private GenericRecord deserializePayload(ByteBuffer payloadBytes,
GenericDatumReader<GenericRecord> reader) throws IOException
+ {
+ byte[] bytes = new byte[payloadBytes.remaining()];
+ payloadBytes.get(bytes);
+ return reader.read(null, DecoderFactory.get().binaryDecoder(bytes,
null));
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+ public void testTtlDeletedAtByteAvroEncoding(CassandraVersion version)
+ {
+ AvroSchemas.registerLogicalTypes();
+ CqlToAvroSchemaConverter converter = getConverter(version);
+
+ int ttlSeconds = 3600;
+ long beforeTestEpochSec =
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+
+ TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+ .withPartitionKey("pk",
bridge.uuid())
+ .withColumn("c1",
bridge.bigint())
+ .withColumn("c2",
bridge.text());
+
+ AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+ testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+ .withNumRows(NUM_ROWS)
+ .clearWriters()
+ .withWriter((tester, rows, writer) -> {
+ tableRef.set(tester.cqlTable);
+ for (int i = 0; i < tester.numRows; i++)
+ {
+ TestSchema.TestRow testRow =
CdcTester.newUniqueRow(tester.schema, rows);
+ testRow.setTTL(ttlSeconds);
+ writer.accept(testRow,
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+ }
+ })
+ .withCdcEventChecker((testRows, events) -> {
+ long afterTestEpochSec =
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+ assertThat(events).hasSize(NUM_ROWS);
+
+ TestSchemaStore schemaStore = new TestSchemaStore();
+ AvroByteRecordTransformer transformer =
buildTransformer(converter, tableRef.get(), schemaStore);
+ String namespace = tableRef.get().keyspace() + "." +
tableRef.get().table();
+
+ for (CdcEvent event : events)
+ {
+ GenericData.Record record = transformer.transform(event);
+ assertThat(record).isNotNull();
+
+ // Validate header-level fields
+
assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
+
assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
+
assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT");
+
assertThat(record.get(AvroConstants.TIMESTAMP_KEY)).isNotNull();
+
assertThat(record.get(AvroConstants.VERSION_KEY).toString()).isEqualTo(AvroConstants.CURRENT_VERSION);
+
+ // TTL record should be present
+ Object ttlField = record.get(AvroConstants.TTL_KEY);
+ assertThat(ttlField).isNotNull();
+ GenericRecord ttlRecord = (GenericRecord) ttlField;
+
+ // Validate TTL value
+
assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(ttlSeconds);
+
+ // Validate deletedAt is a Long (confirms long type in
cdc_bytes.avsc)
+ Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY);
+ assertThat(deletedAt).isInstanceOf(Long.class);
+
+ // Validate deletedAt is approximately nowInSeconds + TTL
+ long deletedAtValue = (Long) deletedAt;
+ assertThat(deletedAtValue)
+ .as("deletedAt should be approximately nowInSeconds + TTL")
+ .isBetween(beforeTestEpochSec + ttlSeconds,
afterTestEpochSec + ttlSeconds);
+
+ // Validate payload: bytes in the byte schema need
deserialization to verify content
+ Object payloadObj = record.get(AvroConstants.PAYLOAD_KEY);
+ assertThat(payloadObj).isInstanceOf(ByteBuffer.class);
+ GenericRecord payloadRecord;
+ try
+ {
+ payloadRecord = deserializePayload((ByteBuffer) payloadObj,
+
schemaStore.getReader(namespace, null));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Failed to deserialize
payload", e);
+ }
+ assertThat(payloadRecord.get("pk")).isNotNull();
+ }
+ })
+ .run();
+ }
+}
diff --git
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java
new file mode 100644
index 00000000..2672e59d
--- /dev/null
+++
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
+import org.apache.cassandra.cdc.avro.AvroConstants;
+import org.apache.cassandra.cdc.avro.AvroGenericRecordTransformer;
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CdcEventUtils;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.avro.TestSchemaStore;
+import org.apache.cassandra.cdc.kafka.AvroGenericRecordSerializer;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.test.CdcTestBase;
+import org.apache.cassandra.cdc.test.CdcTester;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.test.TestSchema;
+
+import static
org.apache.cassandra.cdc.test.CdcTester.newUniquePartitionDeletion;
+import static org.apache.cassandra.cdc.test.CdcTester.testWith;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that exercise the full CDC-to-Avro pipeline:
+ * write mutations -> read CDC events from commit logs -> convert to Avro
GenericRecord -> validate.
+ */
+@SuppressWarnings("DataFlowIssue")
+public class AvroGenericRecordTransformerTest extends CdcTestBase
+{
+ private static final int NUM_ROWS = 50;
+
+ private CqlToAvroSchemaConverter getConverter(CassandraVersion version)
+ {
+ CqlToAvroSchemaConverter converter =
CdcBridgeFactory.getCqlToAvroSchemaConverter(version);
+ assertThat(converter).isNotNull();
+ return converter;
+ }
+
+ /**
+ * Build the Avro transformer by converting the CQL table schema (already
registered by CdcTester)
+ * into an Avro schema and registering it in the test schema store.
+ */
+ private AvroGenericRecordTransformer
buildTransformer(CqlToAvroSchemaConverter converter,
+ CqlTable cqlTable)
+ {
+ TestSchemaStore schemaStore = new TestSchemaStore();
+ Schema avroSchema = converter.convert(cqlTable);
+ String namespace = cqlTable.keyspace() + "." + cqlTable.table();
+ schemaStore.registerSchema(namespace, avroSchema);
+
+ Function<KeyspaceTypeKey, CqlField.CqlType> typeLookup = key ->
bridge.parseType(key.type);
+ return new AvroGenericRecordTransformer(schemaStore, typeLookup, "");
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+ public void testBasicInsertAvroEncoding(CassandraVersion version)
+ {
+ AvroSchemas.registerLogicalTypes();
+ CqlToAvroSchemaConverter converter = getConverter(version);
+
+ TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+ .withPartitionKey("pk",
bridge.uuid())
+ .withClusteringKey("ck",
bridge.bigint())
+ .withColumn("c1",
bridge.bigint())
+ .withColumn("c2",
bridge.text());
+
+ // Capture CqlTable from the tester via the writer callback
+ AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+ testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+ .withNumRows(NUM_ROWS)
+ .clearWriters()
+ .withWriter((tester, rows, writer) -> {
+ tableRef.set(tester.cqlTable);
+ long timestampMicros =
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+ IntStream.range(0, tester.numRows)
+ .forEach(i ->
writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros));
+ })
+ .withCdcEventChecker((testRows, events) -> {
+ assertThat(events).hasSize(NUM_ROWS);
+ AvroGenericRecordTransformer transformer =
buildTransformer(converter, tableRef.get());
+
+ for (CdcEvent event : events)
+ {
+ GenericData.Record record = transformer.transform(event);
+ assertThat(record).isNotNull();
+
+ // Validate header fields
+
assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
+
assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
+
assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT");
+
assertThat(record.get(AvroConstants.TIMESTAMP_KEY)).isNotNull();
+
assertThat(record.get(AvroConstants.VERSION_KEY).toString()).isEqualTo(AvroConstants.CURRENT_VERSION);
+
+ // Validate payload contains expected fields including
clustering key
+ GenericRecord payload = (GenericRecord)
record.get(AvroConstants.PAYLOAD_KEY);
+ assertThat(payload).isNotNull();
+ assertThat(payload.getSchema().getField("pk")).isNotNull();
+ assertThat(payload.getSchema().getField("ck")).isNotNull();
+ assertThat(payload.getSchema().getField("c1")).isNotNull();
+ assertThat(payload.getSchema().getField("c2")).isNotNull();
+ // Payload fields should have data
+ assertThat(payload.get("pk")).isNotNull();
+ assertThat(payload.get("ck")).isNotNull();
+ assertThat(payload.get("c1")).isNotNull();
+
+ // Validate updateFields lists all updated column names
+ List<String> updateFields =
CdcEventUtils.updatedFieldNames(event);
+ assertThat(updateFields).contains("pk", "ck", "c1", "c2");
+ }
+ })
+ .run();
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+ public void testCollectionTypesAvroEncoding(CassandraVersion version)
+ {
+ AvroSchemas.registerLogicalTypes();
+ CqlToAvroSchemaConverter converter = getConverter(version);
+
+ TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+ .withPartitionKey("pk",
bridge.uuid())
+ .withColumn("m",
bridge.map(bridge.text(), bridge.text()))
+ .withColumn("s",
bridge.set(bridge.aInt()))
+ .withColumn("l",
bridge.list(bridge.text()));
+
+ AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+ testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+ .withNumRows(NUM_ROWS)
+ .clearWriters()
+ .withWriter((tester, rows, writer) -> {
+ tableRef.set(tester.cqlTable);
+ long timestampMicros =
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+ IntStream.range(0, tester.numRows)
+ .forEach(i ->
writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros));
+ })
+ .withCdcEventChecker((testRows, events) -> {
+ assertThat(events).hasSize(NUM_ROWS);
+ AvroGenericRecordTransformer transformer =
buildTransformer(converter, tableRef.get());
+
+ for (CdcEvent event : events)
+ {
+ GenericData.Record record = transformer.transform(event);
+ assertThat(record).isNotNull();
+
+ GenericRecord payload = (GenericRecord)
record.get(AvroConstants.PAYLOAD_KEY);
+ assertThat(payload).isNotNull();
+ assertThat(payload.getSchema().getField("m")).isNotNull();
+ assertThat(payload.getSchema().getField("s")).isNotNull();
+ assertThat(payload.getSchema().getField("l")).isNotNull();
+ }
+ })
+ .run();
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+ public void testAvroSerializeDeserializeRoundTrip(CassandraVersion version)
+ {
+ AvroSchemas.registerLogicalTypes();
+ CqlToAvroSchemaConverter converter = getConverter(version);
+
+ TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+ .withPartitionKey("pk",
bridge.uuid())
+ .withColumn("c1",
bridge.bigint())
+ .withColumn("c2",
bridge.text());
+
+ AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+ testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+ .withNumRows(NUM_ROWS)
+ .clearWriters()
+ .withWriter((tester, rows, writer) -> {
+ tableRef.set(tester.cqlTable);
+ long timestampMicros =
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+ IntStream.range(0, tester.numRows)
+ .forEach(i ->
writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros));
+ })
+ .withCdcEventChecker((testRows, events) -> {
+ assertThat(events).hasSize(NUM_ROWS);
+ TestSchemaStore schemaStore = new TestSchemaStore();
+ CqlTable cqlTable = tableRef.get();
+ Schema avroSchema = converter.convert(cqlTable);
+ String namespace = cqlTable.keyspace() + "." + cqlTable.table();
+ schemaStore.registerSchema(namespace, avroSchema);
+
+ Function<KeyspaceTypeKey, CqlField.CqlType> typeLookup = key ->
bridge.parseType(key.type);
+ AvroGenericRecordSerializer serializer = new
AvroGenericRecordSerializer(schemaStore, typeLookup, "");
+
+ for (CdcEvent event : events)
+ {
+ // Serialize: CdcEvent -> bytes
+ byte[] bytes = serializer.serialize("test-topic", event);
+ assertThat(bytes).isNotNull();
+ assertThat(bytes.length).isGreaterThan(0);
+
+ // Get the Avro record's schema for deserialization
+ GenericData.Record record =
serializer.getTransformer().transform(event);
+ Schema recordSchema = record.getSchema();
+
+ // Deserialize: bytes -> CdcEnvelope
+ org.apache.cassandra.cdc.avro.msg.CdcEnvelope envelope =
+ serializer.deserializer().deserialize(event.keyspace,
event.table, bytes, recordSchema);
+ assertThat(envelope).isNotNull();
+ assertThat(envelope.header).isNotNull();
+ assertThat(envelope.payload).isNotNull();
+
+ // Validate round-trip preserves operation type and table info
+
assertThat(envelope.header.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
+
assertThat(envelope.header.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
+
assertThat(envelope.header.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT");
+
+ // Validate payload fields survived round-trip
+ assertThat(envelope.payload.get("pk")).isNotNull();
+ assertThat(envelope.payload.get("c1")).isNotNull();
+ }
+ })
+ .run();
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+ public void testDeleteEventAvroEncoding(CassandraVersion version)
+ {
+ AvroSchemas.registerLogicalTypes();
+ CqlToAvroSchemaConverter converter = getConverter(version);
+
+ TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+ .withPartitionKey("pk",
bridge.uuid())
+ .withColumn("c1",
bridge.bigint())
+ .withColumn("c2",
bridge.text());
+
+ AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+ testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+ .withNumRows(NUM_ROWS)
+ .clearWriters()
+ .withWriter((tester, rows, writer) -> {
+ tableRef.set(tester.cqlTable);
+ for (int i = 0; i < tester.numRows; i++)
+ {
+ TestSchema.TestRow testRow =
CdcTester.newUniqueRow(tester.schema, rows);
+ testRow = testRow.copy("c1",
org.apache.cassandra.bridge.CdcBridge.UNSET_MARKER);
+ testRow = testRow.copy("c2", null);
+ writer.accept(testRow,
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+ }
+ })
+ .withCdcEventChecker((testRows, events) -> {
+ assertThat(events).hasSize(NUM_ROWS);
+ AvroGenericRecordTransformer transformer =
buildTransformer(converter, tableRef.get());
+
+ for (CdcEvent event : events)
+ {
+ GenericData.Record record = transformer.transform(event);
+ assertThat(record).isNotNull();
+
+ // Verify the Avro record correctly encodes the operation type
from the event
+ CdcEventUtils.OperationType opType =
CdcEventUtils.getOperationType(event);
+
assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo(opType.name());
+
assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
+
assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
+
+ GenericRecord payload = (GenericRecord)
record.get(AvroConstants.PAYLOAD_KEY);
+ assertThat(payload).isNotNull();
+ // pk should still be present
+ assertThat(payload.get("pk")).isNotNull();
+ // c2 is deleted (null in the payload)
+ assertThat(payload.get("c2")).isNull();
+ }
+ })
+ .run();
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+ public void testMaxSupportedTtlAvroEncoding(CassandraVersion version)
+ {
+ AvroSchemas.registerLogicalTypes();
+ CqlToAvroSchemaConverter converter = getConverter(version);
+
+ // Use a large TTL value (~20 years). Safe for both 4.0 and 5.0.
+ int largeTtl = 20 * 365 * 24 * 3600;
+ long beforeTestEpochSec =
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+
+ TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+ .withPartitionKey("pk",
bridge.uuid())
+ .withColumn("c1",
bridge.aInt());
+
+ AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+ testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+ .withNumRows(NUM_ROWS)
+ .clearWriters()
+ .withWriter((tester, rows, writer) -> {
+ tableRef.set(tester.cqlTable);
+ for (int i = 0; i < tester.numRows; i++)
+ {
+ TestSchema.TestRow testRow =
CdcTester.newUniqueRow(tester.schema, rows);
+ testRow.setTTL(largeTtl);
+ writer.accept(testRow,
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+ }
+ })
+ .withCdcEventChecker((testRows, events) -> {
+ long afterTestEpochSec =
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+ assertThat(events).hasSize(NUM_ROWS);
+ AvroGenericRecordTransformer transformer =
buildTransformer(converter, tableRef.get());
+
+ for (CdcEvent event : events)
+ {
+ // Validate CdcEvent-level TTL
+ assertThat(event.getTtl()).isNotNull();
+ assertThat(event.getTtl().ttlInSec).isEqualTo(largeTtl);
+ long expirationTime = event.getTtl().expirationTimeInSec;
+ long expectedLower = beforeTestEpochSec + largeTtl;
+ long expectedUpper = afterTestEpochSec + largeTtl;
+ if (expirationTime <= Integer.MAX_VALUE && expectedLower >
Integer.MAX_VALUE)
+ {
+ // Cassandra 4.0 caps the value at Integer.MAX_VALUE
+ assertThat(expirationTime)
+ .as("expirationTimeInSec should be capped near
Integer.MAX_VALUE on Cassandra 4.0")
+ .isBetween((long) Integer.MAX_VALUE - 1, (long)
Integer.MAX_VALUE);
+ }
+ else
+ {
+ assertThat(expirationTime)
+ .as("expirationTimeInSec should be approximately
nowInSeconds + largeTtl")
+ .isBetween(expectedLower, expectedUpper);
+ }
+
+ // Validate Avro-level TTL encoding
+ GenericData.Record record = transformer.transform(event);
+ assertThat(record).isNotNull();
+
+ Object ttlField = record.get(AvroConstants.TTL_KEY);
+ assertThat(ttlField).isNotNull();
+ GenericRecord ttlRecord = (GenericRecord) ttlField;
+
assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(largeTtl);
+
+ Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY);
+ assertThat(deletedAt).isInstanceOf(Long.class);
+ }
+ })
+ .run();
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+ public void testPartitionDeleteAvroEncoding(CassandraVersion version)
+ {
+ AvroSchemas.registerLogicalTypes();
+ CqlToAvroSchemaConverter converter = getConverter(version);
+
+ long beforeTestMicros =
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+
+ TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+ .withPartitionKey("pk",
bridge.uuid())
+ .withColumn("c1",
bridge.aInt());
+
+ AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+ testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+ .withNumRows(NUM_ROWS)
+ .clearWriters()
+ .withWriter((tester, rows, writer) -> {
+ tableRef.set(tester.cqlTable);
+ for (int i = 0; i < tester.numRows; i++)
+ {
+ TestSchema.TestRow testRow =
newUniquePartitionDeletion(tester.schema, rows);
+ writer.accept(testRow,
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+ }
+ })
+ .withCdcEventChecker((testRows, events) -> {
+ long afterTestMicros =
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
+ assertThat(events).hasSize(NUM_ROWS);
+ AvroGenericRecordTransformer transformer =
buildTransformer(converter, tableRef.get());
+
+ for (CdcEvent event : events)
+ {
+ // Validate CdcEvent-level partition delete
+
assertThat(event.getKind()).isEqualTo(CdcEvent.Kind.PARTITION_DELETE);
+ long eventTimestampMicros =
event.getTimestamp(TimeUnit.MICROSECONDS);
+ assertThat(eventTimestampMicros)
+ .as("deletion timestamp should be within the test time
window")
+ .isBetween(beforeTestMicros, afterTestMicros);
+
+ // Validate Avro-level encoding
+ GenericData.Record record = transformer.transform(event);
+ assertThat(record).isNotNull();
+
assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("DELETE_PARTITION");
+
assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
+
assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
+ }
+ })
+ .run();
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
+ public void testMixedTtlAndNonTtlAvroEncoding(CassandraVersion version)
+ {
+ AvroSchemas.registerLogicalTypes();
+ CqlToAvroSchemaConverter converter = getConverter(version);
+
+ int ttlSeconds = 3600;
+ long beforeTestEpochSec =
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+
+ TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
+ .withPartitionKey("pk",
bridge.uuid())
+ .withColumn("c1",
bridge.aInt());
+
+ AtomicReference<CqlTable> tableRef = new AtomicReference<>();
+ testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
+ .withNumRows(NUM_ROWS)
+ .clearWriters()
+ .withWriter((tester, rows, writer) -> {
+ tableRef.set(tester.cqlTable);
+ for (int i = 0; i < tester.numRows; i++)
+ {
+ TestSchema.TestRow testRow =
CdcTester.newUniqueRow(tester.schema, rows);
+ if (i % 2 == 0)
+ {
+ testRow.setTTL(ttlSeconds);
+ }
+ writer.accept(testRow,
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
+ }
+ })
+ .withCdcEventChecker((testRows, events) -> {
+ long afterTestEpochSec =
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+ assertThat(events).hasSize(NUM_ROWS);
+ AvroGenericRecordTransformer transformer =
buildTransformer(converter, tableRef.get());
+
+ int withTtl = 0;
+ int withoutTtl = 0;
+ for (CdcEvent event : events)
+ {
+ GenericData.Record record = transformer.transform(event);
+ assertThat(record).isNotNull();
+
+ if (event.getTtl() != null)
+ {
+ withTtl++;
+ assertThat(event.getTtl().ttlInSec).isEqualTo(ttlSeconds);
+
assertThat(event.getTtl().expirationTimeInSec).isGreaterThan(0L);
+
+ // Avro TTL field should be present
+ Object ttlField = record.get(AvroConstants.TTL_KEY);
+ assertThat(ttlField).as("Avro TTL record should be present
for TTL row").isNotNull();
+ GenericRecord ttlRecord = (GenericRecord) ttlField;
+
assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(ttlSeconds);
+
+ // Validate deletedAt is a Long with expected value
+ Object deletedAt =
ttlRecord.get(AvroConstants.DELETED_AT_KEY);
+ assertThat(deletedAt).isInstanceOf(Long.class);
+ long deletedAtValue = (Long) deletedAt;
+ assertThat(deletedAtValue)
+ .as("deletedAt should be approximately nowInSeconds +
TTL")
+ .isBetween(beforeTestEpochSec + ttlSeconds,
afterTestEpochSec + ttlSeconds);
+ }
+ else
+ {
+ withoutTtl++;
+ // Avro TTL field should be absent
+ assertThat(record.get(AvroConstants.TTL_KEY))
+ .as("Avro TTL record should be null for non-TTL row")
+ .isNull();
+ }
+ }
+ assertThat(withTtl).isEqualTo(NUM_ROWS / 2);
+ assertThat(withoutTtl).isEqualTo(NUM_ROWS / 2);
+ })
+ .run();
+ }
+}
diff --git
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java
new file mode 100644
index 00000000..b9a5249a
--- /dev/null
+++
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.avro;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+
+/**
+ * In-memory {@link SchemaStore} implementation for tests.
+ * Schemas are registered manually via {@link #registerSchema(String, Schema)}.
+ */
+public class TestSchemaStore implements SchemaStore
+{
+ private final Map<String, Schema> schemas = new HashMap<>();
+ private final Map<String, GenericDatumWriter<GenericRecord>> writers = new
HashMap<>();
+ private final Map<String, GenericDatumReader<GenericRecord>> readers = new
HashMap<>();
+
+ public void registerSchema(String namespace, Schema schema)
+ {
+ schemas.put(namespace, schema);
+ writers.put(namespace, new GenericDatumWriter<>(schema));
+ readers.put(namespace, new GenericDatumReader<>(schema));
+ }
+
+ @Override
+ public Schema getSchema(String namespace, String name)
+ {
+ return schemas.get(namespace);
+ }
+
+ @Override
+ public GenericDatumWriter<GenericRecord> getWriter(String namespace,
String name)
+ {
+ return writers.get(namespace);
+ }
+
+ @Override
+ public GenericDatumReader<GenericRecord> getReader(String namespace,
String name)
+ {
+ return readers.get(namespace);
+ }
+}
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java
index d3b5647c..ae3687be 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java
@@ -50,9 +50,9 @@ public class CdcEvent
public static class TimeToLive
{
public final int ttlInSec;
- public final int expirationTimeInSec;
+ public final long expirationTimeInSec;
- public TimeToLive(int ttlInSec, int expirationTimeInSec)
+ public TimeToLive(int ttlInSec, long expirationTimeInSec)
{
this.ttlInSec = ttlInSec;
this.expirationTimeInSec = expirationTimeInSec;
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java
index abf66151..60c760b0 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java
@@ -107,7 +107,7 @@ public class CdcEventBuilder
this.maxTimestampMicros = maxTimestampMicros;
}
- public void setTTL(int ttlInSec, int expirationTimeInSec)
+ public void setTTL(int ttlInSec, long expirationTimeInSec)
{
// Skip updating TTL if it already has been set.
// For the same row, the upsert query can only set one TTL value.
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
index cbc17517..c6189bbb 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
@@ -24,7 +24,7 @@ package org.apache.cassandra.spark.utils;
*/
public class ReaderTimeProvider implements TimeProvider
{
- private final int referenceEpochInSeconds;
+ private final long referenceEpochInSeconds;
public ReaderTimeProvider()
{
@@ -35,13 +35,13 @@ public class ReaderTimeProvider implements TimeProvider
* Constructor used for deserialization
* @param referenceEpochInSeconds reference epoch to set
*/
- public ReaderTimeProvider(int referenceEpochInSeconds)
+ public ReaderTimeProvider(long referenceEpochInSeconds)
{
this.referenceEpochInSeconds = referenceEpochInSeconds;
}
@Override
- public int referenceEpochInSeconds()
+ public long referenceEpochInSeconds()
{
return referenceEpochInSeconds;
}
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
index 40a25125..1ea9db28 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
@@ -38,10 +38,10 @@ public interface TimeProvider
@VisibleForTesting
TimeProvider DEFAULT = new TimeProvider()
{
- private final int referenceEpochInSeconds = nowInSeconds();
+ private final long referenceEpochInSeconds = nowInSeconds();
@Override
- public int referenceEpochInSeconds()
+ public long referenceEpochInSeconds()
{
return referenceEpochInSeconds;
}
@@ -50,9 +50,9 @@ public interface TimeProvider
/**
* @return current time in seconds
*/
- default int nowInSeconds()
+ default long nowInSeconds()
{
- return (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+ return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
}
/**
@@ -62,5 +62,5 @@ public interface TimeProvider
* <p>
* Note that the actual constant value returned is implementation dependent
*/
- int referenceEpochInSeconds();
+ long referenceEpochInSeconds();
}
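
A short usage sketch (assuming TimeProvider remains effectively single-method, as the referenceEpoch::get lambda in SSTableReaderTests below suggests) showing that the reference epoch and nowInSeconds() are now 64-bit values:

    // hypothetical fixed-epoch provider; 4_102_444_800L is 2100-01-01T00:00:00Z, beyond the int range
    TimeProvider provider = () -> 4_102_444_800L;      // implements referenceEpochInSeconds()
    long reference = provider.referenceEpochInSeconds();
    long now = provider.nowInSeconds();                // default method, the (int) cast is gone
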
diff --git a/cassandra-analytics-core/build.gradle
b/cassandra-analytics-core/build.gradle
index 259c3252..ffce6660 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -182,7 +182,7 @@ tasks.register('testSequential', Test) {
systemProperty "cassandra.analytics.bridges.sstable_format",
System.getProperty("cassandra.analytics.bridges.sstable_format", "big")
minHeapSize = '1024m'
- maxHeapSize = '3072m'
+ maxHeapSize = System.getenv('CORE_TEST_MAX_HEAP_SIZE') ?: '3072m'
maxParallelForks = 1
forkEvery = 1 // Enables different end-to-end test classes use Spark
contexts with different configurations
@@ -217,8 +217,9 @@ tasks.register('testSequential', Test) {
test {
systemProperty "cassandra.analytics.bridges.sstable_format",
System.getProperty("cassandra.analytics.bridges.sstable_format", "big")
minHeapSize = '1024m'
- maxHeapSize = '3072m'
- maxParallelForks = Math.max(Runtime.runtime.availableProcessors() * 2, 8)
+ maxHeapSize = System.getenv('CORE_TEST_MAX_HEAP_SIZE') ?: '3072m'
+ maxParallelForks = System.getenv('CORE_MAX_PARALLEL_FORKS')?.toInteger()
+ ?: Math.max(Runtime.runtime.availableProcessors() * 2, 8)
forkEvery = 1 // Enables different end-to-end test classes use Spark
contexts with different configurations
// Make it so unit tests run on a Jar with Cassandra bridge
implementations built in
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index b6e84c6f..99cbc682 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -771,7 +771,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
aliasLastModifiedTimestamp(this.requestedFeatures,
this.lastModifiedTimestampField);
}
this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
- this.timeProvider = new ReaderTimeProvider(in.readInt());
+ this.timeProvider = new ReaderTimeProvider(in.readLong());
this.sstableTimeRangeFilter = (SSTableTimeRangeFilter) in.readObject();
this.maybeQuoteKeyspaceAndTable();
this.initSidecarClient();
@@ -817,7 +817,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
out.writeUTF(feature.optionName());
}
out.writeObject(this.rfMap);
- out.writeInt(timeProvider.referenceEpochInSeconds());
+ out.writeLong(timeProvider.referenceEpochInSeconds());
out.writeObject(this.sstableTimeRangeFilter);
}
@@ -893,7 +893,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
.collect(Collectors.toList());
kryo.writeObject(out, listWrapper);
kryo.writeObject(out, dataLayer.rfMap);
- out.writeInt(dataLayer.timeProvider.referenceEpochInSeconds());
+ out.writeLong(dataLayer.timeProvider.referenceEpochInSeconds());
kryo.writeObject(out, dataLayer.sstableTimeRangeFilter);
}
@@ -936,7 +936,7 @@ public class CassandraDataLayer extends
PartitionedDataLayer implements StartupV
in.readString(),
kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
kryo.readObject(in, HashMap.class),
- new ReaderTimeProvider(in.readInt()),
+ new ReaderTimeProvider(in.readLong()),
kryo.readObject(in, SSTableTimeRangeFilter.class));
}
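
The Java and Kryo serialization paths change together: the reference epoch is written with writeLong and read back with readLong. A self-contained sketch of that round-trip (plain java.io, not project code) to show why both sides must move in lock-step:

    // the value exceeds the old int range, so writeInt/readInt could no longer carry it
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    new DataOutputStream(buffer).writeLong(4_102_444_800L);
    long referenceEpoch = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())).readLong();
    // referenceEpoch == 4_102_444_800L; pairing writeLong with the old readInt would misalign the stream
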
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
index 1bec25ea..53072e17 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
@@ -25,7 +25,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -73,7 +73,7 @@ public class SSTableReaderTests
int rows,
int expectedValues)
{
- AtomicInteger referenceEpoch = new AtomicInteger(0);
+ AtomicLong referenceEpoch = new AtomicLong(0);
TimeProvider navigatableTimeProvider = referenceEpoch::get;
Set<Integer> expectedColValue = new HashSet<>(Arrays.asList(1, 2, 3));
@@ -90,7 +90,7 @@ public class SSTableReaderTests
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
});
- int t1 = navigatableTimeProvider.nowInSeconds();
+ long t1 = navigatableTimeProvider.nowInSeconds();
assertThat(countSSTables(dir)).isEqualTo(1);
// open CompactionStreamScanner over SSTables
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java
new file mode 100644
index 00000000..3ddb0353
--- /dev/null
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.TTLOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createArrayType;
+import static org.apache.spark.sql.types.DataTypes.createMapType;
+import static org.apache.spark.sql.types.DataTypes.createStructType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that bulk writes with TTL work correctly for complex types (list,
set, map, UDT).
+ * This ensures the expiring cell path is invoked correctly for collection and
composite types.
+ * TODO: add tuple ttl test after adding support for writing tuples
+ */
+class BulkWriteComplexTypeTtlTest extends SharedClusterSparkIntegrationTestBase
+{
+ static final int ROW_COUNT = 100;
+ static final int TTL_SECONDS = 5;
+
+ static final QualifiedName LIST_TTL_TABLE = new
QualifiedName(TEST_KEYSPACE, "test_list_ttl");
+ static final QualifiedName SET_TTL_TABLE = new
QualifiedName(TEST_KEYSPACE, "test_set_ttl");
+ static final QualifiedName MAP_TTL_TABLE = new
QualifiedName(TEST_KEYSPACE, "test_map_ttl");
+ static final QualifiedName UDT_TTL_TABLE = new
QualifiedName(TEST_KEYSPACE, "test_udt_ttl");
+
+ static final String SIMPLE_UDT_NAME = "simple_udt";
+
+ @Test
+ void testComplexTypesWithTtl()
+ {
+ SparkSession spark = getOrCreateSparkSession();
+
+ // Write all complex types with TTL
+ writeListData(spark);
+ writeSetData(spark);
+ writeMapData(spark);
+ writeUdtData(spark);
+
+ // Wait for TTL to expire (TTL + 1 second margin)
+ Uninterruptibles.sleepUninterruptibly(TTL_SECONDS + 1,
TimeUnit.SECONDS);
+
+ // Verify all types have expired
+ assertThat(bulkReaderDataFrame(LIST_TTL_TABLE).load().collectAsList())
+ .as("list TTL post-expiry")
+ .isEmpty();
+ assertThat(bulkReaderDataFrame(SET_TTL_TABLE).load().collectAsList())
+ .as("set TTL post-expiry")
+ .isEmpty();
+ assertThat(bulkReaderDataFrame(MAP_TTL_TABLE).load().collectAsList())
+ .as("map TTL post-expiry")
+ .isEmpty();
+ assertThat(bulkReaderDataFrame(UDT_TTL_TABLE).load().collectAsList())
+ .as("UDT TTL post-expiry")
+ .isEmpty();
+ }
+
+ private void writeListData(SparkSession spark)
+ {
+ StructType schema = new StructType()
+ .add("id", IntegerType, false)
+ .add("listdata", createArrayType(IntegerType),
false);
+
+ List<Row> rows = IntStream.range(0, ROW_COUNT)
+ .mapToObj(i -> RowFactory.create(i,
Arrays.asList(i, i + 1)))
+ .collect(Collectors.toList());
+ Dataset<Row> df = spark.createDataFrame(rows, schema);
+
+ bulkWriterDataFrameWriter(df,
LIST_TTL_TABLE).option(WriterOptions.TTL.name(),
TTLOption.constant(TTL_SECONDS))
+ .save();
+ }
+
+ private void writeSetData(SparkSession spark)
+ {
+ StructType schema = new StructType()
+ .add("id", IntegerType, false)
+ .add("setdata", createArrayType(StringType),
false);
+
+ List<Row> rows = IntStream.range(0, ROW_COUNT)
+ .mapToObj(i -> RowFactory.create(i,
ImmutableSet.of("item" + i)))
+ .collect(Collectors.toList());
+ Dataset<Row> df = spark.createDataFrame(rows, schema);
+
+ bulkWriterDataFrameWriter(df,
SET_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS))
+ .save();
+ }
+
+ private void writeMapData(SparkSession spark)
+ {
+ StructType schema = new StructType()
+ .add("id", IntegerType, false)
+ .add("mapdata", createMapType(StringType,
IntegerType), false);
+
+ List<Row> rows = IntStream.range(0, ROW_COUNT)
+ .mapToObj(i -> RowFactory.create(i,
ImmutableMap.of("key" + i, i)))
+ .collect(Collectors.toList());
+ Dataset<Row> df = spark.createDataFrame(rows, schema);
+
+ bulkWriterDataFrameWriter(df,
MAP_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS))
+ .save();
+ }
+
+ private void writeUdtData(SparkSession spark)
+ {
+ StructType udtType = createStructType(new StructField[]{
+ new StructField("f1", StringType, true, Metadata.empty()),
+ new StructField("f2", IntegerType, true, Metadata.empty())
+ });
+ StructType schema = new StructType()
+ .add("id", IntegerType, false)
+ .add("udtfield", udtType, false);
+
+ List<Row> rows = IntStream.range(0, ROW_COUNT)
+ .mapToObj(i -> RowFactory.create(i,
RowFactory.create("course" + i, i)))
+ .collect(Collectors.toList());
+ Dataset<Row> df = spark.createDataFrame(rows, schema);
+
+ bulkWriterDataFrameWriter(df,
UDT_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS))
+ .save();
+ }
+
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .nodesPerDc(3);
+ }
+
+ @Override
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(LIST_TTL_TABLE, DC1_RF3);
+
+ cluster.schemaChangeIgnoringStoppedInstances("CREATE TYPE " +
TEST_KEYSPACE + "." + SIMPLE_UDT_NAME
+ + " (f1 text, f2 int)");
+
+ cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " +
LIST_TTL_TABLE + " ("
+ + "id int PRIMARY KEY, "
+ + "listdata list<int>)");
+
+ cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " +
SET_TTL_TABLE + " ("
+ + "id int PRIMARY KEY, "
+ + "setdata set<text>)");
+
+ cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " +
MAP_TTL_TABLE + " ("
+ + "id int PRIMARY KEY, "
+ + "mapdata map<text,
int>)");
+
+ cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " +
UDT_TTL_TABLE + " ("
+ + "id int PRIMARY KEY, "
+ + "udtfield " +
SIMPLE_UDT_NAME + ")");
+ }
+}
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
index b4e7d8b1..62c9d685 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
@@ -242,9 +242,7 @@ public class FourZeroCdcEventBuilder extends CdcEventBuilder
holder.add(makeValue(cell.buffer(), cell.column()));
if (cell.isExpiring())
{
- // TODO: CASSANDRA-14227 Support unit interpretation,
- // so that TTL does not overflow and become negative.
-                    setTTL(cell.ttl(), Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime()));
+ setTTL(cell.ttl(), cell.localDeletionTime());
}
}
}
@@ -274,7 +272,7 @@ public class FourZeroCdcEventBuilder extends CdcEventBuilder
buffer.addCell(cell);
if (cell.isExpiring())
{
-                    setTTL(cell.ttl(), Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime()));
+ setTTL(cell.ttl(), cell.localDeletionTime());
}
}
}
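
Reviewer note (not part of the patch): the removed Cell.deletionTimeLongToUnsignedInteger call, going by its name, packs the long deletion time into an unsigned 32-bit representation, while the builder above now forwards cell.localDeletionTime() as a long. A minimal standalone Java sketch, using only plain JDK calls and a hypothetical post-2038 timestamp, of why the extra width matters:

    public final class DeletionTimeWidthSketch
    {
        public static void main(String[] args)
        {
            // hypothetical localDeletionTime in epoch seconds, past 2038-01-19 (does not fit in a signed int)
            long localDeletionTime = 2_200_000_000L;

            // legacy 32-bit handling: a narrowing cast wraps to a negative value
            int narrowed = (int) localDeletionTime;
            System.out.println(narrowed);                          // -2094967296

            // reinterpreting the bits as unsigned recovers values below 2^32 ...
            System.out.println(Integer.toUnsignedLong(narrowed));  // 2200000000

            // ... but keeping the value as a long avoids the round trip entirely
            System.out.println(localDeletionTime);                 // 2200000000
        }
    }
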
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
index 963f3454..dd0cc171 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.TableMetadata;
@@ -31,21 +33,25 @@ public class DbUtils
        throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
}
-    public static DeletionTime deletionTime(long markedForDeleteAt, int localDeletionTime)
+    @VisibleForTesting
+    public static DeletionTime deletionTime(long markedForDeleteAt, long localDeletionTime)
{
return DeletionTime.build(markedForDeleteAt, localDeletionTime);
}
+ @VisibleForTesting
public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds)
{
return LivenessInfo.create(timestamp, nowInSeconds);
}
+ @VisibleForTesting
    public static PartitionUpdate fullPartitionDeletion(TableMetadata metadata, ByteBuffer key, long timestamp, long nowInSec)
{
        return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, nowInSec);
}
+ @VisibleForTesting
    public static PartitionUpdate.SimpleBuilder partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long nowInSec)
{
return PartitionUpdate.simpleBuilder(metadata, key).nowInSec(nowInSec);
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
index 63ab2fdb..dfd18bb3 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
@@ -187,7 +187,7 @@ public final class SSTableTombstoneWriter implements Closeable
ClientState.forInternalCalls(),
options,
delete.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
-                                          (int) TimeUnit.MILLISECONDS.toSeconds(now),
+                                          TimeUnit.MILLISECONDS.toSeconds(now),
delete.getTimeToLive(options),
Collections.emptyMap());
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
index df6af138..34426e63 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
@@ -117,7 +117,7 @@ public class CompactionStreamScanner extends AbstractStreamScanner
@Override
UnfilteredPartitionIterator initializePartitions()
{
- int nowInSec = timeProvider.referenceEpochInSeconds();
+ long nowInSec = timeProvider.referenceEpochInSeconds();
Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace);
        ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(metadata.name);
        controller = new PurgingCompactionController(cfStore, CompactionParams.TombstoneOption.NONE);
diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index 8dffb296..72a656a3 100644
--- a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -48,7 +48,7 @@ public abstract class CqlType extends AbstractCqlType
long deletionTime)
{
        Preconditions.checkArgument(cd.isComplex(), "The method only works with complex columns");
-        rowBuilder.addComplexDeletion(cd, DeletionTime.build(deletionTime, (int) TimeUnit.MICROSECONDS.toSeconds(deletionTime)));
+        rowBuilder.addComplexDeletion(cd, DeletionTime.build(deletionTime, TimeUnit.MICROSECONDS.toSeconds(deletionTime)));
}
public static BufferCell tombstone(ColumnMetadata column, long timestamp,
long nowInSec, CellPath path)
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
index 7965272e..4284b0cc 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -33,23 +34,32 @@ public class DbUtils
        throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
}
-    public static DeletionTime deletionTime(long markedForDeleteAt, int localDeletionTime)
+
+    @VisibleForTesting
+    public static DeletionTime deletionTime(long markedForDeleteAt, long localDeletionTime)
    {
-        return new DeletionTime(markedForDeleteAt, localDeletionTime);
+        // C* 4.0 DeletionTime constructor requires int for localDeletionTime; checked cast will throw after Y2038
+        return new DeletionTime(markedForDeleteAt, Ints.checkedCast(localDeletionTime));
}
+ @VisibleForTesting
public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds)
{
+        // C* 4.0 LivenessInfo.create requires int for nowInSeconds; checked cast will throw after Y2038
return LivenessInfo.create(timestamp, Ints.checkedCast(nowInSeconds));
}
+ @VisibleForTesting
    public static PartitionUpdate fullPartitionDeletion(TableMetadata metadata, ByteBuffer key, long timestamp, long nowInSec)
{
+        // C* 4.0 fullPartitionDelete requires int for nowInSec; checked cast will throw after Y2038
        return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, Ints.checkedCast(nowInSec));
}
+ @VisibleForTesting
    public static PartitionUpdate.SimpleBuilder partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long nowInSec)
{
+        // C* 4.0 simpleBuilder.nowInSec requires int; checked cast will throw after Y2038
        return PartitionUpdate.simpleBuilder(metadata, key).nowInSec(Ints.checkedCast(nowInSec));
}
}
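
Reviewer note (not part of the patch): the Ints.checkedCast guards in the four-zero bridge fail loudly once epoch seconds no longer fit in a signed int (after 2038-01-19T03:14:07Z) instead of overflowing silently. A minimal standalone sketch of that behaviour, assuming only Guava on the classpath:

    import com.google.common.primitives.Ints;

    public final class CheckedCastSketch
    {
        public static void main(String[] args)
        {
            long lastIntSecond = Integer.MAX_VALUE;               // 2038-01-19T03:14:07Z
            System.out.println(Ints.checkedCast(lastIntSecond));  // 2147483647, still fits

            // one second later: IllegalArgumentException rather than a silent negative wrap
            System.out.println(Ints.checkedCast(lastIntSecond + 1));
        }
    }
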
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
index 32ff7053..57868d39 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
import org.apache.cassandra.bridge.CassandraSchema;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -185,7 +186,7 @@ public final class SSTableTombstoneWriter implements Closeable
delete.updatedColumns(),
options,
delete.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
-                                          (int) TimeUnit.MILLISECONDS.toSeconds(now),
+                                          Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(now)),
delete.getTimeToLive(options),
Collections.emptyMap());
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
index 05d9d45e..0853325c 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
@@ -25,6 +25,7 @@ import java.math.BigInteger;
import java.util.Iterator;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringPrefix;
@@ -438,12 +439,14 @@ public abstract class AbstractStreamScanner implements StreamScanner<RowData>, C
{
                AbstractComplexTypeBuffer buffer = AbstractComplexTypeBuffer.newBuffer(column.type, cellCount);
long maxTimestamp = Long.MIN_VALUE;
+                // C* 4.0 Cell.isLive requires int for nowInSec; checked cast will throw after Y2038
+                int referenceEpochInSecondsAsInt = Ints.checkedCast(timeProvider.referenceEpochInSeconds());
while (cells.hasNext())
{
Cell<?> cell = cells.next();
                    // Re: isLive vs. isTombstone - isLive considers TTL so that if a cell is expiring soon,
// it is handled as tombstone
- if (cell.isLive(timeProvider.referenceEpochInSeconds()))
+ if (cell.isLive(referenceEpochInSecondsAsInt))
{
buffer.addCell(cell);
}
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
index 89a2df3f..2925eb59 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
@@ -28,6 +28,7 @@ import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
import org.apache.cassandra.db.AbstractCompactionController;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -117,7 +118,7 @@ public class CompactionStreamScanner extends AbstractStreamScanner
@Override
UnfilteredPartitionIterator initializePartitions()
{
- int nowInSec = timeProvider.referenceEpochInSeconds();
+ long nowInSec = timeProvider.referenceEpochInSeconds();
Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace);
        ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(metadata.name);
        controller = new PurgingCompactionController(cfStore, CompactionParams.TombstoneOption.NONE);
@@ -125,7 +126,8 @@ public class CompactionStreamScanner extends AbstractStreamScanner
.map(Scannable::scanner)
.collect(Collectors.toList());
scanners = new AbstractCompactionStrategy.ScannerList(scannerList);
-        ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, taskId);
+        // C* 4.0 CompactionIterator requires int for nowInSec; checked cast will throw after Y2038
+        ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, Ints.checkedCast(nowInSec), taskId);
return ci;
}
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java
index e4425d75..008d34cc 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.cdc.api.Row;
@@ -138,7 +139,7 @@ public abstract class AbstractCqlType implements CqlField.CqlType
ColumnMetadata cd,
long timestamp,
int ttl,
- int now,
+ long now,
Object value)
{
addCell(rowBuilder, cd, timestamp, ttl, now, value, null);
@@ -149,7 +150,7 @@ public abstract class AbstractCqlType implements CqlField.CqlType
ColumnMetadata cd,
long timestamp,
int ttl,
- int now,
+ long now,
Object value,
CellPath cellPath)
{
@@ -199,6 +200,7 @@ public abstract class AbstractCqlType implements CqlField.CqlType
long deletionTime)
{
        Preconditions.checkArgument(cd.isComplex(), "The method only works with complex columns");
-        rowBuilder.addComplexDeletion(cd, new DeletionTime(deletionTime, (int) TimeUnit.MICROSECONDS.toSeconds(deletionTime)));
+        // C* 4.0 DeletionTime constructor requires int for localDeletionTime
+        rowBuilder.addComplexDeletion(cd, new DeletionTime(deletionTime, Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(deletionTime))));
}
}
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index e91027a6..3912d9a8 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -31,11 +31,13 @@ public abstract class CqlType extends AbstractCqlType
{
public static BufferCell tombstone(ColumnMetadata column, long timestamp,
long nowInSec, CellPath path)
{
+        // C* 4.0 BufferCell.tombstone requires int for nowInSec; checked cast will throw after Y2038
        return BufferCell.tombstone(column, timestamp, Ints.checkedCast(nowInSec), path);
}
public static BufferCell expiring(ColumnMetadata column, long timestamp,
int ttl, long nowInSec, ByteBuffer value, CellPath path)
{
+        // C* 4.0 BufferCell.expiring requires int for nowInSec; checked cast will throw after Y2038
        return BufferCell.expiring(column, timestamp, ttl, Ints.checkedCast(nowInSec), value, path);
}
}
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
index a7cf8bea..43619e6f 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
@@ -105,15 +105,15 @@ public abstract class AbstractCqlList extends CqlCollection implements CqlField.
ColumnMetadata cd,
long timestamp,
int ttl,
- int now,
+ long now,
Object value)
{
for (Object o : (List<?>) value)
{
if (ttl != NO_TTL)
{
-                rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, type().serialize(o),
-                                                       CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))));
+                rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, type().serialize(o),
+                                                    CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))));
}
else
{
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
index 6d0e49d3..e9003b09 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
@@ -120,15 +120,15 @@ public class CqlMap extends CqlCollection implements CqlField.CqlMap
ColumnMetadata cd,
long timestamp,
int ttl,
- int now,
+ long now,
Object value)
{
for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet())
{
if (ttl != NO_TTL)
{
-                rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, valueType().serialize(entry.getValue()),
-                                                       CellPath.create(keyType().serialize(entry.getKey()))));
+                rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, valueType().serialize(entry.getValue()),
+                                                    CellPath.create(keyType().serialize(entry.getKey()))));
}
else
{
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
index c50b8059..bbe504ae 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
@@ -104,15 +104,15 @@ public class CqlSet extends CqlList implements CqlField.CqlSet
ColumnMetadata cd,
long timestamp,
int ttl,
- int now,
+ long now,
Object value)
{
for (Object o : (Set<?>) value)
{
if (ttl != NO_TTL)
{
-                rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                       CellPath.create(type().serialize(o))));
+                rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                    CellPath.create(type().serialize(o))));
}
else
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]