This is an automated email from the ASF dual-hosted git repository.

jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2427d273 CASSANALYTICS-127: Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (#179)
2427d273 is described below

commit 2427d2736524b0949df87e4783965f815dca0ac6
Author: Jyothsna konisa <[email protected]>
AuthorDate: Wed Mar 11 13:38:02 2026 -0700

    CASSANALYTICS-127: Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (#179)
    
    Patch by Jyothsna Konisa; Reviewed by Josh McKenzie & Lukasz Antoniak for CASSANALYTICS-127
---
 CHANGES.txt                                        |  4 ++++
 .../org/apache/cassandra/cdc/LocalCommitLog.java   | 22 ++++++++++++++++++++--
 .../db/commitlog/BufferingCommitLogReader.java     |  8 --------
 3 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6b973c39..e2740c94 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+0.4.0
+-----
+ * Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127)
+
 0.3.0
 -----
  * Assign data file start offset based on BTI index (CASSANALYTICS-121)
diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java
index da7e06f0..3c696bca 100644
--- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java
+++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java
@@ -20,7 +20,9 @@
 package org.apache.cassandra.cdc;
 
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.List;
 
 import org.apache.cassandra.cdc.api.CommitLog;
 import org.apache.cassandra.spark.data.FileSystemSource;
@@ -56,7 +58,8 @@ public class LocalCommitLog implements CommitLog
 
     public long maxOffset()
     {
-        return length;
+        List<String> lines = readIdxFile();
+        return lines.isEmpty() ? length : Long.parseLong(lines.get(0).trim());
     }
 
     public long length()
@@ -67,7 +70,22 @@ public class LocalCommitLog implements CommitLog
     public boolean completed()
     {
         // for CDC Cassandra writes COMPLETED in the final line of the CommitLog-7-*_cdc.idx index file.
-        return maxOffset() >= 67108818;
+        List<String> lines = readIdxFile();
+        return lines.size() >= 2 && 
lines.get(1).trim().equalsIgnoreCase("COMPLETED");
+    }
+
+    // Each commit log .log file has a corresponding _cdc.idx file that tracks the CDC offset and completion status.
+    private List<String> readIdxFile()
+    {
+        Path idxPath = path.resolveSibling(name.replace(".log", "_cdc.idx"));
+        try
+        {
+            return Files.exists(idxPath) ? Files.readAllLines(idxPath) : 
List.of();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public FileSystemSource<CommitLog> source()
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
index fd457d8a..d6476087 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
@@ -316,14 +316,6 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
                     break;
                 }
             }
-
-            // If the loop finished naturally (iterator exhausted) without hitting an error or limit,
-            // ensure the position reflects the end of the file. If we aborted early due to an error
-            // or mutation limit, 'this.position' remains at the last valid read offset.
-            if (statusTracker.shouldContinue())
-            {
-                this.position = (int) log.maxOffset();
-            }
         }
         // Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) cannot throw a checked exception,
         // so we check to see if a RuntimeException is wrapping an IOException.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to