Copilot commented on code in PR #60181:
URL: https://github.com/apache/doris/pull/60181#discussion_r2721092347


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +223,227 @@ public CompletableFuture<Void> 
writeRecordsAsync(WriteRecordRequest writeRecordR
                 executor);
     }
 
-    /** Read data from SourceReader and write it to Doris, while returning 
meta information. */
+    /**
+     * Read data from SourceReader and write it to Doris, while returning meta 
information.
+     *
+     * <p>Snapshot split: Returns immediately after reading; otherwise, 
returns after the
+     * maxInterval.
+     *
+     * <p>Binlog split: Fetches data at the maxInterval. Returns immediately 
if no data is found; if
+     * found, checks if the last record is a heartbeat record. If it is, 
returns immediately;
+     * otherwise, fetches again until the heartbeat deadline.
+     *
+     * <p>Heartbeat events will carry the latest offset.
+     */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         Map<String, String> metaResponse = new HashMap<>();
         long scannedRows = 0L;
         long scannedBytes = 0L;
+        int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
-            readResult = sourceReader.readSplitRecords(writeRecordRequest);
-            batchStreamLoad =
-                    getOrCreateBatchStreamLoad(
-                            writeRecordRequest.getJobId(), 
writeRecordRequest.getTargetDb());
-            batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-            
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
-            batchStreamLoad.setToken(writeRecordRequest.getToken());
-
-            // Record start time for maxInterval check
+            // 1. submit split async
+            readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+            batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+            boolean isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
+            boolean shouldStop = false;
+            boolean lastMessageIsHeartbeat = false;
+            LOG.info(
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId(),
+                    isSnapshotSplit);
+
+            // 2. poll record
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator =
+                        sourceReader.pollRecords(readResult.getSplitState());
 
-            // Use iterators to read and write.
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
-                    String table = extractTable(element);
-                    for (String record : serializedRecords) {
-                        scannedRows++;
-                        byte[] dataBytes = record.getBytes();
-                        scannedBytes += dataBytes.length;
-                        batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);

Review Comment:
   Similar to line 122 in buildRecordResponse, when an InterruptedException 
occurs during Thread.sleep(100), the thread's interrupted status should be 
preserved. Consider catching and re-interrupting the thread or propagating the 
exception properly.
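   A minimal sketch of that fix, reusing the variable names from the hunk above (illustrative only, not necessarily the final shape):
   ```java
   if (!recordIterator.hasNext()) {
       try {
           Thread.sleep(100);
       } catch (InterruptedException e) {
           // Restore the interrupted status so the executor thread can observe it,
           // then let the exception propagate as before.
           Thread.currentThread().interrupt();
           throw e;
       }
       // ... existing shouldStop check and continue ...
   }
   ```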



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +223,227 @@ public CompletableFuture<Void> 
writeRecordsAsync(WriteRecordRequest writeRecordR
                 executor);
     }
 
-    /** Read data from SourceReader and write it to Doris, while returning 
meta information. */
+    /**
+     * Read data from SourceReader and write it to Doris, while returning meta 
information.
+     *
+     * <p>Snapshot split: Returns immediately after reading; otherwise, 
returns after the
+     * maxInterval.
+     *
+     * <p>Binlog split: Fetches data at the maxInterval. Returns immediately 
if no data is found; if
+     * found, checks if the last record is a heartbeat record. If it is, 
returns immediately;
+     * otherwise, fetches again until the heartbeat deadline.
+     *
+     * <p>Heartbeat events will carry the latest offset.
+     */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         Map<String, String> metaResponse = new HashMap<>();
         long scannedRows = 0L;
         long scannedBytes = 0L;
+        int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
-            readResult = sourceReader.readSplitRecords(writeRecordRequest);
-            batchStreamLoad =
-                    getOrCreateBatchStreamLoad(
-                            writeRecordRequest.getJobId(), 
writeRecordRequest.getTargetDb());
-            batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-            
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
-            batchStreamLoad.setToken(writeRecordRequest.getToken());
-
-            // Record start time for maxInterval check
+            // 1. submit split async
+            readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+            batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+            boolean isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
+            boolean shouldStop = false;
+            boolean lastMessageIsHeartbeat = false;
+            LOG.info(
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId(),
+                    isSnapshotSplit);
+
+            // 2. poll record
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator =
+                        sourceReader.pollRecords(readResult.getSplitState());
 
-            // Use iterators to read and write.
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
-                    String table = extractTable(element);
-                    for (String record : serializedRecords) {
-                        scannedRows++;
-                        byte[] dataBytes = record.getBytes();
-                        scannedBytes += dataBytes.length;
-                        batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+
+                    // Check if should stop
+                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    boolean timeoutReached =
+                            maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                    if (shouldStop(
+                            isSnapshotSplit,
+                            scannedRows > 0,
+                            lastMessageIsHeartbeat,
+                            elapsedTime,
+                            maxIntervalMillis,
+                            timeoutReached)) {
+                        break;
                     }
+                    continue;
                 }
-                // Check if maxInterval has been exceeded
-                long elapsedTime = System.currentTimeMillis() - startTime;
-                if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis) 
{
-                    LOG.info(
-                            "Max interval {} seconds reached, stopping data 
reading",
-                            writeRecordRequest.getMaxInterval());
-                    break;
+
+                while (recordIterator.hasNext()) {
+                    SourceRecord element = recordIterator.next();
+
+                    // Check if this is a heartbeat message
+                    if (isHeartbeatEvent(element)) {
+                        heartbeatCount++;
+
+                        // Mark last message as heartbeat (only for binlog 
split)
+                        if (!isSnapshotSplit) {
+                            lastMessageIsHeartbeat = true;
+                        }
+
+                        // If already timeout, stop immediately when heartbeat 
received
+                        long elapsedTime = System.currentTimeMillis() - 
startTime;
+                        boolean timeoutReached =
+                                maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                        if (!isSnapshotSplit && timeoutReached) {
+                            LOG.info(
+                                    "Binlog split max interval reached and 
heartbeat received, stopping data reading");
+                            shouldStop = true;
+                            break;
+                        }
+                        // Skip heartbeat messages during normal processing
+                        continue;
+                    }
+
+                    // Process data messages
+                    List<String> serializedRecords =
+                            
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+                    if (!CollectionUtils.isEmpty(serializedRecords)) {
+                        String database = writeRecordRequest.getTargetDb();
+                        String table = extractTable(element);
+                        for (String record : serializedRecords) {
+                            scannedRows++;
+                            byte[] dataBytes = record.getBytes();
+                            scannedBytes += dataBytes.length;
+                            batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                        }
+                        // Mark last message as data (not heartbeat)
+                        lastMessageIsHeartbeat = false;
+                    }
                 }
             }
+            LOG.info(
+                    "Fetched {} records and {} heartbeats in {} ms for 
jobId={} taskId={}",
+                    scannedRows,
+                    heartbeatCount,
+                    System.currentTimeMillis() - startTime,
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId());
+
         } finally {
-            if (readResult != null) {
-                // The LSN in the commit is the current offset, which is the 
offset from the last
-                // successful write.
-                // Therefore, even if a subsequent write fails, it will not 
affect the commit.
-                sourceReader.commitSourceOffset(
-                        writeRecordRequest.getJobId(), readResult.getSplit());
-            }
+            cleanupReaderResources(sourceReader, 
writeRecordRequest.getJobId(), readResult);
+        }
 
-            // This must be called after commitSourceOffset; otherwise,
-            // PG's confirmed lsn will not proceed.
-            // This operation must be performed before 
batchStreamLoad.commitOffset;
-            // otherwise, fe might issue the next task for this job.
-            sourceReader.finishSplitRecords();
+        // 3. Extract offset from split state
+        metaResponse = extractOffsetMeta(sourceReader, readResult);
+        // 4. wait all stream load finish
+        batchStreamLoad.forceFlush();
+
+        // 5. request fe api update offset
+        String currentTaskId = batchStreamLoad.getCurrentTaskId();
+        // The offset must be reset before commitOffset to prevent the next 
taskId from being create
+        // by the fe.
+        batchStreamLoad.resetTaskId();
+        batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows, 
scannedBytes);
+    }
+
+    public static boolean isHeartbeatEvent(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        return valueSchema != null
+                && 
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+    }
+
+    /**
+     * Determine if we should stop polling.
+     *
+     * @param isSnapshotSplit whether this is a snapshot split
+     * @param hasData whether we have received any data
+     * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+     * @param elapsedTime total elapsed time in milliseconds
+     * @param maxIntervalMillis max interval in milliseconds
+     * @param timeoutReached whether timeout is reached
+     * @return true if should stop, false if should continue
+     */
+    private boolean shouldStop(
+            boolean isSnapshotSplit,
+            boolean hasData,
+            boolean lastMessageIsHeartbeat,
+            long elapsedTime,
+            long maxIntervalMillis,
+            boolean timeoutReached) {
+
+        // 1. Snapshot split with data: if no more data in queue, stop 
immediately (no need to wait
+        // for timeout)
+        // snapshot split will be written to the debezium queue all at once.
+        if (isSnapshotSplit && hasData) {
+            LOG.info(
+                    "Snapshot split finished, no more data available. Total 
elapsed: {} ms",
+                    elapsedTime);
+            return true;
         }
-        // get offset from split state
-        try {
-            if (readResult.getSplitState() != null) {
-                // Set meta information for hw
-                if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
-                    metaResponse = offsetRes;
-                }
 
-                // set meta for binlog event
-                if (sourceReader.isBinlogSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                    metaResponse = offsetRes;
-                }
-            } else {
-                throw new RuntimeException("split state is null");
-            }
+        // 2. Not timeout yet: continue waiting
+        if (!timeoutReached) {
+            return false;
+        }
 
-            // wait all stream load finish
-            batchStreamLoad.forceFlush();
+        // === Below are checks after timeout is reached ===
 
-            // request fe api
-            batchStreamLoad.commitOffset(metaResponse, scannedRows, 
scannedBytes);
-        } finally {
-            batchStreamLoad.resetTaskId();
+        // 3. No data received after timeout: stop
+        if (!hasData) {
+            LOG.info("No data received after timeout, stopping. Elapsed: {} 
ms", elapsedTime);
+            return true;
+        }
+
+        // 4. Snapshot split after timeout (should not reach here, but keep as 
safety check)
+        if (isSnapshotSplit) {
+            LOG.info("Snapshot split timeout reached, stopping. Elapsed: {} 
ms", elapsedTime);
+            return true;
+        }
+
+        // 5. Binlog split + last message is heartbeat: stop immediately
+        if (lastMessageIsHeartbeat) {
+            LOG.info("Binlog split timeout and last message is heartbeat, 
stopping");
+            return true;
+        }
+
+        // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout 
protection
+        if (elapsedTime > maxIntervalMillis + 
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+            LOG.warn(
+                    "Binlog split heartbeat wait timeout after {} ms, force 
stopping. Total elapsed: {} ms",
+                    elapsedTime - maxIntervalMillis,
+                    elapsedTime);
+            return true;
         }
+
+        // Continue waiting for heartbeat
+        return false;
     }
 
-    private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String 
targetDb) {
-        return batchStreamLoadMap.computeIfAbsent(
-                jobId,
-                k -> {
-                    LOG.info("Create DorisBatchStreamLoad for jobId={}", 
jobId);
-                    return new DorisBatchStreamLoad(jobId, targetDb);
-                });
+    private synchronized DorisBatchStreamLoad getOrCreateBatchStreamLoad(
+            WriteRecordRequest writeRecordRequest) {
+        DorisBatchStreamLoad batchStreamLoad =
+                batchStreamLoadMap.computeIfAbsent(
+                        writeRecordRequest.getJobId(),
+                        k -> {
+                            LOG.info(
+                                    "Create DorisBatchStreamLoad for jobId={}",
+                                    writeRecordRequest.getJobId());
+                            return new DorisBatchStreamLoad(
+                                    writeRecordRequest.getJobId(),
+                                    writeRecordRequest.getTargetDb());
+                        });
+        batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
+        
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
+        batchStreamLoad.setToken(writeRecordRequest.getToken());
+        return batchStreamLoad;

Review Comment:
   The method getOrCreateBatchStreamLoad has been synchronized, which is good 
for thread safety. However, after creating or retrieving the 
DorisBatchStreamLoad, the code sets mutable properties (currentTaskId, 
frontendAddress, token) on lines 443-445. If multiple threads call writeRecords 
concurrently for the same jobId, these properties could be overwritten by 
another thread between the moment they are set and the moment writeRecords uses 
them. Consider either 1) passing these values through as parameters rather than 
setting them as instance state, or 2) documenting that writeRecords must not be 
called concurrently for the same jobId.
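   A sketch of option 1 (illustrative; the TaskContext holder and the widened writeRecord/commitOffset signatures are hypothetical, not part of this PR):
   ```java
   // Hypothetical immutable holder for the per-task values that are currently
   // stored as mutable state on the shared DorisBatchStreamLoad instance.
   final class TaskContext {
       final String taskId;
       final String frontendAddress;
       final String token;

       TaskContext(String taskId, String frontendAddress, String token) {
           this.taskId = taskId;
           this.frontendAddress = frontendAddress;
           this.token = token;
       }
   }

   // Callers would then pass the context explicitly instead of relying on setters, e.g.:
   // batchStreamLoad.writeRecord(ctx, database, table, dataBytes);
   // batchStreamLoad.commitOffset(ctx, metaResponse, scannedRows, scannedBytes);
   ```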



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,73 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
-        LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/stream split based on offset and 
start the reader.
-        SourceSplitBase split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            Fetcher<SourceRecords, SourceSplitBase> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader for job {}",
-                        baseReq.isReload(),
-                        baseReq.getJobId());
-                // build split
-                Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof 
IncrementalSourceStreamFetcher) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        LOG.info("Job {} prepare and submit split with offset: {}", 
baseReq.getJobId(), offsetMeta);
+        // Build split
+        Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+
+        // Create reader based on split type
+        if (this.currentSplit.isSnapshotSplit()) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit.isStreamSplit()) {
+            this.currentReader = getBinlogSplitReader(baseReq);
         } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
+            throw new IllegalStateException(
+                    "Unknown split type: " + 
this.currentSplit.getClass().getName());
         }
 
-        // build response with iterator
-        SplitReadResult result = new SplitReadResult();
-        SourceSplitState currentSplitState = null;
-        SourceSplitBase currentSplit = this.getCurrentSplit();
-        if (currentSplit.isSnapshotSplit()) {
-            currentSplitState = new 
SnapshotSplitState(currentSplit.asSnapshotSplit());
+        // Submit split
+        FetchTask<SourceSplitBase> splitFetchTask =
+                createFetchTaskFromSplit(baseReq, this.currentSplit);
+        this.currentReader.submitTask(splitFetchTask);
+        this.setCurrentFetchTask(splitFetchTask);
+
+        // Create split state
+        SourceSplitState currentSplitState;
+        if (this.currentSplit.isSnapshotSplit()) {
+            currentSplitState = new 
SnapshotSplitState(this.currentSplit.asSnapshotSplit());
         } else {
-            currentSplitState = new 
StreamSplitState(currentSplit.asStreamSplit());
+            currentSplitState = new 
StreamSplitState(this.currentSplit.asStreamSplit());
         }
 
-        Iterator<SourceRecord> filteredIterator =
-                new FilteredRecordIterator(currentSplitRecords, 
currentSplitState);
-
-        result.setRecordIterator(filteredIterator);
+        // Return result without iterator
+        SplitReadResult result = new SplitReadResult();
+        result.setSplit(this.currentSplit);
         result.setSplitState(currentSplitState);
-        result.setSplit(split);
         return result;
     }
 
+    @Override
+    public Iterator<SourceRecord> pollRecords(Object splitState) throws 
InterruptedException {
+        Preconditions.checkState(this.currentReader != null, "currentReader is 
null");
+        Preconditions.checkNotNull(splitState, "splitState is null");
+        Preconditions.checkState(
+                splitState instanceof SourceSplitState,
+                "splitState type is invalid " + splitState.getClass());
+
+        // Poll data from Debezium queue
+        Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+        if (dataIt == null || !dataIt.hasNext()) {
+            return Collections.emptyIterator(); // No data available
+        }
+
+        SourceRecords sourceRecords = dataIt.next();
+        SplitRecords splitRecords =
+                new SplitRecords(this.currentSplit.splitId(), 
sourceRecords.iterator());
+        if (!sourceRecords.getSourceRecordList().isEmpty()) {
+            LOG.info("{} Records received.", 
sourceRecords.getSourceRecordList().size());
+        }
+
+        // Return filtered iterator
+        return new FilteredRecordIterator(splitRecords, (SourceSplitState) 
splitState);
+    }

Review Comment:
   Similar to MySqlSourceReader, if pollRecords is called multiple times on the 
same split, a new FilteredRecordIterator is created each time while reusing the 
underlying currentSplit and currentReader. This could lead to state 
inconsistencies if the iterator is not fully consumed before the next call to 
pollRecords.
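   If the intended contract is "drain the iterator before polling again", a small guard could make it explicit (a sketch only; the lastReturnedIterator field is hypothetical):
   ```java
   private Iterator<SourceRecord> lastReturnedIterator;

   @Override
   public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
       // Refuse to hand out a new iterator while the previous one still has
       // unconsumed records, so split-state updates cannot interleave.
       Preconditions.checkState(
               lastReturnedIterator == null || !lastReturnedIterator.hasNext(),
               "previous record iterator has not been fully consumed");
       // ... existing polling logic ...
       lastReturnedIterator =
               new FilteredRecordIterator(splitRecords, (SourceSplitState) splitState);
       return lastReturnedIterator;
   }
   ```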



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -83,67 +87,109 @@ public PipelineCoordinator() {
 
     public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest) 
throws Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(fetchRecordRequest);
-        SplitReadResult readResult = 
sourceReader.readSplitRecords(fetchRecordRequest);
+        SplitReadResult readResult = 
sourceReader.prepareAndSubmitSplit(fetchRecordRequest);
         return buildRecordResponse(sourceReader, fetchRecordRequest, 
readResult);
     }
 
-    /** build RecordWithMeta */
+    /**
+     * Build RecordWithMeta response
+     *
+     * <p>This method polls records until: 1. Data is received AND heartbeat 
is received (normal
+     * case) 2. Timeout is reached (with heartbeat wait protection)
+     */
     private RecordWithMeta buildRecordResponse(
             SourceReader sourceReader, FetchRecordRequest fetchRecord, 
SplitReadResult readResult)
             throws Exception {
         RecordWithMeta recordResponse = new RecordWithMeta();
-        SourceSplit split = readResult.getSplit();
-        int count = 0;
         try {
-            // Serialize records and add them to the response (collect from 
iterator)
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        sourceReader.deserialize(fetchRecord.getConfig(), 
element);
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    recordResponse.getRecords().addAll(serializedRecords);
-                    count += serializedRecords.size();
-                    if (sourceReader.isBinlogSplit(split)) {
-                        // put offset for event
-                        Map<String, String> lastMeta =
-                                
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                        lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                        recordResponse.setMeta(lastMeta);
+            boolean isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
+            long startTime = System.currentTimeMillis();
+            boolean shouldStop = false;
+            boolean hasReceivedData = false;
+            boolean lastMessageIsHeartbeat = false;
+            int heartbeatCount = 0;
+            int recordCount = 0;
+            LOG.info(
+                    "Start fetching records for jobId={}, isSnapshotSplit={}",
+                    fetchRecord.getJobId(),
+                    isSnapshotSplit);
+
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator =
+                        sourceReader.pollRecords(readResult.getSplitState());
+
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);

Review Comment:
   When an InterruptedException occurs during Thread.sleep(100), the exception 
is thrown but the thread's interrupted status is not preserved. This violates 
the standard practice for handling InterruptedException. Consider either 
catching and re-interrupting the thread, or propagating the 
InterruptedException properly. For example: catch (InterruptedException e) { 
Thread.currentThread().interrupt(); throw e; }
   ```suggestion
                       try {
                           Thread.sleep(100);
                       } catch (InterruptedException e) {
                           Thread.currentThread().interrupt();
                           throw e;
                       }
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,64 +168,62 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
         LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/binlog split based on offset and 
start the reader.
-        MySqlSplit split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            DebeziumReader<SourceRecords, MySqlSplit> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader",
-                        baseReq.isReload());
-                // build split
-                Tuple2<MySqlSplit, Boolean> splitFlag = 
createMySqlSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // reset binlog reader
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof BinlogSplitReader) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        // build split
+        Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, 
baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+        if (this.currentSplit instanceof MySqlSnapshotSplit) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+            this.currentReader = getBinlogSplitReader(baseReq);
         } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
+            throw new IllegalStateException(
+                    "Unsupported MySqlSplit type: " + 
this.currentSplit.getClass().getName());
         }
 
-        // build response with iterator
-        SplitReadResult result = new SplitReadResult();
+        this.currentReader.submitSplit(this.currentSplit);
         MySqlSplitState currentSplitState = null;
-        MySqlSplit currentSplit = this.getCurrentSplit();
-        if (currentSplit.isSnapshotSplit()) {
-            currentSplitState = new 
MySqlSnapshotSplitState(currentSplit.asSnapshotSplit());
+        if (this.currentSplit.isSnapshotSplit()) {
+            currentSplitState = new 
MySqlSnapshotSplitState(this.currentSplit.asSnapshotSplit());
         } else {
-            currentSplitState = new 
MySqlBinlogSplitState(currentSplit.asBinlogSplit());
+            currentSplitState = new 
MySqlBinlogSplitState(this.currentSplit.asBinlogSplit());
         }
-
-        Iterator<SourceRecord> filteredIterator =
-                new FilteredRecordIterator(currentSplitRecords, 
currentSplitState);
-
-        result.setRecordIterator(filteredIterator);
+        SplitReadResult result = new SplitReadResult();
+        result.setSplit(this.currentSplit);
         result.setSplitState(currentSplitState);
-        result.setSplit(split);
         return result;
     }
 
+    @Override
+    public Iterator<SourceRecord> pollRecords(Object splitState) throws 
InterruptedException {
+        Preconditions.checkState(this.currentReader != null, "currentReader is 
null");
+        Preconditions.checkNotNull(splitState, "splitState is null");
+        Preconditions.checkState(
+                splitState instanceof MySqlSplitState,
+                "splitState type is invalid " + splitState.getClass());
+
+        // Poll data from Debezium queue
+        Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+        if (dataIt == null || !dataIt.hasNext()) {
+            return Collections.emptyIterator(); // No data available
+        }
+
+        SourceRecords sourceRecords = dataIt.next();
+        SplitRecords splitRecords =
+                new SplitRecords(this.currentSplit.splitId(), 
sourceRecords.iterator());
+        if (!sourceRecords.getSourceRecordList().isEmpty()) {
+            LOG.info("{} Records received", 
sourceRecords.getSourceRecordList().size());
+        }
+
+        return new FilteredRecordIterator(splitRecords, (MySqlSplitState) 
splitState);
+    }

Review Comment:
   The SourceReader instance is shared per jobId (from Env.getReader), and the 
currentReader/currentSplit fields are instance variables. If 
prepareAndSubmitSplit or pollRecords is called concurrently from multiple 
threads for the same job (e.g., via the executor in PipelineCoordinator), one 
thread's split/reader could overwrite another's. Consider adding 
synchronization to the prepareAndSubmitSplit and pollRecords methods, or 
documenting that these methods must not be called concurrently for the same 
SourceReader instance.
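   If serializing access per reader instance is acceptable, the simplest sketch is method-level synchronization (illustrative; a dedicated lock object would work equally well):
   ```java
   @Override
   public synchronized SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq)
           throws Exception {
       // ... existing logic; currentSplit/currentReader are now only mutated under the lock ...
   }

   @Override
   public synchronized Iterator<SourceRecord> pollRecords(Object splitState)
           throws InterruptedException {
       // ... existing logic; reads of currentSplit/currentReader see a consistent pair ...
   }
   ```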



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,73 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
-        LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/stream split based on offset and 
start the reader.
-        SourceSplitBase split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            Fetcher<SourceRecords, SourceSplitBase> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader for job {}",
-                        baseReq.isReload(),
-                        baseReq.getJobId());
-                // build split
-                Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof 
IncrementalSourceStreamFetcher) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        LOG.info("Job {} prepare and submit split with offset: {}", 
baseReq.getJobId(), offsetMeta);
+        // Build split
+        Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+
+        // Create reader based on split type
+        if (this.currentSplit.isSnapshotSplit()) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit.isStreamSplit()) {
+            this.currentReader = getBinlogSplitReader(baseReq);
         } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
+            throw new IllegalStateException(
+                    "Unknown split type: " + 
this.currentSplit.getClass().getName());
         }
 
-        // build response with iterator
-        SplitReadResult result = new SplitReadResult();
-        SourceSplitState currentSplitState = null;
-        SourceSplitBase currentSplit = this.getCurrentSplit();
-        if (currentSplit.isSnapshotSplit()) {
-            currentSplitState = new 
SnapshotSplitState(currentSplit.asSnapshotSplit());
+        // Submit split
+        FetchTask<SourceSplitBase> splitFetchTask =
+                createFetchTaskFromSplit(baseReq, this.currentSplit);
+        this.currentReader.submitTask(splitFetchTask);
+        this.setCurrentFetchTask(splitFetchTask);
+
+        // Create split state
+        SourceSplitState currentSplitState;
+        if (this.currentSplit.isSnapshotSplit()) {
+            currentSplitState = new 
SnapshotSplitState(this.currentSplit.asSnapshotSplit());
         } else {
-            currentSplitState = new 
StreamSplitState(currentSplit.asStreamSplit());
+            currentSplitState = new 
StreamSplitState(this.currentSplit.asStreamSplit());
         }
 
-        Iterator<SourceRecord> filteredIterator =
-                new FilteredRecordIterator(currentSplitRecords, 
currentSplitState);
-
-        result.setRecordIterator(filteredIterator);
+        // Return result without iterator
+        SplitReadResult result = new SplitReadResult();
+        result.setSplit(this.currentSplit);
         result.setSplitState(currentSplitState);
-        result.setSplit(split);
         return result;
     }
 
+    @Override
+    public Iterator<SourceRecord> pollRecords(Object splitState) throws 
InterruptedException {
+        Preconditions.checkState(this.currentReader != null, "currentReader is 
null");
+        Preconditions.checkNotNull(splitState, "splitState is null");
+        Preconditions.checkState(
+                splitState instanceof SourceSplitState,
+                "splitState type is invalid " + splitState.getClass());
+
+        // Poll data from Debezium queue
+        Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+        if (dataIt == null || !dataIt.hasNext()) {
+            return Collections.emptyIterator(); // No data available
+        }
+
+        SourceRecords sourceRecords = dataIt.next();
+        SplitRecords splitRecords =
+                new SplitRecords(this.currentSplit.splitId(), 
sourceRecords.iterator());
+        if (!sourceRecords.getSourceRecordList().isEmpty()) {
+            LOG.info("{} Records received.", 
sourceRecords.getSourceRecordList().size());
+        }
+
+        // Return filtered iterator
+        return new FilteredRecordIterator(splitRecords, (SourceSplitState) 
splitState);
+    }

Review Comment:
   Similar to MySqlSourceReader, the SourceReader instance is shared per jobId, 
and currentReader/currentSplit are instance variables. Concurrent calls to 
prepareAndSubmitSplit or pollRecords for the same job could cause race 
conditions. Consider adding synchronization.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,73 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
-        LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/stream split based on offset and 
start the reader.
-        SourceSplitBase split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            Fetcher<SourceRecords, SourceSplitBase> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader for job {}",
-                        baseReq.isReload(),
-                        baseReq.getJobId());
-                // build split
-                Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof 
IncrementalSourceStreamFetcher) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        LOG.info("Job {} prepare and submit split with offset: {}", 
baseReq.getJobId(), offsetMeta);
+        // Build split
+        Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+
+        // Create reader based on split type
+        if (this.currentSplit.isSnapshotSplit()) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit.isStreamSplit()) {
+            this.currentReader = getBinlogSplitReader(baseReq);
         } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
+            throw new IllegalStateException(
+                    "Unknown split type: " + 
this.currentSplit.getClass().getName());
         }

Review Comment:
   Similar to MySqlSourceReader, a new reader is created every time 
prepareAndSubmitSplit is called (lines 163 or 165), but the old currentReader 
is not closed before being replaced. This could lead to resource leaks. 
Consider closing the existing currentReader before creating a new one.
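   A possible shape of the close-before-replace fix (a sketch; it assumes the fetcher type exposes a close() method, and the error handling is illustrative):
   ```java
   // Release any reader left over from a previous split before installing a new one,
   // so its fetch task and underlying connections are not leaked.
   if (this.currentReader != null) {
       try {
           this.currentReader.close();
       } catch (Exception e) {
           LOG.warn("Failed to close previous split reader, continuing with a new one", e);
       }
       this.currentReader = null;
   }
   ```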



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +223,227 @@ public CompletableFuture<Void> 
writeRecordsAsync(WriteRecordRequest writeRecordR
                 executor);
     }
 
-    /** Read data from SourceReader and write it to Doris, while returning 
meta information. */
+    /**
+     * Read data from SourceReader and write it to Doris, while returning meta 
information.
+     *
+     * <p>Snapshot split: Returns immediately after reading; otherwise, 
returns after the
+     * maxInterval.
+     *
+     * <p>Binlog split: Fetches data at the maxInterval. Returns immediately 
if no data is found; if
+     * found, checks if the last record is a heartbeat record. If it is, 
returns immediately;
+     * otherwise, fetches again until the heartbeat deadline.
+     *
+     * <p>Heartbeat events will carry the latest offset.
+     */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         Map<String, String> metaResponse = new HashMap<>();
         long scannedRows = 0L;
         long scannedBytes = 0L;
+        int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
-            readResult = sourceReader.readSplitRecords(writeRecordRequest);
-            batchStreamLoad =
-                    getOrCreateBatchStreamLoad(
-                            writeRecordRequest.getJobId(), 
writeRecordRequest.getTargetDb());
-            batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-            
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
-            batchStreamLoad.setToken(writeRecordRequest.getToken());
-
-            // Record start time for maxInterval check
+            // 1. submit split async
+            readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+            batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+            boolean isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
+            boolean shouldStop = false;
+            boolean lastMessageIsHeartbeat = false;
+            LOG.info(
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId(),
+                    isSnapshotSplit);
+
+            // 2. poll record
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator =
+                        sourceReader.pollRecords(readResult.getSplitState());
 
-            // Use iterators to read and write.
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
-                    String table = extractTable(element);
-                    for (String record : serializedRecords) {
-                        scannedRows++;
-                        byte[] dataBytes = record.getBytes();
-                        scannedBytes += dataBytes.length;
-                        batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+
+                    // Check if should stop
+                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    boolean timeoutReached =
+                            maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                    if (shouldStop(
+                            isSnapshotSplit,
+                            scannedRows > 0,
+                            lastMessageIsHeartbeat,
+                            elapsedTime,
+                            maxIntervalMillis,
+                            timeoutReached)) {
+                        break;
                     }
+                    continue;
                 }
-                // Check if maxInterval has been exceeded
-                long elapsedTime = System.currentTimeMillis() - startTime;
-                if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis) 
{
-                    LOG.info(
-                            "Max interval {} seconds reached, stopping data 
reading",
-                            writeRecordRequest.getMaxInterval());
-                    break;
+
+                while (recordIterator.hasNext()) {
+                    SourceRecord element = recordIterator.next();
+
+                    // Check if this is a heartbeat message
+                    if (isHeartbeatEvent(element)) {
+                        heartbeatCount++;
+
+                        // Mark last message as heartbeat (only for binlog 
split)
+                        if (!isSnapshotSplit) {
+                            lastMessageIsHeartbeat = true;
+                        }
+
+                        // If already timeout, stop immediately when heartbeat 
received
+                        long elapsedTime = System.currentTimeMillis() - 
startTime;
+                        boolean timeoutReached =
+                                maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+
+                        if (!isSnapshotSplit && timeoutReached) {
+                            LOG.info(
+                                    "Binlog split max interval reached and 
heartbeat received, stopping data reading");
+                            shouldStop = true;
+                            break;
+                        }
+                        // Skip heartbeat messages during normal processing
+                        continue;
+                    }
+
+                    // Process data messages
+                    List<String> serializedRecords =
+                            
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+                    if (!CollectionUtils.isEmpty(serializedRecords)) {
+                        String database = writeRecordRequest.getTargetDb();
+                        String table = extractTable(element);
+                        for (String record : serializedRecords) {
+                            scannedRows++;
+                            byte[] dataBytes = record.getBytes();
+                            scannedBytes += dataBytes.length;
+                            batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                        }
+                        // Mark last message as data (not heartbeat)
+                        lastMessageIsHeartbeat = false;
+                    }
                 }
             }
+            LOG.info(
+                    "Fetched {} records and {} heartbeats in {} ms for 
jobId={} taskId={}",
+                    scannedRows,
+                    heartbeatCount,
+                    System.currentTimeMillis() - startTime,
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId());
+
         } finally {
-            if (readResult != null) {
-                // The LSN in the commit is the current offset, which is the 
offset from the last
-                // successful write.
-                // Therefore, even if a subsequent write fails, it will not 
affect the commit.
-                sourceReader.commitSourceOffset(
-                        writeRecordRequest.getJobId(), readResult.getSplit());
-            }
+            cleanupReaderResources(sourceReader, 
writeRecordRequest.getJobId(), readResult);
+        }
 
-            // This must be called after commitSourceOffset; otherwise,
-            // PG's confirmed lsn will not proceed.
-            // This operation must be performed before 
batchStreamLoad.commitOffset;
-            // otherwise, fe might issue the next task for this job.
-            sourceReader.finishSplitRecords();
+        // 3. Extract offset from split state
+        metaResponse = extractOffsetMeta(sourceReader, readResult);
+        // 4. wait all stream load finish
+        batchStreamLoad.forceFlush();
+
+        // 5. request fe api update offset
+        String currentTaskId = batchStreamLoad.getCurrentTaskId();
+        // The offset must be reset before commitOffset to prevent the next 
taskId from being create
+        // by the fe.
+        batchStreamLoad.resetTaskId();
+        batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows, 
scannedBytes);
+    }
+
+    public static boolean isHeartbeatEvent(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        return valueSchema != null
+                && 
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+    }
+
+    /**
+     * Determine if we should stop polling.
+     *
+     * @param isSnapshotSplit whether this is a snapshot split
+     * @param hasData whether we have received any data
+     * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+     * @param elapsedTime total elapsed time in milliseconds
+     * @param maxIntervalMillis max interval in milliseconds
+     * @param timeoutReached whether timeout is reached
+     * @return true if should stop, false if should continue
+     */
+    private boolean shouldStop(
+            boolean isSnapshotSplit,
+            boolean hasData,
+            boolean lastMessageIsHeartbeat,
+            long elapsedTime,
+            long maxIntervalMillis,
+            boolean timeoutReached) {
+
+        // 1. Snapshot split with data: if no more data in queue, stop 
immediately (no need to wait
+        // for timeout)
+        // snapshot split will be written to the debezium queue all at once.
+        if (isSnapshotSplit && hasData) {
+            LOG.info(
+                    "Snapshot split finished, no more data available. Total 
elapsed: {} ms",
+                    elapsedTime);
+            return true;
         }
-        // get offset from split state
-        try {
-            if (readResult.getSplitState() != null) {
-                // Set meta information for hw
-                if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
-                    metaResponse = offsetRes;
-                }
 
-                // set meta for binlog event
-                if (sourceReader.isBinlogSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                    metaResponse = offsetRes;
-                }
-            } else {
-                throw new RuntimeException("split state is null");
-            }
+        // 2. Not timeout yet: continue waiting
+        if (!timeoutReached) {
+            return false;
+        }
 
-            // wait all stream load finish
-            batchStreamLoad.forceFlush();
+        // === Below are checks after timeout is reached ===
 
-            // request fe api
-            batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);
-        } finally {
-            batchStreamLoad.resetTaskId();
+        // 3. No data received after timeout: stop
+        if (!hasData) {
+            LOG.info("No data received after timeout, stopping. Elapsed: {} ms", elapsedTime);
+            return true;
+        }
+
+        // 4. Snapshot split after timeout (should not reach here, but keep as safety check)
+        if (isSnapshotSplit) {
+            LOG.info("Snapshot split timeout reached, stopping. Elapsed: {} ms", elapsedTime);
+            return true;
+        }
+
+        // 5. Binlog split + last message is heartbeat: stop immediately
+        if (lastMessageIsHeartbeat) {
+            LOG.info("Binlog split timeout and last message is heartbeat, stopping");
+            return true;
+        }
+
+        // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout protection
+        if (elapsedTime > maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+            LOG.warn(
+                    "Binlog split heartbeat wait timeout after {} ms, force stopping. Total elapsed: {} ms",
+                    elapsedTime - maxIntervalMillis,
+                    elapsedTime);
+            return true;

Review Comment:
   The heartbeat wait protection at line 418 uses "maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3" as the maximum wait time.
   With the new heartbeat interval of 3000 ms (3 seconds), this allows 9 extra seconds beyond the timeout, but the magic number "3" is not documented.
   Consider adding a constant or comment explaining why three heartbeat intervals are chosen as the maximum wait time for the heartbeat protection.
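   As a rough illustration only (not code from this PR; the constant names are assumptions), the grace period could be made self-documenting like this:
   ```java
   // Hypothetical named constants: make the "3 heartbeat intervals" grace period explicit.
   private static final int HEARTBEAT_WAIT_MULTIPLIER = 3;
   private static final long HEARTBEAT_WAIT_GRACE_MS =
           Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * HEARTBEAT_WAIT_MULTIPLIER;

   // Inside shouldStop(...), the check then documents itself:
   if (elapsedTime > maxIntervalMillis + HEARTBEAT_WAIT_GRACE_MS) {
       LOG.warn(
               "Binlog split heartbeat wait timeout after {} ms, force stopping. Total elapsed: {} ms",
               elapsedTime - maxIntervalMillis,
               elapsedTime);
       return true;
   }
   ```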



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -83,67 +87,109 @@ public PipelineCoordinator() {
 
     public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest) throws Exception {
         SourceReader sourceReader = Env.getCurrentEnv().getReader(fetchRecordRequest);
-        SplitReadResult readResult = sourceReader.readSplitRecords(fetchRecordRequest);
+        SplitReadResult readResult = sourceReader.prepareAndSubmitSplit(fetchRecordRequest);
         return buildRecordResponse(sourceReader, fetchRecordRequest, readResult);
     }
 
-    /** build RecordWithMeta */
+    /**
+     * Build RecordWithMeta response
+     *
+     * <p>This method polls records until: 1. Data is received AND heartbeat is received (normal
+     * case) 2. Timeout is reached (with heartbeat wait protection)
+     */
     private RecordWithMeta buildRecordResponse(
             SourceReader sourceReader, FetchRecordRequest fetchRecord, SplitReadResult readResult)
             throws Exception {
         RecordWithMeta recordResponse = new RecordWithMeta();
-        SourceSplit split = readResult.getSplit();
-        int count = 0;
         try {
-            // Serialize records and add them to the response (collect from iterator)
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        sourceReader.deserialize(fetchRecord.getConfig(), element);
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    recordResponse.getRecords().addAll(serializedRecords);
-                    count += serializedRecords.size();
-                    if (sourceReader.isBinlogSplit(split)) {
-                        // put offset for event
-                        Map<String, String> lastMeta =
-                                sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                        lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                        recordResponse.setMeta(lastMeta);
+            boolean isSnapshotSplit = sourceReader.isSnapshotSplit(readResult.getSplit());
+            long startTime = System.currentTimeMillis();
+            boolean shouldStop = false;
+            boolean hasReceivedData = false;
+            boolean lastMessageIsHeartbeat = false;
+            int heartbeatCount = 0;
+            int recordCount = 0;
+            LOG.info(
+                    "Start fetching records for jobId={}, isSnapshotSplit={}",
+                    fetchRecord.getJobId(),
+                    isSnapshotSplit);
+
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator =
+                        sourceReader.pollRecords(readResult.getSplitState());
+
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+
+                    // Check if should stop
+                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    boolean timeoutReached = elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+
+                    if (shouldStop(
+                            isSnapshotSplit,
+                            hasReceivedData,
+                            lastMessageIsHeartbeat,
+                            elapsedTime,
+                            Constants.POLL_SPLIT_RECORDS_TIMEOUTS,
+                            timeoutReached)) {
+                        break;
                     }
+                    continue;
                 }
-            }
-        } finally {
-            // The LSN in the commit is the current offset, which is the offset from the last
-            // successful write.
-            // Therefore, even if a subsequent write fails, it will not affect the commit.
-            sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
 
-            // This must be called after commitSourceOffset; otherwise,
-            // PG's confirmed lsn will not proceed.
-            sourceReader.finishSplitRecords();
-        }
+                while (recordIterator.hasNext()) {
+                    SourceRecord element = recordIterator.next();
 
-        if (readResult.getSplitState() != null) {
-            // Set meta information for hw
-            if (sourceReader.isSnapshotSplit(split)) {
-                Map<String, String> offsetRes =
-                        sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                offsetRes.put(SPLIT_ID, split.splitId());
-                recordResponse.setMeta(offsetRes);
-            }
+                    // Check if this is a heartbeat message
+                    if (isHeartbeatEvent(element)) {
+                        heartbeatCount++;
+
+                        // Mark last message as heartbeat
+                        if (!isSnapshotSplit) {
+                            lastMessageIsHeartbeat = true;
+                        }
 
-            // set meta for binlog event
-            if (sourceReader.isBinlogSplit(split)) {
-                Map<String, String> offsetRes =
-                        sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                recordResponse.setMeta(offsetRes);
+                        // If already have data or timeout, stop when heartbeat received
+                        long elapsedTime = System.currentTimeMillis() - startTime;
+                        boolean timeoutReached =
+                                elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+
+                        if (hasReceivedData || timeoutReached) {
+                            LOG.info(
+                                    "Heartbeat received after {} data records, stopping",
+                                    recordResponse.getRecords().size());
+                            shouldStop = true;
+                            break;
+                        }

Review Comment:
   In the fetchRecords flow, if hasReceivedData is true (data was received) or 
timeoutReached is true, the loop stops immediately when a heartbeat is 
received. However, the condition at line 157 uses OR logic: "if 
(hasReceivedData || timeoutReached)". This means that even if we have received 
data but timeout hasn't been reached yet, we'll stop on the first heartbeat. 
This could result in returning prematurely when more data is still available in 
the queue. Consider using AND logic: "if (hasReceivedData && timeoutReached)" 
to only stop when both conditions are met.
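   A minimal sketch of the suggested condition (variable names taken from the PR's locals; whether this is the intended semantics is for the author to confirm):
   ```java
   // Stop on a heartbeat only once data has been received AND the poll timeout has elapsed.
   if (hasReceivedData && timeoutReached) {
       LOG.info(
               "Heartbeat received after {} data records, stopping",
               recordResponse.getRecords().size());
       shouldStop = true;
       break;
   }
   ```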



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SplitReadResult.java:
##########
@@ -18,16 +18,12 @@
 package org.apache.doris.cdcclient.source.reader;
 
 import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.kafka.connect.source.SourceRecord;
-
-import java.util.Iterator;
 
 import lombok.Data;
 
 /** The result of reading a split with iterator. */

Review Comment:
   The class comment still says "The result of reading a split with iterator" 
but the recordIterator field has been removed as part of this refactoring. The 
comment should be updated to reflect that this class now only contains the 
split and splitState, and that iteration is now handled separately via the 
pollRecords method.
   ```suggestion
   /**
    * Container for a source split and its associated state.
    * Iteration over records for this split is handled separately (for example via pollRecords).
    */
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +223,227 @@ public CompletableFuture<Void> writeRecordsAsync(WriteRecordRequest writeRecordR
                 executor);
     }
 
-    /** Read data from SourceReader and write it to Doris, while returning meta information. */
+    /**
+     * Read data from SourceReader and write it to Doris, while returning meta information.
+     *
+     * <p>Snapshot split: Returns immediately after reading; otherwise, returns after the
+     * maxInterval.
+     *
+     * <p>Binlog split: Fetches data at the maxInterval. Returns immediately if no data is found; if
+     * found, checks if the last record is a heartbeat record. If it is, returns immediately;
+     * otherwise, fetches again until the heartbeat deadline.
+     *
+     * <p>Heartbeat events will carry the latest offset.
+     */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception {
         SourceReader sourceReader = Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         Map<String, String> metaResponse = new HashMap<>();
         long scannedRows = 0L;
         long scannedBytes = 0L;
+        int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
-            readResult = sourceReader.readSplitRecords(writeRecordRequest);
-            batchStreamLoad =
-                    getOrCreateBatchStreamLoad(
-                            writeRecordRequest.getJobId(), writeRecordRequest.getTargetDb());
-            batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-            batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
-            batchStreamLoad.setToken(writeRecordRequest.getToken());
-
-            // Record start time for maxInterval check
+            // 1. submit split async
+            readResult = sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+            batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+            boolean isSnapshotSplit = sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 1000;
+            boolean shouldStop = false;
+            boolean lastMessageIsHeartbeat = false;
+            LOG.info(
+                    "Start polling records for jobId={} taskId={}, isSnapshotSplit={}",
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId(),
+                    isSnapshotSplit);
+
+            // 2. poll record
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator =
+                        sourceReader.pollRecords(readResult.getSplitState());
 
-            // Use iterators to read and write.
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
-                    String table = extractTable(element);
-                    for (String record : serializedRecords) {
-                        scannedRows++;
-                        byte[] dataBytes = record.getBytes();
-                        scannedBytes += dataBytes.length;
-                        batchStreamLoad.writeRecord(database, table, dataBytes);
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+
+                    // Check if should stop
+                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    boolean timeoutReached =
+                            maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis;
+
+                    if (shouldStop(
+                            isSnapshotSplit,
+                            scannedRows > 0,
+                            lastMessageIsHeartbeat,
+                            elapsedTime,
+                            maxIntervalMillis,
+                            timeoutReached)) {
+                        break;
                     }
+                    continue;
                 }
-                // Check if maxInterval has been exceeded
-                long elapsedTime = System.currentTimeMillis() - startTime;
-                if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis) {
-                    LOG.info(
-                            "Max interval {} seconds reached, stopping data reading",
-                            writeRecordRequest.getMaxInterval());
-                    break;
+
+                while (recordIterator.hasNext()) {
+                    SourceRecord element = recordIterator.next();
+
+                    // Check if this is a heartbeat message
+                    if (isHeartbeatEvent(element)) {
+                        heartbeatCount++;
+
+                    // Mark last message as heartbeat (only for binlog split)
+                        if (!isSnapshotSplit) {
+                            lastMessageIsHeartbeat = true;
+                        }
+
+                        // If already timeout, stop immediately when heartbeat received
+                        long elapsedTime = System.currentTimeMillis() - startTime;
+                        boolean timeoutReached =
+                                maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis;
+
+                        if (!isSnapshotSplit && timeoutReached) {
+                            LOG.info(
+                                    "Binlog split max interval reached and heartbeat received, stopping data reading");
+                            shouldStop = true;
+                            break;
+                        }
+                        // Skip heartbeat messages during normal processing
+                        continue;
+                    }
+
+                    // Process data messages
+                    List<String> serializedRecords =
+                            sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+                    if (!CollectionUtils.isEmpty(serializedRecords)) {
+                        String database = writeRecordRequest.getTargetDb();
+                        String table = extractTable(element);
+                        for (String record : serializedRecords) {
+                            scannedRows++;
+                            byte[] dataBytes = record.getBytes();
+                            scannedBytes += dataBytes.length;
+                            batchStreamLoad.writeRecord(database, table, dataBytes);
+                        }
+                        // Mark last message as data (not heartbeat)
+                        lastMessageIsHeartbeat = false;
+                    }
                 }
             }
+            LOG.info(
+                    "Fetched {} records and {} heartbeats in {} ms for jobId={} taskId={}",
+                    scannedRows,
+                    heartbeatCount,
+                    System.currentTimeMillis() - startTime,
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId());
+
         } finally {
-            if (readResult != null) {
-                // The LSN in the commit is the current offset, which is the offset from the last
-                // successful write.
-                // Therefore, even if a subsequent write fails, it will not affect the commit.
-                sourceReader.commitSourceOffset(
-                        writeRecordRequest.getJobId(), readResult.getSplit());
-            }
+            cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult);
+        }
 
-            // This must be called after commitSourceOffset; otherwise,
-            // PG's confirmed lsn will not proceed.
-            // This operation must be performed before batchStreamLoad.commitOffset;
-            // otherwise, fe might issue the next task for this job.
-            sourceReader.finishSplitRecords();
+        // 3. Extract offset from split state
+        metaResponse = extractOffsetMeta(sourceReader, readResult);
+        // 4. wait all stream load finish
+        batchStreamLoad.forceFlush();
+
+        // 5. request fe api update offset
+        String currentTaskId = batchStreamLoad.getCurrentTaskId();
+        // The offset must be reset before commitOffset to prevent the next taskId from being create
+        // by the fe.
+        batchStreamLoad.resetTaskId();
+        batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows, scannedBytes);
+    }
+
+    public static boolean isHeartbeatEvent(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        return valueSchema != null
+                && SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+    }
+
+    /**
+     * Determine if we should stop polling.
+     *
+     * @param isSnapshotSplit whether this is a snapshot split
+     * @param hasData whether we have received any data
+     * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+     * @param elapsedTime total elapsed time in milliseconds
+     * @param maxIntervalMillis max interval in milliseconds
+     * @param timeoutReached whether timeout is reached
+     * @return true if should stop, false if should continue
+     */
+    private boolean shouldStop(
+            boolean isSnapshotSplit,
+            boolean hasData,
+            boolean lastMessageIsHeartbeat,
+            long elapsedTime,
+            long maxIntervalMillis,
+            boolean timeoutReached) {
+
+        // 1. Snapshot split with data: if no more data in queue, stop immediately (no need to wait
+        // for timeout)
+        // snapshot split will be written to the debezium queue all at once.
+        if (isSnapshotSplit && hasData) {
+            LOG.info(
+                    "Snapshot split finished, no more data available. Total elapsed: {} ms",
+                    elapsedTime);
+            return true;

Review Comment:
   The shouldStop logic for snapshot splits at lines 385-389 returns true when "isSnapshotSplit && hasData" without checking whether the iterator still has data.
   This relies on the assumption that "snapshot split will be written to the debezium queue all at once" (line 384).
   Since the check only happens when "!recordIterator.hasNext()" (line 121), the current batch from the queue has already been exhausted, so this is correct for snapshot splits as implemented.
   However, the comment at line 384 could be misleading if Debezium actually streams snapshot data in multiple batches. Consider verifying this assumption or adding more defensive checks.
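   If the single-batch assumption is ever relaxed, one defensive option (sketch only; `splitFullyEmitted` is a hypothetical flag the caller would have to supply, not an existing parameter) is to stop only when the reader explicitly reports the split as drained:
   ```java
   // Hypothetical extra argument: the caller passes whether the reader reports the split
   // as fully emitted, instead of inferring completion from a single empty poll.
   if (isSnapshotSplit && hasData && splitFullyEmitted) {
       LOG.info(
               "Snapshot split finished, no more data available. Total elapsed: {} ms",
               elapsedTime);
       return true;
   }
   ```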



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -796,6 +717,8 @@ public boolean hasNext() {
                         BinlogOffset position = RecordUtils.getBinlogPosition(element);
                         splitState.asBinlogSplitState().setStartingOffset(position);
                     }
+                    nextRecord = element;
+                    return true;

Review Comment:
   FilteredRecordIterator now returns heartbeat events (sets nextRecord and returns true), but PipelineCoordinator also checks for heartbeat events separately, which creates double processing of heartbeat events.
   The FilteredRecordIterator should either filter out heartbeat events completely (not return them) or PipelineCoordinator should not do separate heartbeat checking. Consider either:
   1. Removing lines 720-721 so heartbeat events are filtered out in the iterator (like watermark events), OR
   2. Updating PipelineCoordinator to not check for heartbeat events, since the iterator already handles offset updates
   ```suggestion
   
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,64 +168,62 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
         LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/binlog split based on offset and start the reader.
-        MySqlSplit split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split reader",
-                        baseReq.isReload());
-                // build split
-                Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // reset binlog reader
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof BinlogSplitReader) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        // build split
+        Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+        if (this.currentSplit instanceof MySqlSnapshotSplit) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+            this.currentReader = getBinlogSplitReader(baseReq);
         } else {
-            LOG.info(
-                    "Continue read records with current split records, splitId: {}",
-                    currentSplitRecords.getSplitId());
+            throw new IllegalStateException(
+                    "Unsupported MySqlSplit type: " + this.currentSplit.getClass().getName());
         }

Review Comment:
   In prepareAndSubmitSplit, a new reader is created every time (either SnapshotSplitReader or BinlogSplitReader at lines 182 or 184), but the old currentReader is not closed before being replaced.
   This could lead to resource leaks (unclosed database connections, threads, etc.).
   Before creating a new reader, you should check if currentReader exists and close it if necessary, similar to how the old code called closeCurrentReader().
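   A sketch of the suggested cleanup (assuming a close method equivalent to the old closeCurrentReader() is still available on this class):
   ```java
   // Close any previously created reader before replacing it, so its connections and
   // threads are released (closeCurrentReader() mirrors the old code's cleanup call).
   if (this.currentReader != null) {
       LOG.info("Closing previous reader before submitting a new split");
       closeCurrentReader();
   }
   if (this.currentSplit instanceof MySqlSnapshotSplit) {
       this.currentReader = getSnapshotSplitReader(baseReq);
   } else if (this.currentSplit instanceof MySqlBinlogSplit) {
       this.currentReader = getBinlogSplitReader(baseReq);
   }
   ```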



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,64 +168,62 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
         LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/binlog split based on offset and start the reader.
-        MySqlSplit split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split reader",
-                        baseReq.isReload());
-                // build split
-                Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // reset binlog reader
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof BinlogSplitReader) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        // build split
+        Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+        if (this.currentSplit instanceof MySqlSnapshotSplit) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+            this.currentReader = getBinlogSplitReader(baseReq);
         } else {
-            LOG.info(
-                    "Continue read records with current split records, splitId: {}",
-                    currentSplitRecords.getSplitId());
+            throw new IllegalStateException(
+                    "Unsupported MySqlSplit type: " + this.currentSplit.getClass().getName());
         }
 
-        // build response with iterator
-        SplitReadResult result = new SplitReadResult();
+        this.currentReader.submitSplit(this.currentSplit);
         MySqlSplitState currentSplitState = null;
-        MySqlSplit currentSplit = this.getCurrentSplit();
-        if (currentSplit.isSnapshotSplit()) {
-            currentSplitState = new MySqlSnapshotSplitState(currentSplit.asSnapshotSplit());
+        if (this.currentSplit.isSnapshotSplit()) {
+            currentSplitState = new MySqlSnapshotSplitState(this.currentSplit.asSnapshotSplit());
         } else {
-            currentSplitState = new MySqlBinlogSplitState(currentSplit.asBinlogSplit());
+            currentSplitState = new MySqlBinlogSplitState(this.currentSplit.asBinlogSplit());
         }
-
-        Iterator<SourceRecord> filteredIterator =
-                new FilteredRecordIterator(currentSplitRecords, currentSplitState);
-
-        result.setRecordIterator(filteredIterator);
+        SplitReadResult result = new SplitReadResult();
+        result.setSplit(this.currentSplit);
         result.setSplitState(currentSplitState);
-        result.setSplit(split);
         return result;
     }
 
+    @Override
+    public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
+        Preconditions.checkState(this.currentReader != null, "currentReader is null");
+        Preconditions.checkNotNull(splitState, "splitState is null");
+        Preconditions.checkState(
+                splitState instanceof MySqlSplitState,
+                "splitState type is invalid " + splitState.getClass());
+
+        // Poll data from Debezium queue
+        Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+        if (dataIt == null || !dataIt.hasNext()) {
+            return Collections.emptyIterator(); // No data available
+        }
+
+        SourceRecords sourceRecords = dataIt.next();
+        SplitRecords splitRecords =
+                new SplitRecords(this.currentSplit.splitId(), sourceRecords.iterator());
+        if (!sourceRecords.getSourceRecordList().isEmpty()) {
+            LOG.info("{} Records received", sourceRecords.getSourceRecordList().size());
+        }
+
+        return new FilteredRecordIterator(splitRecords, (MySqlSplitState) splitState);
+    }

Review Comment:
   The method signature change removes the recordIterator field from SplitReadResult, but the Iterator is still created and used in FilteredRecordIterator.
   However, there's a logical issue: if pollRecords is called multiple times on the same split, a new FilteredRecordIterator is created each time while the underlying currentSplit and currentReader are reused.
   This could lead to state inconsistencies if the iterator is not fully consumed before the next call to pollRecords.
   Consider adding validation to ensure the previous iterator was fully consumed, or reset the reader state appropriately.
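   One possible guard (sketch only; `lastReturnedIterator` is an illustrative field name, not part of the PR):
   ```java
   // Hypothetical guard field: the iterator handed out by the previous pollRecords call.
   private Iterator<SourceRecord> lastReturnedIterator;

   // At the start of pollRecords(...): warn (or fail fast) if the previous batch was
   // not fully consumed before polling the Debezium queue again.
   if (lastReturnedIterator != null && lastReturnedIterator.hasNext()) {
       LOG.warn(
               "Previous record iterator for split {} was not fully consumed",
               this.currentSplit.splitId());
   }
   ```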



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -672,6 +594,8 @@ public boolean hasNext() {
                         Offset position = createOffset(element.sourceOffset());
                         splitState.asStreamSplitState().setStartingOffset(position);
                     }
+                    nextRecord = element;
+                    return true;

Review Comment:
   Similar to MySqlSourceReader, FilteredRecordIterator now returns heartbeat events (sets nextRecord and returns true at lines 597-598), but PipelineCoordinator also checks for heartbeat events separately, which creates double processing.
   The iterator should either filter out heartbeat events completely or PipelineCoordinator should not do separate heartbeat checking.
   ```suggestion
                       // Do not emit heartbeat events as records; only update state.
                       continue;
   ```


