This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e0700f  [Fix] Fix processedOffset update when retry load (#79)
3e0700f is described below

commit 3e0700f1ae27854380cc763c869d496ef71c7a7e
Author: wudi <[email protected]>
AuthorDate: Wed Jul 2 19:37:58 2025 +0800

    [Fix] Fix processedOffset update when retry load (#79)
---
 .../doris/kafka/connector/writer/DorisWriter.java  | 33 ++++++++++++++++------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 6e1a5a4..cfa3ef9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -46,6 +46,8 @@ public abstract class DorisWriter {
     protected final AtomicLong committedOffset; // loaded offset + 1
     protected final AtomicLong flushedOffset; // flushed offset
     protected final AtomicLong processedOffset; // processed offset
+    protected final AtomicLong skipMinOffset; // skipMinOffset offset
+    protected final AtomicLong skipMaxOffset; // skipMaxOffset offset
     protected long previousFlushTimeStamp;
 
     // make the initialization lazy
@@ -86,6 +88,8 @@ public abstract class DorisWriter {
         this.processedOffset = new AtomicLong(-1);
         this.flushedOffset = new AtomicLong(-1);
         this.committedOffset = new AtomicLong(0);
+        this.skipMinOffset = new AtomicLong(-1);
+        this.skipMaxOffset = new AtomicLong(-1);
         this.previousFlushTimeStamp = System.currentTimeMillis();
 
         this.dorisOptions = dorisOptions;
@@ -118,6 +122,19 @@ public abstract class DorisWriter {
         // discard the record if the record offset is smaller or equal to server side offset
         if (record.kafkaOffset() > this.offsetPersistedInDoris.get()
                 && record.kafkaOffset() > processedOffset.get()) {
+
+            if (skipMinOffset.get() != -1 || skipMaxOffset.get() != -1) {
+                LOG.info(
+                        "Skip partition {} offset {} to {}, cause these are smaller than processedOffset, offsetPersistedInDoris={}, processedOffset={}",
+                        partition,
+                        skipMinOffset.get(),
+                        skipMaxOffset.get(),
+                        offsetPersistedInDoris.get(),
+                        processedOffset.get());
+                skipMinOffset.set(-1);
+                skipMaxOffset.set(-1);
+            }
+
             SinkRecord dorisRecord = record;
             RecordBuffer tmpBuff = null;
 
@@ -131,19 +148,19 @@ public abstract class DorisWriter {
 
             if (tmpBuff != null) {
                 LOG.info(
-                        "trigger flush by buffer size or count, buffer size: {}, num of records: {}",
+                        "trigger flush by buffer size or count, partition: {}, buffer size: {}, num of records: {}",
+                        partition,
                         tmpBuff.getBufferSizeBytes(),
                         tmpBuff.getNumOfRecords());
                 flush(tmpBuff);
+                processedOffset.set(dorisRecord.kafkaOffset());
             }
-            processedOffset.set(dorisRecord.kafkaOffset());
         } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "The record offset is smaller than processedOffset. recordOffset={}, offsetPersistedInDoris={}, processedOffset={}",
-                        record.kafkaOffset(),
-                        offsetPersistedInDoris.get(),
-                        processedOffset.get());
+            if (skipMinOffset.get() == -1) {
+                skipMinOffset.set(record.kafkaOffset());
+                skipMaxOffset.set(record.kafkaOffset());
+            } else {
+                skipMaxOffset.set(record.kafkaOffset());
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to