This is an automated email from the ASF dual-hosted git repository.
jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 207f28cd CASSANALYTICS-129 : Fix ReadStatusTracker to distinguish
clean completion from error termination in BufferingCommitLogReader (#182)
207f28cd is described below
commit 207f28cd3586ae61f649cb2de5e53db03f58112d
Author: Jyothsna konisa <[email protected]>
AuthorDate: Fri Mar 13 15:24:28 2026 -0700
CASSANALYTICS-129 : Fix ReadStatusTracker to distinguish clean completion
from error termination in BufferingCommitLogReader (#182)
Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSANALYTICS-129
---
CHANGES.txt | 1 +
.../db/commitlog/BufferingCommitLogReader.java | 26 +++++++++++++++++++---
2 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d0e3c524..2ade1c36 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.4.0
-----
+ * Fix ReadStatusTracker to distinguish clean completion from error
termination in BufferingCommitLogReader (CASSANALYTICS-129)
* Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60)
* Fixing CdcTests.testMockedCdc broken due to incorrect position update in
BufferingCommitLogReader (CASSANALYTICS-127)
* Commitlog reading not progressing in CDC due to incorrect
CommitLogReader.isFullyRead (CASSANALYTICS-124)
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
index 19349b30..312cb96c 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
@@ -316,6 +316,14 @@ public class BufferingCommitLogReader implements
CommitLogReadHandler,
break;
}
}
+
+ // If the segment is flagged complete in the index but no
end-of-segment marker was
+ // encountered, advance position to maxOffset so isFullyRead()
returns true and CDC
+ // can move on to the next commit log.
+ if (log.completed() && statusTracker.noErrors())
+ {
+ this.position = (int) log.maxOffset();
+ }
}
// Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop)
cannot throw a checked exception,
// so we check to see if a RuntimeException is wrapping an IOException.
@@ -425,7 +433,7 @@ public class BufferingCommitLogReader implements
CommitLogReadHandler,
if (end - reader.getFilePointer() < 4)
{
logger.trace("Not enough bytes left for another mutation
in this CommitLog section, continuing");
- statusTracker.requestTermination();
+ statusTracker.markCleanCompletion();
return;
}
@@ -437,7 +445,7 @@ public class BufferingCommitLogReader implements
CommitLogReadHandler,
// Mark the log as fully consumed so isFullyRead() returns
true.
// The guard above ensures this is not overridden after
readSection returns.
this.position = (int) log.maxOffset();
- statusTracker.requestTermination();
+ statusTracker.markCleanCompletion();
return;
}
@@ -673,10 +681,12 @@ public class BufferingCommitLogReader implements
CommitLogReadHandler,
public String errorContext = "";
public boolean tolerateErrorsInSection;
private boolean error;
+ private boolean cleanCompletion;
private ReadStatusTracker(int mutationLimit, boolean
tolerateErrorsInSection)
{
this.error = false;
+ this.cleanCompletion = false;
this.mutationsLeft = mutationLimit;
this.tolerateErrorsInSection = tolerateErrorsInSection;
}
@@ -692,13 +702,23 @@ public class BufferingCommitLogReader implements
CommitLogReadHandler,
public boolean shouldContinue()
{
- return !error && mutationsLeft != 0;
+ return !error && !cleanCompletion && mutationsLeft != 0;
+ }
+
+ public boolean noErrors()
+ {
+ return !error;
}
public void requestTermination()
{
error = true;
}
+
+ public void markCleanCompletion()
+ {
+ cleanCompletion = true;
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]