Copilot commented on code in PR #60181:
URL: https://github.com/apache/doris/pull/60181#discussion_r2720364429


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,62 +169,56 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
         LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/binlog split based on offset and 
start the reader.
-        MySqlSplit split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            DebeziumReader<SourceRecords, MySqlSplit> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader",
-                        baseReq.isReload());
-                // build split
-                Tuple2<MySqlSplit, Boolean> splitFlag = 
createMySqlSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // reset binlog reader
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof BinlogSplitReader) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        // build split
+        Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, 
baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+        if (this.currentSplit instanceof MySqlSnapshotSplit) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+            this.currentReader = getBinlogSplitReader(baseReq);
+        }
+        this.currentReader.submitSplit(this.currentSplit);

Review Comment:
   Potential resource leak: if prepareAndSubmitSplit is called on a 
SourceReader that already has a currentReader, the old reader will be 
overwritten without being closed. Since SourceReader instances are reused 
across multiple requests for the same jobId (as seen in Env.getOrCreateReader), 
this could happen if finishSplitRecords fails or if there's an exception before 
cleanup. Consider closing the existing currentReader before creating a new one, 
or adding a check to ensure currentReader is null before proceeding.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +215,134 @@ public CompletableFuture<Void> 
writeRecordsAsync(WriteRecordRequest writeRecordR
                 executor);
     }
 
-    /** Read data from SourceReader and write it to Doris, while returning 
meta information. */
+    /**
+     * Read data from SourceReader and write it to Doris, while returning meta 
information. 1. poll
+     * interval record. 2. Try to retrieve the heartbeat message, as it 
returns the latest offset,
+     * preventing the next task from having to skip a large number of records.
+     */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         Map<String, String> metaResponse = new HashMap<>();
         long scannedRows = 0L;
         long scannedBytes = 0L;
+        int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
-            readResult = sourceReader.readSplitRecords(writeRecordRequest);
-            batchStreamLoad =
-                    getOrCreateBatchStreamLoad(
-                            writeRecordRequest.getJobId(), 
writeRecordRequest.getTargetDb());
-            batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-            
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
-            batchStreamLoad.setToken(writeRecordRequest.getToken());
-
-            // Record start time for maxInterval check
+            // 1. submit split async
+            readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+            batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
+            boolean shouldStop = false;
 
-            // Use iterators to read and write.
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
-                    String table = extractTable(element);
-                    for (String record : serializedRecords) {
-                        scannedRows++;
-                        byte[] dataBytes = record.getBytes();
-                        scannedBytes += dataBytes.length;
-                        batchStreamLoad.writeRecord(database, table, 
dataBytes);
-                    }
-                }
-                // Check if maxInterval has been exceeded
+            // 2. poll record
+            while (!shouldStop) {
                 long elapsedTime = System.currentTimeMillis() - startTime;
-                if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis) 
{
-                    LOG.info(
-                            "Max interval {} seconds reached, stopping data 
reading",
-                            writeRecordRequest.getMaxInterval());
+                boolean timeoutReached = maxIntervalMillis > 0 && elapsedTime 
>= maxIntervalMillis;
+
+                // Timeout protection: force stop if waiting for heartbeat 
exceeds threshold
+                // After reaching maxInterval, wait at most 
DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3 for
+                // heartbeat
+                if (timeoutReached
+                        && elapsedTime
+                                > maxIntervalMillis
+                                        + 
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+                    LOG.warn(
+                            "Heartbeat wait timeout after {} ms since timeout 
reached, force stopping. "
+                                    + "Total elapsed: {} ms",
+                            elapsedTime - maxIntervalMillis,
+                            elapsedTime);
                     break;
                 }
-            }
-        } finally {
-            if (readResult != null) {
-                // The LSN in the commit is the current offset, which is the 
offset from the last
-                // successful write.
-                // Therefore, even if a subsequent write fails, it will not 
affect the commit.
-                sourceReader.commitSourceOffset(
-                        writeRecordRequest.getJobId(), readResult.getSplit());
-            }
 
-            // This must be called after commitSourceOffset; otherwise,
-            // PG's confirmed lsn will not proceed.
-            // This operation must be performed before 
batchStreamLoad.commitOffset;
-            // otherwise, fe might issue the next task for this job.
-            sourceReader.finishSplitRecords();
-        }
-        // get offset from split state
-        try {
-            if (readResult.getSplitState() != null) {
-                // Set meta information for hw
-                if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
-                    metaResponse = offsetRes;
+                Iterator<SourceRecord> recordIterator =
+                        sourceReader.pollRecords(readResult.getSplitState());
+
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);

Review Comment:
   Using a fixed 100ms sleep in a tight polling loop could lead to high CPU 
usage when waiting for data. The sleep interval should be configurable, or a 
longer delay should be used (e.g., one tied to the DEBEZIUM_POLL_INTERVAL_MS 
constant). Additionally, consider using a blocking queue or a proper 
wait/notify mechanism, if the underlying Debezium reader supports it, to avoid 
busy-waiting.
   ```suggestion
                       
TimeUnit.MILLISECONDS.sleep(Constants.DEBEZIUM_POLL_INTERVAL_MS);
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java:
##########
@@ -22,7 +22,7 @@ public class Constants {
     public static final long POLL_SPLIT_RECORDS_TIMEOUTS = 15000L;
 
     // Debezium default properties
-    public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 10000L;
+    public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;

Review Comment:
   The heartbeat interval was reduced from 10000ms to 3000ms. While this is a 
reasonable optimization for faster offset updates and connection issue 
detection, it will result in more frequent heartbeat messages and potentially 
increased network traffic and CPU usage. Consider making this value 
configurable through a system property or configuration file, allowing 
administrators to tune it based on their specific workload and network 
characteristics.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -83,67 +87,101 @@ public PipelineCoordinator() {
 
     public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest) 
throws Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(fetchRecordRequest);
-        SplitReadResult readResult = 
sourceReader.readSplitRecords(fetchRecordRequest);
+        SplitReadResult readResult = 
sourceReader.prepareAndSubmitSplit(fetchRecordRequest);
         return buildRecordResponse(sourceReader, fetchRecordRequest, 
readResult);
     }
 
-    /** build RecordWithMeta */
+    /**
+     * Build RecordWithMeta response
+     *
+     * <p>This method polls records until: 1. Data is received AND heartbeat 
is received (normal
+     * case) 2. Timeout is reached (with heartbeat wait protection)
+     */
     private RecordWithMeta buildRecordResponse(
             SourceReader sourceReader, FetchRecordRequest fetchRecord, 
SplitReadResult readResult)
             throws Exception {
         RecordWithMeta recordResponse = new RecordWithMeta();
-        SourceSplit split = readResult.getSplit();
-        int count = 0;
         try {
-            // Serialize records and add them to the response (collect from 
iterator)
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        sourceReader.deserialize(fetchRecord.getConfig(), 
element);
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    recordResponse.getRecords().addAll(serializedRecords);
-                    count += serializedRecords.size();
-                    if (sourceReader.isBinlogSplit(split)) {
-                        // put offset for event
-                        Map<String, String> lastMeta =
-                                
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                        lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                        recordResponse.setMeta(lastMeta);
+            long startTime = System.currentTimeMillis();
+            boolean shouldStop = false;
+            boolean hasReceivedData = false;
+            int heartbeatCount = 0;
+            int recordCount = 0;
+
+            while (!shouldStop) {
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                boolean timeoutReached = elapsedTime > 
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+
+                // Timeout protection: force stop if waiting for heartbeat 
exceeds threshold
+                // After reaching timeout, wait at most 
DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3 for
+                // heartbeat
+                if (timeoutReached
+                        && elapsedTime
+                                > Constants.POLL_SPLIT_RECORDS_TIMEOUTS
+                                        + 
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+                    LOG.warn(
+                            "Heartbeat wait timeout after {} ms since timeout 
reached, force stopping. "
+                                    + "Total elapsed: {} ms",
+                            elapsedTime - 
Constants.POLL_SPLIT_RECORDS_TIMEOUTS,
+                            elapsedTime);
+                    break;
+                }
+
+                Iterator<SourceRecord> recordIterator =
+                        sourceReader.pollRecords(readResult.getSplitState());
+
+                if (!recordIterator.hasNext()) {
+                    // If we have data and reached timeout, continue waiting 
for heartbeat
+                    if (hasReceivedData && timeoutReached) {
+                        Thread.sleep(100);
+                        continue;
                     }
+                    // Otherwise, just wait for more data
+                    Thread.sleep(100);

Review Comment:
   Using a fixed 100ms sleep in a tight polling loop could lead to high CPU 
usage when waiting for data. The sleep interval should be configurable, or a 
longer delay should be used (e.g., one tied to the DEBEZIUM_POLL_INTERVAL_MS 
constant). Additionally, consider using a blocking queue or a proper 
wait/notify mechanism, if the underlying Debezium reader supports it, to avoid 
busy-waiting.
   ```suggestion
                           Thread.sleep(Constants.DEBEZIUM_POLL_INTERVAL_MS);
                           continue;
                       }
                       // Otherwise, just wait for more data
                       Thread.sleep(Constants.DEBEZIUM_POLL_INTERVAL_MS);
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +215,134 @@ public CompletableFuture<Void> 
writeRecordsAsync(WriteRecordRequest writeRecordR
                 executor);
     }
 
-    /** Read data from SourceReader and write it to Doris, while returning 
meta information. */
+    /**
+     * Read data from SourceReader and write it to Doris, while returning meta 
information. 1. poll
+     * interval record. 2. Try to retrieve the heartbeat message, as it 
returns the latest offset,
+     * preventing the next task from having to skip a large number of records.
+     */
     public void writeRecords(WriteRecordRequest writeRecordRequest) throws 
Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         Map<String, String> metaResponse = new HashMap<>();
         long scannedRows = 0L;
         long scannedBytes = 0L;
+        int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
-            readResult = sourceReader.readSplitRecords(writeRecordRequest);
-            batchStreamLoad =
-                    getOrCreateBatchStreamLoad(
-                            writeRecordRequest.getJobId(), 
writeRecordRequest.getTargetDb());
-            batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-            
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
-            batchStreamLoad.setToken(writeRecordRequest.getToken());
-
-            // Record start time for maxInterval check
+            // 1. submit split async
+            readResult = 
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+            batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
             long startTime = System.currentTimeMillis();
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
+            boolean shouldStop = false;
 
-            // Use iterators to read and write.
-            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
-            while (iterator != null && iterator.hasNext()) {
-                SourceRecord element = iterator.next();
-                List<String> serializedRecords =
-                        
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
-                if (!CollectionUtils.isEmpty(serializedRecords)) {
-                    String database = writeRecordRequest.getTargetDb();
-                    String table = extractTable(element);
-                    for (String record : serializedRecords) {
-                        scannedRows++;
-                        byte[] dataBytes = record.getBytes();
-                        scannedBytes += dataBytes.length;
-                        batchStreamLoad.writeRecord(database, table, 
dataBytes);
-                    }
-                }
-                // Check if maxInterval has been exceeded
+            // 2. poll record
+            while (!shouldStop) {
                 long elapsedTime = System.currentTimeMillis() - startTime;
-                if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis) 
{
-                    LOG.info(
-                            "Max interval {} seconds reached, stopping data 
reading",
-                            writeRecordRequest.getMaxInterval());
+                boolean timeoutReached = maxIntervalMillis > 0 && elapsedTime 
>= maxIntervalMillis;
+
+                // Timeout protection: force stop if waiting for heartbeat 
exceeds threshold
+                // After reaching maxInterval, wait at most 
DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3 for
+                // heartbeat
+                if (timeoutReached
+                        && elapsedTime
+                                > maxIntervalMillis
+                                        + 
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+                    LOG.warn(
+                            "Heartbeat wait timeout after {} ms since timeout 
reached, force stopping. "
+                                    + "Total elapsed: {} ms",
+                            elapsedTime - maxIntervalMillis,
+                            elapsedTime);
                     break;
                 }
-            }
-        } finally {
-            if (readResult != null) {
-                // The LSN in the commit is the current offset, which is the 
offset from the last
-                // successful write.
-                // Therefore, even if a subsequent write fails, it will not 
affect the commit.
-                sourceReader.commitSourceOffset(
-                        writeRecordRequest.getJobId(), readResult.getSplit());
-            }
 
-            // This must be called after commitSourceOffset; otherwise,
-            // PG's confirmed lsn will not proceed.
-            // This operation must be performed before 
batchStreamLoad.commitOffset;
-            // otherwise, fe might issue the next task for this job.
-            sourceReader.finishSplitRecords();
-        }
-        // get offset from split state
-        try {
-            if (readResult.getSplitState() != null) {
-                // Set meta information for hw
-                if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
-                    metaResponse = offsetRes;
+                Iterator<SourceRecord> recordIterator =
+                        sourceReader.pollRecords(readResult.getSplitState());
+
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+                    continue;
                 }
 
-                // set meta for binlog event
-                if (sourceReader.isBinlogSplit(readResult.getSplit())) {
-                    Map<String, String> offsetRes =
-                            
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                    offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                    metaResponse = offsetRes;
+                while (recordIterator.hasNext()) {
+                    SourceRecord element = recordIterator.next();
+                    // Check if this is a heartbeat message
+                    if (isHeartbeatEvent(element)) {
+                        heartbeatCount++;
+                        if (timeoutReached) {
+                            LOG.info(
+                                    "Max interval reached and heartbeat 
received, stopping data reading");
+                            shouldStop = true;
+                            break;
+                        }
+                        // Skip heartbeat messages during normal processing
+                        continue;
+                    }
+
+                    // Process data messages
+                    List<String> serializedRecords =
+                            
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+                    if (!CollectionUtils.isEmpty(serializedRecords)) {
+                        String database = writeRecordRequest.getTargetDb();
+                        String table = extractTable(element);
+                        for (String record : serializedRecords) {
+                            scannedRows++;
+                            byte[] dataBytes = record.getBytes();
+                            scannedBytes += dataBytes.length;
+                            batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                        }
+                    }
                 }
-            } else {
-                throw new RuntimeException("split state is null");
             }
+            LOG.info(
+                    "Fetched {} records and {} heartbeats in {} ms for 
jobId={} taskId={}",
+                    scannedRows,
+                    heartbeatCount,
+                    System.currentTimeMillis() - startTime,
+                    writeRecordRequest.getJobId(),
+                    writeRecordRequest.getTaskId());
+
+        } finally {
+            cleanupReaderResources(sourceReader, 
writeRecordRequest.getJobId(), readResult);
+        }
 
-            // wait all stream load finish
+        // 3. Extract offset from split state
+        metaResponse = extractOffsetMeta(sourceReader, readResult);
+        try {
+            // 4. wait all stream load finish
             batchStreamLoad.forceFlush();
 
-            // request fe api
+            // 5. request fe api update offset
             batchStreamLoad.commitOffset(metaResponse, scannedRows, 
scannedBytes);
         } finally {
             batchStreamLoad.resetTaskId();
         }
     }
 
-    private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String 
targetDb) {
-        return batchStreamLoadMap.computeIfAbsent(
-                jobId,
-                k -> {
-                    LOG.info("Create DorisBatchStreamLoad for jobId={}", 
jobId);
-                    return new DorisBatchStreamLoad(jobId, targetDb);
-                });
+    public static boolean isHeartbeatEvent(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        return valueSchema != null
+                && 
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+    }

Review Comment:
   The heartbeat detection method here may be inconsistent with the one used in 
FilteredRecordIterator classes. For MySQL, RecordUtils.isHeartbeatEvent is 
used, while for Postgres/JDBC sources, SourceRecordUtils.isHeartbeatEvent is 
used. This PipelineCoordinator.isHeartbeatEvent implementation manually checks 
the schema name. Consider using the same utility methods (RecordUtils or 
SourceRecordUtils) that are used in the FilteredRecordIterator to ensure 
consistency across the codebase. If the implementations differ, heartbeat 
events may not be detected correctly, leading to infinite waiting or incorrect 
offset management.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,62 +169,56 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
         LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/binlog split based on offset and 
start the reader.
-        MySqlSplit split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            DebeziumReader<SourceRecords, MySqlSplit> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader",
-                        baseReq.isReload());
-                // build split
-                Tuple2<MySqlSplit, Boolean> splitFlag = 
createMySqlSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // reset binlog reader
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof BinlogSplitReader) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
+        // build split
+        Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, 
baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+        if (this.currentSplit instanceof MySqlSnapshotSplit) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+            this.currentReader = getBinlogSplitReader(baseReq);

Review Comment:
   Missing else clause for an invalid split type. If the split is neither a 
snapshot split nor a binlog split, currentReader will not be initialized, 
leading to a NullPointerException on line 187 (the 
this.currentReader.submitSplit(this.currentSplit) call). Add an else clause 
that throws an IllegalStateException with a descriptive message to handle this 
edge case explicitly.
   ```suggestion
               this.currentReader = getBinlogSplitReader(baseReq);
           } else {
               throw new IllegalStateException(
                       "Unsupported MySqlSplit type: " + 
this.currentSplit.getClass().getName());
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,70 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
-        LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/stream split based on offset and 
start the reader.
-        SourceSplitBase split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            Fetcher<SourceRecords, SourceSplitBase> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader for job {}",
-                        baseReq.isReload(),
-                        baseReq.getJobId());
-                // build split
-                Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof 
IncrementalSourceStreamFetcher) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
-        } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
+        LOG.info("Job {} prepare and submit split with offset: {}", 
baseReq.getJobId(), offsetMeta);
+        // Build split
+        Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+
+        // Create reader based on split type
+        if (this.currentSplit.isSnapshotSplit()) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit.isStreamSplit()) {
+            this.currentReader = getBinlogSplitReader(baseReq);
         }

Review Comment:
   Potential resource leak: if prepareAndSubmitSplit is called on a 
SourceReader that already has a currentReader, the old reader will be 
overwritten without being closed. Since SourceReader instances are reused 
across multiple requests for the same jobId (as seen in Env.getOrCreateReader), 
this could happen if finishSplitRecords fails or if there's an exception before 
cleanup. Consider closing the existing currentReader before creating a new one, 
or adding a check to ensure currentReader is null before proceeding.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,70 @@ public List<AbstractSourceSplit> 
getSourceSplits(FetchTableSplitsRequest ftsReq)
     }
 
     @Override
-    public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) 
throws Exception {
+    public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) 
throws Exception {
         Map<String, Object> offsetMeta = baseReq.getMeta();
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
-        LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/stream split based on offset and 
start the reader.
-        SourceSplitBase split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            Fetcher<SourceRecords, SourceSplitBase> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader for job {}",
-                        baseReq.isReload(),
-                        baseReq.getJobId());
-                // build split
-                Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof 
IncrementalSourceStreamFetcher) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
-        } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
+        LOG.info("Job {} prepare and submit split with offset: {}", 
baseReq.getJobId(), offsetMeta);
+        // Build split
+        Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
+        this.currentSplit = splitFlag.f0;
+        LOG.info("Get a split: {}", this.currentSplit.toString());
+
+        // Create reader based on split type
+        if (this.currentSplit.isSnapshotSplit()) {
+            this.currentReader = getSnapshotSplitReader(baseReq);
+        } else if (this.currentSplit.isStreamSplit()) {
+            this.currentReader = getBinlogSplitReader(baseReq);

Review Comment:
   Missing else clause for invalid split type. If the split is neither a 
snapshot split nor a stream split, currentReader will not be initialized, 
leading to a NullPointerException on line 171. Add an else clause that throws 
an IllegalStateException with a descriptive message to handle this edge case 
explicitly.
   ```suggestion
               this.currentReader = getBinlogSplitReader(baseReq);
           } else {
               throw new IllegalStateException(
                       "Unsupported split type: " + this.currentSplit + ". 
Expected snapshot or stream split.");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to