This is an automated email from the ASF dual-hosted git repository.

jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba38f156 CASSANALYTICS-126: Flush event consumer before persisting CDC state to prevent data loss on failure (#178)
ba38f156 is described below

commit ba38f156375a51c9e6639ce92c3da88b627e4ac4
Author: Jyothsna konisa <[email protected]>
AuthorDate: Mon Mar 16 15:20:35 2026 -0700

    CASSANALYTICS-126: Flush event consumer before persisting CDC state to prevent data loss on failure (#178)
    
    Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSANALYTICS-126
---
 CHANGES.txt                                                 |  1 +
 .../src/main/java/org/apache/cassandra/cdc/Cdc.java         | 12 ++++++++++++
 .../java/org/apache/cassandra/cdc/api/EventConsumer.java    | 13 +++++++++++++
 3 files changed, 26 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2ade1c36..53e81fd2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126)
  * Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
  * Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60)
  * Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127)
diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java
index b3409cdf..6d3617d2 100644
--- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java
+++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java
@@ -318,6 +318,18 @@ public class Cdc implements Closeable
                 eventConsumer.accept(event);
             }
 
+            // flush before persisting state; if delivery fails this throws,
+            // skipping persist() so the micro-batch is retried on the next run
+            try
+            {
+                eventConsumer.flush();
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted during event consumer flush", e);
+            }
+
             // persist end state
             CdcState endState = it.endState();
             persist(endState, tokenRange);
diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java
index a0ccddee..0890b0f4 100644
--- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java
+++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/api/EventConsumer.java
@@ -25,4 +25,17 @@ import org.apache.cassandra.cdc.msg.CdcEvent;
 
 public interface EventConsumer extends Consumer<CdcEvent>
 {
+    /**
+     * Flush any pending events to the transport layer.
+     * Called after each micro-batch and before CDC state is persisted, to ensure
+     * all events have been durably delivered before the commit-log position advances.
+     * If delivery fails, implementations must throw so that state is NOT persisted
+     * and the micro-batch will be retried on the next run.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while flushing
+     */
+    default void flush() throws InterruptedException
+    {
+        // no-op by default
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to