This is an automated email from the ASF dual-hosted git repository.

jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 207f28cd CASSANALYTICS-129 : Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (#182)
207f28cd is described below

commit 207f28cd3586ae61f649cb2de5e53db03f58112d
Author: Jyothsna konisa <[email protected]>
AuthorDate: Fri Mar 13 15:24:28 2026 -0700

    CASSANALYTICS-129 : Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (#182)
    
    Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSANALYTICS-129
---
 CHANGES.txt                                        |  1 +
 .../db/commitlog/BufferingCommitLogReader.java     | 26 +++++++++++++++++++---
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d0e3c524..2ade1c36 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
  * Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60)
  * Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127)
  * Commitlog reading not progressing in CDC due to incorrect CommitLogReader.isFullyRead (CASSANALYTICS-124)
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
index 19349b30..312cb96c 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
@@ -316,6 +316,14 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
                     break;
                 }
             }
+
+            // If the segment is flagged complete in the index but no end-of-segment marker was
+            // encountered, advance position to maxOffset so isFullyRead() returns true and CDC
+            // can move on to the next commit log.
+            if (log.completed() && statusTracker.noErrors())
+            {
+                this.position = (int) log.maxOffset();
+            }
         }
         // Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) cannot throw a checked exception,
         // so we check to see if a RuntimeException is wrapping an IOException.
@@ -425,7 +433,7 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
                 if (end - reader.getFilePointer() < 4)
                 {
                     logger.trace("Not enough bytes left for another mutation in this CommitLog section, continuing");
-                    statusTracker.requestTermination();
+                    statusTracker.markCleanCompletion();
                     return;
                 }
 
@@ -437,7 +445,7 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
                     // Mark the log as fully consumed so isFullyRead() returns true.
                     // The guard above ensures this is not overridden after readSection returns.
                     this.position = (int) log.maxOffset();
-                    statusTracker.requestTermination();
+                    statusTracker.markCleanCompletion();
                     return;
                 }
 
@@ -673,10 +681,12 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
         public String errorContext = "";
         public boolean tolerateErrorsInSection;
         private boolean error;
+        private boolean cleanCompletion;
 
         private ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
         {
             this.error = false;
+            this.cleanCompletion = false;
             this.mutationsLeft = mutationLimit;
             this.tolerateErrorsInSection = tolerateErrorsInSection;
         }
@@ -692,13 +702,23 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
 
         public boolean shouldContinue()
         {
-            return !error && mutationsLeft != 0;
+            return !error && !cleanCompletion && mutationsLeft != 0;
+        }
+
+        public boolean noErrors()
+        {
+            return !error;
         }
 
         public void requestTermination()
         {
             error = true;
         }
+
+        public void markCleanCompletion()
+        {
+            cleanCompletion = true;
+        }
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to