This is an automated email from the ASF dual-hosted git repository.
jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba38f156 CASSANALYTICS-126: Flush event consumer before persisting CDC
state to prevent data loss on failure (#178)
ba38f156 is described below
commit ba38f156375a51c9e6639ce92c3da88b627e4ac4
Author: Jyothsna konisa <[email protected]>
AuthorDate: Mon Mar 16 15:20:35 2026 -0700
CASSANALYTICS-126: Flush event consumer before persisting CDC state to
prevent data loss on failure (#178)
Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSANALYTICS-126
---
CHANGES.txt | 1 +
.../src/main/java/org/apache/cassandra/cdc/Cdc.java | 12 ++++++++++++
.../java/org/apache/cassandra/cdc/api/EventConsumer.java | 13 +++++++++++++
3 files changed, 26 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ade1c36..53e81fd2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126)
* Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
* Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60)
* Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127)
diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java
index b3409cdf..6d3617d2 100644
--- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java
+++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java
@@ -318,6 +318,18 @@ public class Cdc implements Closeable
eventConsumer.accept(event);
}
+ // flush before persisting state; if delivery fails this throws,
+ // skipping persist() so the micro-batch is retried on the next run
+ try
+ {
+ eventConsumer.flush();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during event consumer flush", e);
+ }
+
// persist end state
CdcState endState = it.endState();
persist(endState, tokenRange);
diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java
index a0ccddee..0890b0f4 100644
--- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java
+++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java
@@ -25,4 +25,17 @@ import org.apache.cassandra.cdc.msg.CdcEvent;
public interface EventConsumer extends Consumer<CdcEvent>
{
+ /**
+ * Flush any pending events to the transport layer.
+ * Called after each micro-batch and before CDC state is persisted, to ensure
+ * all events have been durably delivered before the commit-log position advances.
+ * If delivery fails, implementations must throw so that state is NOT persisted
+ * and the micro-batch will be retried on the next run.
+ *
+ * @throws InterruptedException if the calling thread is interrupted while flushing
+ */
+ default void flush() throws InterruptedException
+ {
+ // no-op by default
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]