Copilot commented on code in PR #60181:
URL: https://github.com/apache/doris/pull/60181#discussion_r2740831427


##########
fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java:
##########
@@ -32,5 +32,6 @@ public class DataSourceConfigKeys {
     public static final String OFFSET_INITIAL = "initial";
     public static final String OFFSET_EARLIEST = "earliest";
     public static final String OFFSET_LATEST = "latest";

Review Comment:
   Breaking API change: the configuration key "split_size" has been renamed to "snapshot_split_size", which will break existing jobs that still use the old key. Consider supporting both keys with a deprecation warning for backward compatibility, or document this breaking change in the release notes.
   ```suggestion
       public static final String OFFSET_LATEST = "latest";
       /**
        * Deprecated: use {@link #SNAPSHOT_SPLIT_SIZE} instead.
        * Kept for backward compatibility with existing jobs using the "split_size" key.
        */
       @Deprecated
       public static final String SPLIT_SIZE = "split_size";
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -68,6 +69,8 @@
 public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
     public static final String SPLIT_ID = "splitId";
     private static final ObjectMapper objectMapper = new ObjectMapper();

Review Comment:
   SNAPSHOT_PARALLELISM_DEFAULT is set to 4, but there is no comment explaining why 4 was chosen. Consider documenting the rationale for this value and the conditions under which users should adjust it.
   ```suggestion
       private static final ObjectMapper objectMapper = new ObjectMapper();
        // Default snapshot parallelism used when DataSourceConfigKeys.SNAPSHOT_PARALLELISM is not set.
        // 4 is chosen as a conservative default that provides some parallelism on typical production
        // machines without putting too much load on the source database or the Doris FE. Increase this
        // value if the source database and network can safely handle more concurrent snapshot reads,
        // or decrease it for small / under-provisioned databases or when snapshotting causes high load.
   ```



##########
regression-test/conf/regression-conf.groovy:
##########
@@ -24,7 +24,7 @@ defaultDb = "regression_test"
 // init cmd like: select @@session.tx_read_only
 // at each time we connect.
 // add allowLoadLocalInfile so that the jdbc can execute mysql load data from 
client.
-jdbcUrl = 
"jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round"
+jdbcUrl = 
"jdbc:mysql://10.16.10.6:29939/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round"

Review Comment:
   Hardcoded IP address and port in a configuration file. This environment-specific setting (10.16.10.6:29939) should not be committed to the repository and will break tests in other environments.
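
   If the change was unintentional, a suggestion restoring the previous default (copied from the removed line above) keeps the test configuration environment-agnostic:
   ```suggestion
   jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round"
   ```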



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -162,42 +163,54 @@ public void writeRecord(String database, String table, 
byte[] record) {
                 lock.unlock();
             }
         }
+
+        // Single table flush according to the STREAM_LOAD_MAX_BYTES
+        if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES) {
+            bufferFullFlush(bufferKey);
+        }
+    }
+
+    public void bufferFullFlush(String bufferKey) {
+         doFlush(bufferKey, false, true);
     }
 
-    public boolean cacheFullFlush() {
-        return doFlush(true, true);
+    public void forceFlush() {
+         doFlush(null, true, false);
     }
 
-    public boolean forceFlush() {
-        return doFlush(true, false);
+    public void cacheFullFlush() {
+         doFlush(null, true, true);
     }
 
-    private synchronized boolean doFlush(boolean waitUtilDone, boolean 
cacheFull) {
+
+    private synchronized void doFlush(String bufferKey, boolean waitUtilDone, 
boolean bufferFull) {
         checkFlushException();
-        if (waitUtilDone || cacheFull) {
-            return flush(waitUtilDone);
+        if (waitUtilDone || bufferFull) {
+            flush(bufferKey, waitUtilDone);
         }
-        return false;
     }
 
-    private synchronized boolean flush(boolean waitUtilDone) {
+    private synchronized void flush(String bufferKey, boolean waitUtilDone) {
         if (!waitUtilDone && bufferMap.isEmpty()) {
             // bufferMap may have been flushed by other threads
             LOG.info("bufferMap is empty, no need to flush");
-            return false;
+            return;
         }
-        for (String key : bufferMap.keySet()) {
-            if (waitUtilDone) {
-                // Ensure that the interval satisfies intervalMS
-                flushBuffer(key);
+
+        if (null == bufferKey) {
+            for (String key : bufferMap.keySet()) {
+                if (waitUtilDone) {
+                    flushBuffer(key);
+                }
             }
-        }
-        if (!waitUtilDone) {
-            return false;
+        } else if (bufferMap.containsKey(bufferKey)) {
+            flushBuffer(bufferKey);
         } else {
+            LOG.warn("buffer not found for key: {}, may be already flushed.", 
bufferKey);
+        }
+        if (waitUtilDone) {
             waitAsyncLoadFinish();
         }
-        return true;
     }

Review Comment:
   The flush method signature has changed from returning boolean to void, but callers previously used the return value to determine whether the flush succeeded. This is a breaking API change; verify that all callers have been updated to handle the new void return type appropriately.
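
   If callers still need a success signal, one option is to keep a boolean result on the flush path. A minimal sketch, assuming flush(String, boolean) is restored to return boolean as it effectively did before this PR (method names are taken from the diff above; this is not the PR's implementation):
   ```java
    // Sketch: preserve a boolean result so callers can still tell whether a flush actually ran.
    private synchronized boolean doFlush(String bufferKey, boolean waitUtilDone, boolean bufferFull) {
        checkFlushException();
        if (waitUtilDone || bufferFull) {
            // flush(...) would again return whether it flushed anything, as in the pre-PR code.
            return flush(bufferKey, waitUtilDone);
        }
        return false;
    }

    public boolean bufferFullFlush(String bufferKey) {
        return doFlush(bufferKey, false, true);
    }
   ```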



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,62 +178,234 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
+        
         LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
 
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/binlog split based on offset and 
start the reader.
-        MySqlSplit split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            DebeziumReader<SourceRecords, MySqlSplit> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader",
-                        baseReq.isReload());
-                // build split
-                Tuple2<MySqlSplit, Boolean> splitFlag = 
createMySqlSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // reset binlog reader
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof BinlogSplitReader) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        String splitId = String.valueOf(offsetMeta.get(SPLIT_ID));
+        if (BINLOG_SPLIT_ID.equals(splitId)){
+            // Binlog split mode
+            return prepareBinlogSplit(offsetMeta, baseReq);
         } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
+            // Extract snapshot split list
+            List<MySqlSnapshotSplit> snapshotSplits = 
extractSnapshotSplits(offsetMeta, baseReq);
+            return prepareSnapshotSplits(snapshotSplits, baseReq);
         }
-
-        // build response with iterator
+    }
+    
+    /**
+     * Extract snapshot splits from meta.
+     * Only supports format: {"splits": [{"splitId": "xxx", ...}, ...]}
+     * 
+     * @return List of snapshot splits, or null if it's a binlog split
+     */
+    private List<MySqlSnapshotSplit> extractSnapshotSplits(
+            Map<String, Object> offsetMeta, 
+            JobBaseRecordRequest baseReq) throws JsonProcessingException {
+        
+        // Check if it contains "splits" array
+        Object splitsObj = offsetMeta.get("splits");
+        if (splitsObj == null) {
+            throw new RuntimeException("Invalid meta format: missing 'splits' 
array");
+        }
+        
+        if (!(splitsObj instanceof List)) {
+            throw new RuntimeException("Invalid meta format: 'splits' must be 
an array");
+        }
+        
+        // Parse splits array
+        List<Map<String, Object>> splitMetaList = (List<Map<String, Object>>) 
splitsObj;
+        if (splitMetaList.isEmpty()) {
+            throw new RuntimeException("Invalid meta format: 'splits' array is 
empty");
+        }
+        
+        List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
+        for (Map<String, Object> splitMeta : splitMetaList) {
+            MySqlSnapshotSplit split = createSnapshotSplit(splitMeta, baseReq);
+            snapshotSplits.add(split);
+        }
+        
+        LOG.info("Extracted {} snapshot split(s) from meta", 
snapshotSplits.size());
+        return snapshotSplits;
+    }
+    
+    /**
+     * Prepare snapshot splits (unified handling for single or multiple splits)
+     */
+    private SplitReadResult prepareSnapshotSplits(
+            List<MySqlSnapshotSplit> splits,
+            JobBaseRecordRequest baseReq) throws Exception {
+        
+        LOG.info("Preparing {} snapshot split(s) for reading", splits.size());
+        
+        // Clear previous contexts
+        this.snapshotReaderContexts.clear();
+        this.currentReaderIndex = 0;
+        
+        // Create reader for each split and submit
+        for (int i = 0; i < splits.size(); i++) {
+            MySqlSnapshotSplit split = splits.get(i);
+            
+            // Create independent reader (each has its own Debezium queue)
+            SnapshotSplitReader reader = getSnapshotSplitReader(baseReq, i);
+            
+            // Submit split (triggers async reading, data goes into reader's 
Debezium queue)
+            reader.submitSplit(split);
+            
+            // Create split state
+            MySqlSnapshotSplitState splitState = new 
MySqlSnapshotSplitState(split);
+            
+            // Save context using generic SnapshotReaderContext
+            SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, 
MySqlSnapshotSplitState> context = 
+                new SnapshotReaderContext<>(split, reader, splitState);
+            snapshotReaderContexts.add(context);
+            
+            LOG.info("Created reader {}/{} and submitted split: {} (table: 
{})", 
+                    i + 1, splits.size(), split.splitId(), 
split.getTableId().identifier());

Review Comment:
   Potential resource leak: if an exception occurs after some snapshot readers 
are created but before all are added to snapshotReaderContexts, the 
already-created readers will not be closed. Consider using try-catch to ensure 
cleanup, or add all contexts first with null readers and then initialize them.
   ```suggestion
        // Track created readers so we can clean them up on failure
        List<SnapshotSplitReader> createdReaders = new ArrayList<>();
        try {
            // Create reader for each split and submit
            for (int i = 0; i < splits.size(); i++) {
                MySqlSnapshotSplit split = splits.get(i);

                // Create independent reader (each has its own Debezium queue)
                SnapshotSplitReader reader = getSnapshotSplitReader(baseReq, i);
                // Add to tracking list immediately after creation
                createdReaders.add(reader);

                // Submit split (triggers async reading, data goes into reader's Debezium queue)
                reader.submitSplit(split);

                // Create split state
                MySqlSnapshotSplitState splitState = new MySqlSnapshotSplitState(split);

                // Save context using generic SnapshotReaderContext
                SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState> context =
                        new SnapshotReaderContext<>(split, reader, splitState);
                snapshotReaderContexts.add(context);

                LOG.info("Created reader {}/{} and submitted split: {} (table: {})",
                        i + 1, splits.size(), split.splitId(), split.getTableId().identifier());
            }
        } catch (Exception e) {
            // Best-effort cleanup of any readers that were created before the failure
            for (SnapshotSplitReader reader : createdReaders) {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (Exception closeEx) {
                        LOG.warn("Error closing snapshot split reader during cleanup", closeEx);
                    }
                }
            }
            throw e;
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -162,42 +163,54 @@ public void writeRecord(String database, String table, 
byte[] record) {
                 lock.unlock();
             }
         }
+
+        // Single table flush according to the STREAM_LOAD_MAX_BYTES
+        if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES) {
+            bufferFullFlush(bufferKey);
+        }
+    }
+
+    public void bufferFullFlush(String bufferKey) {
+         doFlush(bufferKey, false, true);
     }

Review Comment:
   The new bufferFullFlush method flushes immediately when the buffer reaches STREAM_LOAD_MAX_BYTES, but there is no synchronization between the size check in writeRecord and the call to bufferFullFlush. If multiple threads call writeRecord concurrently, they could trigger redundant flushes of the same buffer. Consider adding proper synchronization, or relying on the existing doFlush method's synchronization to re-check the buffer state.
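
   One possible direction, shown only as a sketch: re-check the buffer size inside the already-synchronized doFlush so that a buffer another thread has just flushed is not flushed again. Field and method names come from the diff above; the BatchRecordBuffer type name is an assumption.
   ```java
    private synchronized void doFlush(String bufferKey, boolean waitUtilDone, boolean bufferFull) {
        checkFlushException();
        if (bufferFull && bufferKey != null) {
            // Re-check under synchronization: a concurrent writeRecord may already have triggered a
            // flush for this key, in which case there is nothing left to do.
            BatchRecordBuffer buffer = bufferMap.get(bufferKey); // buffer type name assumed
            if (buffer == null || buffer.getBufferSizeBytes() < STREAM_LOAD_MAX_BYTES) {
                return;
            }
        }
        if (waitUtilDone || bufferFull) {
            flush(bufferKey, waitUtilDone);
        }
    }
   ```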



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -796,6 +895,8 @@ public boolean hasNext() {
                         BinlogOffset position = 
RecordUtils.getBinlogPosition(element);
                         
splitState.asBinlogSplitState().setStartingOffset(position);
                     }

Review Comment:
   Missing return statement after storing nextRecord in hasNext(). The code 
sets 'nextRecord = element' and returns true, but then continues to the next 
check for isDataChangeRecord. This appears to be correct behavior for heartbeat 
events, but the code flow could be clearer with a comment explaining that 
heartbeat processing doesn't stop iteration.
   ```suggestion
                    }
                    // Heartbeat events are exposed as the next record and stop iteration
                    // for this hasNext() call; we do not fall through to data-change handling.
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,62 +178,234 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
+        
         LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
 
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/binlog split based on offset and 
start the reader.
-        MySqlSplit split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            DebeziumReader<SourceRecords, MySqlSplit> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader",
-                        baseReq.isReload());
-                // build split
-                Tuple2<MySqlSplit, Boolean> splitFlag = 
createMySqlSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // reset binlog reader
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof BinlogSplitReader) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        String splitId = String.valueOf(offsetMeta.get(SPLIT_ID));
+        if (BINLOG_SPLIT_ID.equals(splitId)){
+            // Binlog split mode
+            return prepareBinlogSplit(offsetMeta, baseReq);
         } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
+            // Extract snapshot split list
+            List<MySqlSnapshotSplit> snapshotSplits = 
extractSnapshotSplits(offsetMeta, baseReq);
+            return prepareSnapshotSplits(snapshotSplits, baseReq);
         }
-
-        // build response with iterator
+    }
+    
+    /**
+     * Extract snapshot splits from meta.
+     * Only supports format: {"splits": [{"splitId": "xxx", ...}, ...]}
+     * 
+     * @return List of snapshot splits, or null if it's a binlog split
+     */
+    private List<MySqlSnapshotSplit> extractSnapshotSplits(
+            Map<String, Object> offsetMeta, 
+            JobBaseRecordRequest baseReq) throws JsonProcessingException {
+        
+        // Check if it contains "splits" array
+        Object splitsObj = offsetMeta.get("splits");
+        if (splitsObj == null) {
+            throw new RuntimeException("Invalid meta format: missing 'splits' 
array");
+        }
+        
+        if (!(splitsObj instanceof List)) {
+            throw new RuntimeException("Invalid meta format: 'splits' must be 
an array");
+        }
+        
+        // Parse splits array
+        List<Map<String, Object>> splitMetaList = (List<Map<String, Object>>) 
splitsObj;
+        if (splitMetaList.isEmpty()) {
+            throw new RuntimeException("Invalid meta format: 'splits' array is 
empty");
+        }
+        
+        List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
+        for (Map<String, Object> splitMeta : splitMetaList) {
+            MySqlSnapshotSplit split = createSnapshotSplit(splitMeta, baseReq);
+            snapshotSplits.add(split);
+        }
+        
+        LOG.info("Extracted {} snapshot split(s) from meta", 
snapshotSplits.size());
+        return snapshotSplits;
+    }
+    
+    /**
+     * Prepare snapshot splits (unified handling for single or multiple splits)
+     */
+    private SplitReadResult prepareSnapshotSplits(
+            List<MySqlSnapshotSplit> splits,
+            JobBaseRecordRequest baseReq) throws Exception {
+        
+        LOG.info("Preparing {} snapshot split(s) for reading", splits.size());
+        
+        // Clear previous contexts
+        this.snapshotReaderContexts.clear();
+        this.currentReaderIndex = 0;
+        
+        // Create reader for each split and submit
+        for (int i = 0; i < splits.size(); i++) {
+            MySqlSnapshotSplit split = splits.get(i);
+            
+            // Create independent reader (each has its own Debezium queue)
+            SnapshotSplitReader reader = getSnapshotSplitReader(baseReq, i);
+            
+            // Submit split (triggers async reading, data goes into reader's 
Debezium queue)
+            reader.submitSplit(split);
+            
+            // Create split state
+            MySqlSnapshotSplitState splitState = new 
MySqlSnapshotSplitState(split);
+            
+            // Save context using generic SnapshotReaderContext
+            SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, 
MySqlSnapshotSplitState> context = 
+                new SnapshotReaderContext<>(split, reader, splitState);
+            snapshotReaderContexts.add(context);
+            
+            LOG.info("Created reader {}/{} and submitted split: {} (table: 
{})", 
+                    i + 1, splits.size(), split.splitId(), 
split.getTableId().identifier());
+        }
+        
+        // Construct return result with all splits and states
+        SplitReadResult result = new SplitReadResult();
+        
+        List<SourceSplit> allSplits = new ArrayList<>();
+        Map<String, Object> allStates = new HashMap<>();
+        
+        for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, 
MySqlSnapshotSplitState> context 
+                : snapshotReaderContexts) {
+            MySqlSnapshotSplit split = context.getSplit();
+            allSplits.add(split);
+            allStates.put(split.splitId(), context.getSplitState());
+        }
+        
+        result.setSplits(allSplits);
+        result.setSplitStates(allStates);
+        
+        return result;
+    }
+    
+    /**
+     * Prepare binlog split
+     */
+    private SplitReadResult prepareBinlogSplit(
+            Map<String, Object> offsetMeta, 
+            JobBaseRecordRequest baseReq) throws Exception {
+        Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta, 
baseReq);
+        this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
+        this.binlogReader = getBinlogSplitReader(baseReq);
+        
+        LOG.info("Prepare binlog split: {}", this.binlogSplit.toString());
+        
+        this.binlogReader.submitSplit(this.binlogSplit);
+        this.binlogSplitState = new MySqlBinlogSplitState(this.binlogSplit);
+        
         SplitReadResult result = new SplitReadResult();
-        MySqlSplitState currentSplitState = null;
-        MySqlSplit currentSplit = this.getCurrentSplit();
-        if (currentSplit.isSnapshotSplit()) {
-            currentSplitState = new 
MySqlSnapshotSplitState(currentSplit.asSnapshotSplit());
+        result.setSplits(Collections.singletonList(this.binlogSplit));
+        
+        Map<String, Object> statesMap = new HashMap<>();
+        statesMap.put(this.binlogSplit.splitId(), this.binlogSplitState);
+        result.setSplitStates(statesMap);
+        
+        return result;
+    }
+
+    @Override
+    public Iterator<SourceRecord> pollRecords() throws InterruptedException {
+        if (!snapshotReaderContexts.isEmpty()) {
+            // Snapshot split mode
+            return pollRecordsFromSnapshotReaders();
+        } else if (binlogReader != null) {
+            // Binlog split mode
+            return pollRecordsFromBinlogReader();
         } else {
-            currentSplitState = new 
MySqlBinlogSplitState(currentSplit.asBinlogSplit());
+            throw new RuntimeException("No active snapshot or binlog reader 
available");
         }
+    }
+    
+    /**
+     * Poll records from multiple snapshot readers sequentially.
+     * Directly pulls data from each reader's Debezium queue without extra 
queue.
+     * 
+     * <p>This implementation loops through all readers in a single call.
+     * If a reader has no data, it automatically tries the next one.
+     * Each snapshot split is read only once.
+     */
+    private Iterator<SourceRecord> pollRecordsFromSnapshotReaders() 
+            throws InterruptedException {
+        if (snapshotReaderContexts.isEmpty()) {
+            return Collections.emptyIterator();
+        }
+        
+        // Try all remaining readers (starting from currentReaderIndex)
+        int totalReaders = snapshotReaderContexts.size();
+        int attempts = totalReaders - currentReaderIndex;
+        
+        for (int i = 0; i < attempts; i++) {
+            // Check if we've processed all splits
+            if (currentReaderIndex >= totalReaders) {
+                LOG.info("All {} snapshot splits have been processed", 
totalReaders);
+                return Collections.emptyIterator();
+            }
+            
+            // Get current reader context
+            SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, 
MySqlSnapshotSplitState> context = 
+                snapshotReaderContexts.get(currentReaderIndex);
+            
+            // Poll data directly from this reader's Debezium queue
+            Iterator<SourceRecords> dataIt = 
context.getReader().pollSplitRecords();
+            
+            if (dataIt == null || !dataIt.hasNext()) {
+                // This reader has no data currently, move to next reader and 
continue
+                LOG.debug("Split {} has no data, trying next split", 
context.getSplit().splitId());
+                currentReaderIndex++;
+                continue;
+            }
+            
+            // Has data, process and return
+            SourceRecords sourceRecords = dataIt.next();
 
-        Iterator<SourceRecord> filteredIterator =
-                new FilteredRecordIterator(currentSplitRecords, 
currentSplitState);
+            // Move to next reader for next call
+            currentReaderIndex++;

Review Comment:
   The currentReaderIndex is advanced as soon as a reader returns null or an empty iterator, even though that reader's Debezium queue may simply not have produced records yet. Once the index moves past a reader, it is not polled again in this read cycle, which could drop data in retry scenarios or when a slow reader needs another poll. Consider advancing the index only after the reader's split has been fully consumed.
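
   One way to make the skip explicit, sketched under the assumption of a hypothetical isFinished() check on the reader (no such API appears in this diff): advance currentReaderIndex in the empty-poll branch only when the reader is known to be done.
   ```java
    // Fragment of the polling loop shown in the diff above.
    Iterator<SourceRecords> dataIt = context.getReader().pollSplitRecords();
    if (dataIt == null || !dataIt.hasNext()) {
        // Only skip this reader once it has finished producing records; otherwise leave
        // currentReaderIndex unchanged so the reader is polled again on a later call.
        if (context.getReader().isFinished()) { // hypothetical API, not in this diff
            currentReaderIndex++;
        }
        continue;
    }
    SourceRecords sourceRecords = dataIt.next();
    // As in the current code, advance only after records have been obtained for processing.
    currentReaderIndex++;
   ```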



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -633,15 +744,16 @@ public List<String> deserialize(Map<String, String> 
config, SourceRecord element
 
     /**
      * Filtered record iterator that only returns data change records, 
filtering out watermark,
-     * heartbeat and other events. This is a private inner class that 
encapsulates record filtering
-     * logic, making the main method cleaner.
+     * heartbeat and other events. This is a private static inner class that 
encapsulates record
+     * filtering logic, making the main method cleaner.
      */
     private class FilteredRecordIterator implements Iterator<SourceRecord> {
         private final Iterator<SourceRecord> sourceIterator;
         private final SourceSplitState splitState;
         private SourceRecord nextRecord;
 
-        FilteredRecordIterator(SplitRecords currentSplitRecords, 
SourceSplitState splitState) {
+        FilteredRecordIterator(SplitRecords currentSplitRecords, 
SourceSplitState splitState,
+                JdbcIncrementalSourceReader reader) {

Review Comment:
   The FilteredRecordIterator constructor signature has changed to require a 
JdbcIncrementalSourceReader parameter, but this parameter is not used in the 
constructor or stored as a field. This is inconsistent with the MySQL 
implementation where the parameter is not required.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +224,226 @@ public CompletableFuture<Void> 
writeRecordsAsync(WriteRecordRequest writeRecordR
                 executor);
     }
 
-    /** Read data from SourceReader and write it to Doris, while returning 
meta information. */
+    /**
+     * Read data from SourceReader and write it to Doris, while returning meta 
information.
+     *
+     * <p>Snapshot split: Returns immediately after reading; otherwise, 
returns after the
+     * maxInterval.
+     *
+     * <p>Binlog split: Fetches data at the maxInterval. Returns immediately 
if no data is found; if
+     * found, checks if the last record is a heartbeat record. If it is, 
returns immediately;
+     * otherwise, fetches again until the heartbeat deadline.
+     *
+     * <p>Heartbeat events will carry the latest offset.
+     */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
-        Map<String, String> metaResponse = new HashMap<>();
         long scannedRows = 0L;
         long scannedBytes = 0L;
+        int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
-            readResult = sourceReader.readSplitRecords(writeRecordRequest);
-            batchStreamLoad =
-                    getOrCreateBatchStreamLoad(
-                            writeRecordRequest.getJobId(), 
writeRecordRequest.getTargetDb());
-            batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-            
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
-            batchStreamLoad.setToken(writeRecordRequest.getToken());
-
-            // Record start time for maxInterval check
+            // 1. submit split async
+            readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+            batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+            boolean isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
+            boolean shouldStop = false;
+            boolean lastMessageIsHeartbeat = false;
+            LOG.info(
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId(),
+                    isSnapshotSplit);
+
+            // 2. poll record
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator = 
sourceReader.pollRecords();
 
-            // Use iterators to read and write.
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
-                    String table = extractTable(element);
-                    for (String record : serializedRecords) {
-                        scannedRows++;
-                        byte[] dataBytes = record.getBytes();
-                        scannedBytes += dataBytes.length;
-                        batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+
+                    // Check if should stop
+                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    boolean timeoutReached =
+                            maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                    if (shouldStop(
+                            isSnapshotSplit,
+                            scannedRows > 0,
+                            lastMessageIsHeartbeat,
+                            elapsedTime,
+                            maxIntervalMillis,
+                            timeoutReached)) {
+                        break;
                     }
+                    continue;
                 }
-                // Check if maxInterval has been exceeded
-                long elapsedTime = System.currentTimeMillis() - startTime;
-                if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis) 
{
-                    LOG.info(
-                            "Max interval {} seconds reached, stopping data 
reading",
-                            writeRecordRequest.getMaxInterval());
-                    break;
+
+                while (recordIterator.hasNext()) {
+                    SourceRecord element = recordIterator.next();
+
+                    // Check if this is a heartbeat message
+                    if (isHeartbeatEvent(element)) {
+                        heartbeatCount++;
+
+                        // Mark last message as heartbeat (only for binlog 
split)
+                        if (!isSnapshotSplit) {
+                            lastMessageIsHeartbeat = true;
+                        }
+
+                        // If already timeout, stop immediately when heartbeat 
received
+                        long elapsedTime = System.currentTimeMillis() - 
startTime;
+                        boolean timeoutReached =
+                                maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                        if (!isSnapshotSplit && timeoutReached) {
+                            LOG.info(
+                                    "Binlog split max interval reached and 
heartbeat received, stopping data reading");
+                            shouldStop = true;
+                            break;
+                        }
+                        // Skip heartbeat messages during normal processing
+                        continue;
+                    }
+
+                    // Process data messages
+                    List<String> serializedRecords =
+                            
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+                    if (!CollectionUtils.isEmpty(serializedRecords)) {
+                        String database = writeRecordRequest.getTargetDb();
+                        String table = extractTable(element);
+                        for (String record : serializedRecords) {
+                            scannedRows++;
+                            byte[] dataBytes = record.getBytes();
+                            scannedBytes += dataBytes.length;
+                            batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                        }
+                        // Mark last message as data (not heartbeat)
+                        lastMessageIsHeartbeat = false;
+                    }
                 }
             }
+            LOG.info(
+                    "Fetched {} records and {} heartbeats in {} ms for 
jobId={} taskId={}",
+                    scannedRows,
+                    heartbeatCount,
+                    System.currentTimeMillis() - startTime,
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId());
+
         } finally {
-            if (readResult != null) {
-                // The LSN in the commit is the current offset, which is the 
offset from the last
-                // successful write.
-                // Therefore, even if a subsequent write fails, it will not 
affect the commit.
-                sourceReader.commitSourceOffset(
-                        writeRecordRequest.getJobId(), readResult.getSplit());
-            }
+            cleanupReaderResources(sourceReader, 
writeRecordRequest.getJobId(), readResult);
+        }
 
-            // This must be called after commitSourceOffset; otherwise,
-            // PG's confirmed lsn will not proceed.
-            // This operation must be performed before 
batchStreamLoad.commitOffset;
-            // otherwise, fe might issue the next task for this job.
-            sourceReader.finishSplitRecords();
+        // 3. Extract offset from split state
+        List<Map<String, String>> metaResponse = 
extractOffsetMeta(sourceReader, readResult);
+        // 4. wait all stream load finish
+        batchStreamLoad.forceFlush();
+
+        // 5. request fe api update offset
+        String currentTaskId = batchStreamLoad.getCurrentTaskId();
+        // The offset must be reset before commitOffset to prevent the next 
taskId from being create
+        // by the fe.
+        batchStreamLoad.resetTaskId();
+        batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows, 
scannedBytes);
+    }
+
+    public static boolean isHeartbeatEvent(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        return valueSchema != null
+                && 
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+    }
+
+    /**
+     * Determine if we should stop polling.
+     *
+     * @param isSnapshotSplit whether this is a snapshot split
+     * @param hasData whether we have received any data
+     * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+     * @param elapsedTime total elapsed time in milliseconds
+     * @param maxIntervalMillis max interval in milliseconds
+     * @param timeoutReached whether timeout is reached
+     * @return true if should stop, false if should continue
+     */
+    private boolean shouldStop(
+            boolean isSnapshotSplit,
+            boolean hasData,
+            boolean lastMessageIsHeartbeat,
+            long elapsedTime,
+            long maxIntervalMillis,
+            boolean timeoutReached) {
+
+        // 1. Snapshot split with data: if no more data in queue, stop 
immediately (no need to wait
+        // for timeout)
+        // snapshot split will be written to the debezium queue all at once.
+        // multiple snapshot splits are handled in the source reader.
+        if (isSnapshotSplit && hasData) {
+            LOG.info(
+                    "Snapshot split finished, no more data available. Total 
elapsed: {} ms",
+                    elapsedTime);
+            return true;
         }
-        // get offset from split state
-        try {
-            if (readResult.getSplitState() != null) {
-                // Set meta information for hw
-                if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
-                    metaResponse = offsetRes;
-                }
 
-                // set meta for binlog event
-                if (sourceReader.isBinlogSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                    metaResponse = offsetRes;
-                }
-            } else {
-                throw new RuntimeException("split state is null");
-            }
+        // 2. Not timeout yet: continue waiting
+        if (!timeoutReached) {
+            return false;
+        }
 
-            // wait all stream load finish
-            batchStreamLoad.forceFlush();
+        // === Below are checks after timeout is reached ===
 
-            // request fe api
-            batchStreamLoad.commitOffset(metaResponse, scannedRows, 
scannedBytes);
-        } finally {
-            batchStreamLoad.resetTaskId();
+        // 3. No data received after timeout: stop
+        if (!hasData) {
+            LOG.info("No data received after timeout, stopping. Elapsed: {} 
ms", elapsedTime);
+            return true;
+        }
+
+        // 4. Snapshot split after timeout (should not reach here, but keep as 
safety check)
+        if (isSnapshotSplit) {
+            LOG.info("Snapshot split timeout reached, stopping. Elapsed: {} 
ms", elapsedTime);
+            return true;
+        }
+
+        // 5. Binlog split + last message is heartbeat: stop immediately
+        if (lastMessageIsHeartbeat) {
+            LOG.info("Binlog split timeout and last message is heartbeat, 
stopping");
+            return true;
+        }
+
+        // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout 
protection
+        if (elapsedTime > maxIntervalMillis + 
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+            LOG.warn(
+                    "Binlog split heartbeat wait timeout after {} ms, force 
stopping. Total elapsed: {} ms",
+                    elapsedTime - maxIntervalMillis,
+                    elapsedTime);
+            return true;
         }
+
+        // Continue waiting for heartbeat
+        return false;
     }

Review Comment:
   The method shouldStop has complex nested conditions that make it difficult 
to understand and maintain. Consider extracting some of the conditions into 
well-named helper methods (e.g., 'shouldStopForSnapshotSplit', 
'shouldStopForBinlogSplit') to improve readability and maintainability.
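
   A sketch of that refactor, using the helper names suggested above (logging from the original shouldStop is omitted; the logic mirrors the conditions in the diff):
   ```java
    private boolean shouldStop(
            boolean isSnapshotSplit,
            boolean hasData,
            boolean lastMessageIsHeartbeat,
            long elapsedTime,
            long maxIntervalMillis,
            boolean timeoutReached) {
        if (isSnapshotSplit) {
            return shouldStopForSnapshotSplit(hasData, timeoutReached);
        }
        if (!timeoutReached) {
            return false;
        }
        return shouldStopForBinlogSplit(hasData, lastMessageIsHeartbeat, elapsedTime, maxIntervalMillis);
    }

    private boolean shouldStopForSnapshotSplit(boolean hasData, boolean timeoutReached) {
        // Snapshot splits are enqueued all at once, so stop as soon as data has been drained,
        // or when the timeout is reached without receiving anything.
        return hasData || timeoutReached;
    }

    private boolean shouldStopForBinlogSplit(
            boolean hasData, boolean lastMessageIsHeartbeat, long elapsedTime, long maxIntervalMillis) {
        if (!hasData) {
            return true; // nothing received after the timeout
        }
        if (lastMessageIsHeartbeat) {
            return true; // the heartbeat already carries the latest offset
        }
        // Keep waiting for a heartbeat, but never longer than a few heartbeat intervals past maxInterval.
        return elapsedTime > maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3;
    }
   ```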



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +224,226 @@ public CompletableFuture<Void> 
writeRecordsAsync(WriteRecordRequest writeRecordR
                 executor);
     }
 
-    /** Read data from SourceReader and write it to Doris, while returning 
meta information. */
+    /**
+     * Read data from SourceReader and write it to Doris, while returning meta 
information.
+     *
+     * <p>Snapshot split: Returns immediately after reading; otherwise, 
returns after the
+     * maxInterval.
+     *
+     * <p>Binlog split: Fetches data at the maxInterval. Returns immediately 
if no data is found; if
+     * found, checks if the last record is a heartbeat record. If it is, 
returns immediately;
+     * otherwise, fetches again until the heartbeat deadline.
+     *
+     * <p>Heartbeat events will carry the latest offset.
+     */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
-        Map<String, String> metaResponse = new HashMap<>();
         long scannedRows = 0L;
         long scannedBytes = 0L;
+        int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
-            readResult = sourceReader.readSplitRecords(writeRecordRequest);
-            batchStreamLoad =
-                    getOrCreateBatchStreamLoad(
-                            writeRecordRequest.getJobId(), 
writeRecordRequest.getTargetDb());
-            batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-            
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
-            batchStreamLoad.setToken(writeRecordRequest.getToken());
-
-            // Record start time for maxInterval check
+            // 1. submit split async
+            readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+            batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+            boolean isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
+            boolean shouldStop = false;
+            boolean lastMessageIsHeartbeat = false;
+            LOG.info(
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId(),
+                    isSnapshotSplit);
+
+            // 2. poll record
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator = 
sourceReader.pollRecords();
 
-            // Use iterators to read and write.
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
-                    String table = extractTable(element);
-                    for (String record : serializedRecords) {
-                        scannedRows++;
-                        byte[] dataBytes = record.getBytes();
-                        scannedBytes += dataBytes.length;
-                        batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+
+                    // Check if should stop
+                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    boolean timeoutReached =
+                            maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                    if (shouldStop(
+                            isSnapshotSplit,
+                            scannedRows > 0,
+                            lastMessageIsHeartbeat,
+                            elapsedTime,
+                            maxIntervalMillis,
+                            timeoutReached)) {
+                        break;
                     }
+                    continue;
                 }
-                // Check if maxInterval has been exceeded
-                long elapsedTime = System.currentTimeMillis() - startTime;
-                if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis) 
{
-                    LOG.info(
-                            "Max interval {} seconds reached, stopping data 
reading",
-                            writeRecordRequest.getMaxInterval());
-                    break;
+
+                while (recordIterator.hasNext()) {
+                    SourceRecord element = recordIterator.next();
+
+                    // Check if this is a heartbeat message
+                    if (isHeartbeatEvent(element)) {
+                        heartbeatCount++;
+
+                        // Mark last message as heartbeat (only for binlog 
split)
+                        if (!isSnapshotSplit) {
+                            lastMessageIsHeartbeat = true;
+                        }
+
+                        // If already timeout, stop immediately when heartbeat 
received
+                        long elapsedTime = System.currentTimeMillis() - 
startTime;
+                        boolean timeoutReached =
+                                maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                        if (!isSnapshotSplit && timeoutReached) {
+                            LOG.info(
+                                    "Binlog split max interval reached and 
heartbeat received, stopping data reading");
+                            shouldStop = true;
+                            break;
+                        }
+                        // Skip heartbeat messages during normal processing
+                        continue;
+                    }
+
+                    // Process data messages
+                    List<String> serializedRecords =
+                            
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+                    if (!CollectionUtils.isEmpty(serializedRecords)) {
+                        String database = writeRecordRequest.getTargetDb();
+                        String table = extractTable(element);
+                        for (String record : serializedRecords) {
+                            scannedRows++;
+                            byte[] dataBytes = record.getBytes();
+                            scannedBytes += dataBytes.length;
+                            batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                        }
+                        // Mark last message as data (not heartbeat)
+                        lastMessageIsHeartbeat = false;
+                    }
                 }
             }
+            LOG.info(
+                    "Fetched {} records and {} heartbeats in {} ms for 
jobId={} taskId={}",
+                    scannedRows,
+                    heartbeatCount,
+                    System.currentTimeMillis() - startTime,
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId());
+
         } finally {
-            if (readResult != null) {
-                // The LSN in the commit is the current offset, which is the 
offset from the last
-                // successful write.
-                // Therefore, even if a subsequent write fails, it will not 
affect the commit.
-                sourceReader.commitSourceOffset(
-                        writeRecordRequest.getJobId(), readResult.getSplit());
-            }
+            cleanupReaderResources(sourceReader, 
writeRecordRequest.getJobId(), readResult);
+        }
 
-            // This must be called after commitSourceOffset; otherwise,
-            // PG's confirmed lsn will not proceed.
-            // This operation must be performed before 
batchStreamLoad.commitOffset;
-            // otherwise, fe might issue the next task for this job.
-            sourceReader.finishSplitRecords();
+        // 3. Extract offset from split state
+        List<Map<String, String>> metaResponse = 
extractOffsetMeta(sourceReader, readResult);
+        // 4. wait all stream load finish
+        batchStreamLoad.forceFlush();
+
+        // 5. request fe api update offset
+        String currentTaskId = batchStreamLoad.getCurrentTaskId();
+        // The offset must be reset before commitOffset to prevent the next 
taskId from being create
+        // by the fe.
+        batchStreamLoad.resetTaskId();
+        batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows, 
scannedBytes);
+    }
+
+    public static boolean isHeartbeatEvent(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        return valueSchema != null
+                && 
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+    }
+
+    /**
+     * Determine if we should stop polling.
+     *
+     * @param isSnapshotSplit whether this is a snapshot split
+     * @param hasData whether we have received any data
+     * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+     * @param elapsedTime total elapsed time in milliseconds
+     * @param maxIntervalMillis max interval in milliseconds
+     * @param timeoutReached whether timeout is reached
+     * @return true if should stop, false if should continue
+     */
+    private boolean shouldStop(
+            boolean isSnapshotSplit,
+            boolean hasData,
+            boolean lastMessageIsHeartbeat,
+            long elapsedTime,
+            long maxIntervalMillis,
+            boolean timeoutReached) {
+
+        // 1. Snapshot split with data: if no more data in queue, stop 
immediately (no need to wait
+        // for timeout)
+        // snapshot split will be written to the debezium queue all at once.
+        // multiple snapshot splits are handled in the source reader.
+        if (isSnapshotSplit && hasData) {
+            LOG.info(
+                    "Snapshot split finished, no more data available. Total 
elapsed: {} ms",
+                    elapsedTime);
+            return true;
         }
-        // get offset from split state
-        try {
-            if (readResult.getSplitState() != null) {
-                // Set meta information for hw
-                if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
-                    metaResponse = offsetRes;
-                }
 
-                // set meta for binlog event
-                if (sourceReader.isBinlogSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                    metaResponse = offsetRes;
-                }
-            } else {
-                throw new RuntimeException("split state is null");
-            }
+        // 2. Not timeout yet: continue waiting
+        if (!timeoutReached) {
+            return false;
+        }
 
-            // wait all stream load finish
-            batchStreamLoad.forceFlush();
+        // === Below are checks after timeout is reached ===
 
-            // request fe api
-            batchStreamLoad.commitOffset(metaResponse, scannedRows, 
scannedBytes);
-        } finally {
-            batchStreamLoad.resetTaskId();
+        // 3. No data received after timeout: stop
+        if (!hasData) {
+            LOG.info("No data received after timeout, stopping. Elapsed: {} 
ms", elapsedTime);
+            return true;
+        }
+
+        // 4. Snapshot split after timeout (should not reach here, but keep as 
safety check)
+        if (isSnapshotSplit) {
+            LOG.info("Snapshot split timeout reached, stopping. Elapsed: {} 
ms", elapsedTime);
+            return true;
+        }
+
+        // 5. Binlog split + last message is heartbeat: stop immediately
+        if (lastMessageIsHeartbeat) {
+            LOG.info("Binlog split timeout and last message is heartbeat, 
stopping");
+            return true;
+        }
+
+        // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout 
protection
+        if (elapsedTime > maxIntervalMillis + 
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {

Review Comment:
   The condition 'elapsedTime > maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3' uses a magic number (3) as the timeout-protection multiplier for the heartbeat wait. Consider extracting it as a named constant (e.g., HEARTBEAT_WAIT_TIMEOUT_MULTIPLIER) to improve code clarity.
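
   A sketch of that extraction (the constant name is the reviewer's example, not an existing field):
   ```java
    /**
     * How many Debezium heartbeat intervals to keep waiting for a heartbeat after
     * maxInterval has elapsed before force-stopping a binlog poll.
     */
    private static final int HEARTBEAT_WAIT_TIMEOUT_MULTIPLIER = 3;

    // ... then, inside shouldStop(...):
    if (elapsedTime > maxIntervalMillis
            + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * HEARTBEAT_WAIT_TIMEOUT_MULTIPLIER) {
        LOG.warn(
                "Binlog split heartbeat wait timeout after {} ms, force stopping. Total elapsed: {} ms",
                elapsedTime - maxIntervalMillis,
                elapsedTime);
        return true;
    }
   ```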



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +224,226 @@ public CompletableFuture<Void> 
writeRecordsAsync(WriteRecordRequest writeRecordR
                 executor);
     }
 
-    /** Read data from SourceReader and write it to Doris, while returning 
meta information. */
+    /**
+     * Read data from SourceReader and write it to Doris, while returning meta 
information.
+     *
+     * <p>Snapshot split: Returns immediately after reading; otherwise, 
returns after the
+     * maxInterval.
+     *
+     * <p>Binlog split: Fetches data at the maxInterval. Returns immediately 
if no data is found; if
+     * found, checks if the last record is a heartbeat record. If it is, 
returns immediately;
+     * otherwise, fetches again until the heartbeat deadline.
+     *
+     * <p>Heartbeat events will carry the latest offset.
+     */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
-        Map<String, String> metaResponse = new HashMap<>();
         long scannedRows = 0L;
         long scannedBytes = 0L;
+        int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
-            readResult = sourceReader.readSplitRecords(writeRecordRequest);
-            batchStreamLoad =
-                    getOrCreateBatchStreamLoad(
-                            writeRecordRequest.getJobId(), 
writeRecordRequest.getTargetDb());
-            batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-            
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
-            batchStreamLoad.setToken(writeRecordRequest.getToken());
-
-            // Record start time for maxInterval check
+            // 1. submit split async
+            readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+            batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+            boolean isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
+            boolean shouldStop = false;
+            boolean lastMessageIsHeartbeat = false;
+            LOG.info(
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId(),
+                    isSnapshotSplit);
+
+            // 2. poll record
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator = 
sourceReader.pollRecords();
 
-            // Use iterators to read and write.
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
-                    String table = extractTable(element);
-                    for (String record : serializedRecords) {
-                        scannedRows++;
-                        byte[] dataBytes = record.getBytes();
-                        scannedBytes += dataBytes.length;
-                        batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+
+                    // Check if should stop
+                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    boolean timeoutReached =
+                            maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                    if (shouldStop(
+                            isSnapshotSplit,
+                            scannedRows > 0,
+                            lastMessageIsHeartbeat,
+                            elapsedTime,
+                            maxIntervalMillis,
+                            timeoutReached)) {
+                        break;
                     }
+                    continue;
                 }
-                // Check if maxInterval has been exceeded
-                long elapsedTime = System.currentTimeMillis() - startTime;
-                if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis) 
{
-                    LOG.info(
-                            "Max interval {} seconds reached, stopping data 
reading",
-                            writeRecordRequest.getMaxInterval());
-                    break;
+
+                while (recordIterator.hasNext()) {
+                    SourceRecord element = recordIterator.next();
+
+                    // Check if this is a heartbeat message
+                    if (isHeartbeatEvent(element)) {
+                        heartbeatCount++;
+
+                        // Mark last message as heartbeat (only for binlog 
split)
+                        if (!isSnapshotSplit) {
+                            lastMessageIsHeartbeat = true;
+                        }
+
+                        // If already timeout, stop immediately when heartbeat 
received
+                        long elapsedTime = System.currentTimeMillis() - 
startTime;
+                        boolean timeoutReached =
+                                maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                        if (!isSnapshotSplit && timeoutReached) {
+                            LOG.info(
+                                    "Binlog split max interval reached and 
heartbeat received, stopping data reading");
+                            shouldStop = true;
+                            break;
+                        }
+                        // Skip heartbeat messages during normal processing
+                        continue;
+                    }
+
+                    // Process data messages
+                    List<String> serializedRecords =
+                            
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+                    if (!CollectionUtils.isEmpty(serializedRecords)) {
+                        String database = writeRecordRequest.getTargetDb();
+                        String table = extractTable(element);
+                        for (String record : serializedRecords) {
+                            scannedRows++;
+                            byte[] dataBytes = record.getBytes();
+                            scannedBytes += dataBytes.length;
+                            batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                        }
+                        // Mark last message as data (not heartbeat)
+                        lastMessageIsHeartbeat = false;
+                    }
                 }
             }
+            LOG.info(
+                    "Fetched {} records and {} heartbeats in {} ms for 
jobId={} taskId={}",
+                    scannedRows,
+                    heartbeatCount,
+                    System.currentTimeMillis() - startTime,
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId());
+
         } finally {
-            if (readResult != null) {
-                // The LSN in the commit is the current offset, which is the 
offset from the last
-                // successful write.
-                // Therefore, even if a subsequent write fails, it will not 
affect the commit.
-                sourceReader.commitSourceOffset(
-                        writeRecordRequest.getJobId(), readResult.getSplit());
-            }
+            cleanupReaderResources(sourceReader, 
writeRecordRequest.getJobId(), readResult);
+        }
 
-            // This must be called after commitSourceOffset; otherwise,
-            // PG's confirmed lsn will not proceed.
-            // This operation must be performed before 
batchStreamLoad.commitOffset;
-            // otherwise, fe might issue the next task for this job.
-            sourceReader.finishSplitRecords();
+        // 3. Extract offset from split state
+        List<Map<String, String>> metaResponse = 
extractOffsetMeta(sourceReader, readResult);
+        // 4. wait all stream load finish
+        batchStreamLoad.forceFlush();
+
+        // 5. request fe api update offset
+        String currentTaskId = batchStreamLoad.getCurrentTaskId();
+        // The offset must be reset before commitOffset to prevent the next 
taskId from being create
+        // by the fe.
+        batchStreamLoad.resetTaskId();
+        batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows, 
scannedBytes);
+    }
+
+    public static boolean isHeartbeatEvent(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        return valueSchema != null
+                && 
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+    }
+
+    /**
+     * Determine if we should stop polling.
+     *
+     * @param isSnapshotSplit whether this is a snapshot split
+     * @param hasData whether we have received any data
+     * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+     * @param elapsedTime total elapsed time in milliseconds
+     * @param maxIntervalMillis max interval in milliseconds
+     * @param timeoutReached whether timeout is reached
+     * @return true if should stop, false if should continue
+     */
+    private boolean shouldStop(
+            boolean isSnapshotSplit,
+            boolean hasData,
+            boolean lastMessageIsHeartbeat,
+            long elapsedTime,
+            long maxIntervalMillis,
+            boolean timeoutReached) {
+
+        // 1. Snapshot split with data: if no more data in queue, stop 
immediately (no need to wait
+        // for timeout)
+        // snapshot split will be written to the debezium queue all at once.
+        // multiple snapshot splits are handled in the source reader.
+        if (isSnapshotSplit && hasData) {
+            LOG.info(
+                    "Snapshot split finished, no more data available. Total 
elapsed: {} ms",
+                    elapsedTime);
+            return true;
         }
-        // get offset from split state
-        try {
-            if (readResult.getSplitState() != null) {
-                // Set meta information for hw
-                if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
-                    metaResponse = offsetRes;
-                }
 
-                // set meta for binlog event
-                if (sourceReader.isBinlogSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                    metaResponse = offsetRes;
-                }
-            } else {
-                throw new RuntimeException("split state is null");
-            }
+        // 2. Not timeout yet: continue waiting
+        if (!timeoutReached) {
+            return false;
+        }
 
-            // wait all stream load finish
-            batchStreamLoad.forceFlush();
+        // === Below are checks after timeout is reached ===
 
-            // request fe api
-            batchStreamLoad.commitOffset(metaResponse, scannedRows, 
scannedBytes);
-        } finally {
-            batchStreamLoad.resetTaskId();
+        // 3. No data received after timeout: stop
+        if (!hasData) {
+            LOG.info("No data received after timeout, stopping. Elapsed: {} 
ms", elapsedTime);
+            return true;
+        }
+
+        // 4. Snapshot split after timeout (should not reach here, but keep as 
safety check)
+        if (isSnapshotSplit) {
+            LOG.info("Snapshot split timeout reached, stopping. Elapsed: {} 
ms", elapsedTime);
+            return true;
+        }
+
+        // 5. Binlog split + last message is heartbeat: stop immediately
+        if (lastMessageIsHeartbeat) {
+            LOG.info("Binlog split timeout and last message is heartbeat, 
stopping");
+            return true;
+        }
+
+        // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout 
protection
+        if (elapsedTime > maxIntervalMillis + 
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+            LOG.warn(
+                    "Binlog split heartbeat wait timeout after {} ms, force 
stopping. Total elapsed: {} ms",
+                    elapsedTime - maxIntervalMillis,
+                    elapsedTime);
+            return true;
         }
+
+        // Continue waiting for heartbeat
+        return false;
     }
 
-    private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String 
targetDb) {
-        return batchStreamLoadMap.computeIfAbsent(
-                jobId,
-                k -> {
-                    LOG.info("Create DorisBatchStreamLoad for jobId={}", 
jobId);
-                    return new DorisBatchStreamLoad(jobId, targetDb);
-                });
+    private synchronized DorisBatchStreamLoad getOrCreateBatchStreamLoad(
+            WriteRecordRequest writeRecordRequest) {
+        DorisBatchStreamLoad batchStreamLoad =
+                batchStreamLoadMap.computeIfAbsent(
+                        writeRecordRequest.getJobId(),
+                        k -> {
+                            LOG.info(
+                                    "Create DorisBatchStreamLoad for jobId={}",
+                                    writeRecordRequest.getJobId());
+                            return new DorisBatchStreamLoad(
+                                    writeRecordRequest.getJobId(),
+                                    writeRecordRequest.getTargetDb());
+                        });
+        batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
+        
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
+        batchStreamLoad.setToken(writeRecordRequest.getToken());
+        return batchStreamLoad;

Review Comment:
   Potential race in getOrCreateBatchStreamLoad. The method itself is 
synchronized, but the taskId, frontendAddress, and token are stored as mutable 
fields on a DorisBatchStreamLoad instance that is shared per jobId. If two 
tasks for the same job run concurrently, the later call overwrites 
currentTaskId, frontendAddress, and token while the earlier task is still 
using the same DorisBatchStreamLoad instance. Consider passing these 
task-scoped values per call instead of mutating shared state.
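   One possible direction (a sketch only; the TaskContext name and shape are 
hypothetical) is to carry task-scoped values with each call, in the same 
spirit as the taskId parameter added to commitOffset, rather than mutating 
the shared per-job instance:
   ```java
   // Hypothetical sketch: task-scoped values travel with the call instead of
   // living as mutable fields on the shared per-job DorisBatchStreamLoad.
   final class TaskContext {
       final String taskId;
       final String frontendAddress;
       final String token;

       TaskContext(String taskId, String frontendAddress, String token) {
           this.taskId = taskId;
           this.frontendAddress = frontendAddress;
           this.token = token;
       }
   }

   // Coordinator side: the shared instance is looked up without being mutated.
   // TaskContext ctx = new TaskContext(
   //         writeRecordRequest.getTaskId(),
   //         writeRecordRequest.getFrontendAddress(),
   //         writeRecordRequest.getToken());
   // DorisBatchStreamLoad load = batchStreamLoadMap.computeIfAbsent(
   //         writeRecordRequest.getJobId(),
   //         k -> new DorisBatchStreamLoad(k, writeRecordRequest.getTargetDb()));
   // load.commitOffset(ctx, metaResponse, scannedRows, scannedBytes);
   ```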



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SplitReadResult.java:
##########
@@ -18,18 +18,40 @@
 package org.apache.doris.cdcclient.source.reader;
 
 import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.kafka.connect.source.SourceRecord;
 
-import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import lombok.Data;
 
-/** The result of reading a split with iterator. */
+/**
+ * Container for source splits and their associated states.
+ * Supports both single split (binlog) and multiple splits (snapshot).
+ * Iteration over records for these splits is handled separately (for example 
via pollRecords).
+ */
 @Data
 public class SplitReadResult {
-    private Iterator<SourceRecord> recordIterator;
-    // MySqlSplitState, SourceSplitState
-    private Object splitState;
-    // MySqlSplit SourceSplitBase
-    private SourceSplit split;
+    // List of splits (size=1 for binlog, size>=1 for snapshot)
+    private List<SourceSplit> splits;
+    
+    // Map of split states (key: splitId, value: state)
+    private Map<String, Object> splitStates;
+    
+    /**
+     * Get the first split ( The types in `splits` are the same.)
+     */
+    public SourceSplit getSplit() {
+        return splits != null && !splits.isEmpty() ? splits.get(0) : null;
+    }

Review Comment:
   getSplit() silently returns null when splits is null or empty instead of 
failing fast, so callers that assume a non-null split will only hit a 
NullPointerException later, far from the actual cause. Consider validating 
the splits list when the result is constructed, or documenting that getSplit() 
may return null.
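   If the empty case should be explicit rather than a silent null, one option 
(sketch only; the method name and the use of java.util.Optional are 
assumptions) is:
   ```java
   // Sketch only: makes the "no splits" case explicit for callers.
   // Requires: import java.util.Optional;
   public Optional<SourceSplit> findFirstSplit() {
       return splits == null || splits.isEmpty()
               ? Optional.empty()
               : Optional.of(splits.get(0));
   }
   ```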



##########
fs_brokers/cdc_client/pom.xml:
##########
@@ -209,6 +209,9 @@ under the License.
                 <version>${spotless.version}</version>
                 <configuration>
                     <java>
+                        <includes>
+                            
<include>src/main/java/org/apache/doris/*.java</include>

Review Comment:
   The Spotless configuration now only includes specific paths under 
'src/main/java/org/apache/doris/*.java' which will exclude subdirectories. This 
means files in subdirectories like 'org/apache/doris/cdcclient/' will not be 
formatted. The include pattern should likely be 
'src/main/java/org/apache/doris/**/*.java' to include all subdirectories.
   ```suggestion
                               
<include>src/main/java/org/apache/doris/**/*.java</include>
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -481,13 +494,14 @@ public void resetTaskId() {
     }
 
     /** commit offfset to frontends. */
-    public void commitOffset(Map<String, String> meta, long scannedRows, long 
scannedBytes) {
+    public void commitOffset(
+            String taskId, List<Map<String, String>> meta, long scannedRows, 
long scannedBytes) {
         try {
             String url = String.format(COMMIT_URL_PATTERN, frontendAddress, 
targetDb);
             Map<String, Object> commitParams = new HashMap<>();
             commitParams.put("offset", OBJECT_MAPPER.writeValueAsString(meta));
             commitParams.put("jobId", jobId);
-            commitParams.put("taskId", currentTaskId);
+            commitParams.put("taskId", taskId);

Review Comment:
   The commitOffset method signature now takes taskId as a parameter, but the 
currentTaskId field is still being set via a setter, which leaves it unclear 
which value is authoritative. Prefer a single source of truth: either pass 
the taskId explicitly everywhere or rely solely on the field; if both are 
genuinely needed, document why.
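   One way to keep a single source of truth (sketch only; takeCurrentTaskId 
is a hypothetical name) is to capture and clear the field in one step and 
then pass the result explicitly:
   ```java
   // Sketch only: atomically hand over the task id so the field and the
   // parameter passed to commitOffset can never disagree.
   public synchronized String takeCurrentTaskId() {
       String taskId = this.currentTaskId;
       this.currentTaskId = null;
       return taskId;
   }

   // Caller side, replacing the getCurrentTaskId()/resetTaskId() pair:
   // String taskId = batchStreamLoad.takeCurrentTaskId();
   // batchStreamLoad.commitOffset(taskId, metaResponse, scannedRows, scannedBytes);
   ```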



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -162,42 +163,54 @@ public void writeRecord(String database, String table, 
byte[] record) {
                 lock.unlock();
             }
         }
+
+        // Single table flush according to the STREAM_LOAD_MAX_BYTES
+        if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES) {
+            bufferFullFlush(bufferKey);
+        }
+    }
+
+    public void bufferFullFlush(String bufferKey) {

Review Comment:
   Typo in variable name: 'buffferKey' should be 'bufferKey'.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -306,7 +319,7 @@ public String getTimeoutReason() {
                 log.warn("Failed to get task timeout reason, response: {}", 
response);
             }
         } catch (ExecutionException | InterruptedException ex) {
-            log.error("Send get task fail reason request failed: ", ex);
+            log.error("Send get fail reason request failed: ", ex);

Review Comment:
   The log statement at line 322 has a typo: 'Send get fail reason request 
failed' should be 'Send get task fail reason request failed' to match the 
context and be more descriptive.
   ```suggestion
               log.error("Send get task fail reason request failed: ", ex);
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java:
##########
@@ -65,9 +65,9 @@ public Object fetchSplits(@RequestBody 
FetchTableSplitsRequest ftsReq) {
             SourceReader reader = Env.getCurrentEnv().getReader(ftsReq);
             List splits = reader.getSourceSplits(ftsReq);
             return RestResponse.success(splits);
-        } catch (IllegalArgumentException ex) {
+        } catch (Exception ex) {
             LOG.error("Failed to fetch splits, jobId={}", ftsReq.getJobId(), 
ex);
-            return RestResponse.internalError(ex.getMessage());
+            return 
RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex));
         }

Review Comment:
   Exception handling has been broadened from catching IllegalArgumentException 
to catching all Exceptions. While this prevents unhandled failures from 
escaping the controller, it also blurs the line between expected validation 
errors and unexpected server-side failures. Consider keeping a separate catch 
for IllegalArgumentException and reserving the catch-all for genuinely 
unexpected exceptions.
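   A possible shape (sketch only; whether RestResponse exposes a dedicated 
client-error response is not verified here, so both branches fall back to 
internalError):
   ```java
   try {
       SourceReader reader = Env.getCurrentEnv().getReader(ftsReq);
       List splits = reader.getSourceSplits(ftsReq);
       return RestResponse.success(splits);
   } catch (IllegalArgumentException ex) {
       // Expected validation failure: warn-level log, echo the message as-is.
       LOG.warn("Invalid fetch splits request, jobId={}", ftsReq.getJobId(), ex);
       return RestResponse.internalError(ex.getMessage());
   } catch (Exception ex) {
       // Unexpected failure: error-level log with the full stack trace.
       LOG.error("Failed to fetch splits, jobId={}", ftsReq.getJobId(), ex);
       return RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex));
   }
   ```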



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

