Copilot commented on code in PR #60181:
URL: https://github.com/apache/doris/pull/60181#discussion_r2721092347
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +223,227 @@ public CompletableFuture<Void>
writeRecordsAsync(WriteRecordRequest writeRecordR
executor);
}
- /** Read data from SourceReader and write it to Doris, while returning
meta information. */
+ /**
+ * Read data from SourceReader and write it to Doris, while returning meta
information.
+ *
+ * <p>Snapshot split: Returns immediately after reading; otherwise,
returns after the
+ * maxInterval.
+ *
+ * <p>Binlog split: Fetches data at the maxInterval. Returns immediately
if no data is found; if
+ * found, checks if the last record is a heartbeat record. If it is,
returns immediately;
+ * otherwise, fetches again until the heartbeat deadline.
+ *
+ * <p>Heartbeat events will carry the latest offset.
+ */
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
Map<String, String> metaResponse = new HashMap<>();
long scannedRows = 0L;
long scannedBytes = 0L;
+ int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
- readResult = sourceReader.readSplitRecords(writeRecordRequest);
- batchStreamLoad =
- getOrCreateBatchStreamLoad(
- writeRecordRequest.getJobId(),
writeRecordRequest.getTargetDb());
- batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
- batchStreamLoad.setToken(writeRecordRequest.getToken());
-
- // Record start time for maxInterval check
+ // 1. submit split async
+ readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+ batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+ boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ boolean shouldStop = false;
+ boolean lastMessageIsHeartbeat = false;
+ LOG.info(
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId(),
+ isSnapshotSplit);
+
+ // 2. poll record
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
+ sourceReader.pollRecords(readResult.getSplitState());
- // Use iterators to read and write.
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
- String table = extractTable(element);
- for (String record : serializedRecords) {
- scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
- batchStreamLoad.writeRecord(database, table,
dataBytes);
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
Review Comment:
Similar to line 122 in buildRecordResponse, when an InterruptedException
occurs during Thread.sleep(100), the thread's interrupted status should be
preserved. Consider catching and re-interrupting the thread or propagating the
exception properly.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +223,227 @@ public CompletableFuture<Void>
writeRecordsAsync(WriteRecordRequest writeRecordR
executor);
}
- /** Read data from SourceReader and write it to Doris, while returning
meta information. */
+ /**
+ * Read data from SourceReader and write it to Doris, while returning meta
information.
+ *
+ * <p>Snapshot split: Returns immediately after reading; otherwise,
returns after the
+ * maxInterval.
+ *
+ * <p>Binlog split: Fetches data at the maxInterval. Returns immediately
if no data is found; if
+ * found, checks if the last record is a heartbeat record. If it is,
returns immediately;
+ * otherwise, fetches again until the heartbeat deadline.
+ *
+ * <p>Heartbeat events will carry the latest offset.
+ */
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
Map<String, String> metaResponse = new HashMap<>();
long scannedRows = 0L;
long scannedBytes = 0L;
+ int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
- readResult = sourceReader.readSplitRecords(writeRecordRequest);
- batchStreamLoad =
- getOrCreateBatchStreamLoad(
- writeRecordRequest.getJobId(),
writeRecordRequest.getTargetDb());
- batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
- batchStreamLoad.setToken(writeRecordRequest.getToken());
-
- // Record start time for maxInterval check
+ // 1. submit split async
+ readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+ batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+ boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ boolean shouldStop = false;
+ boolean lastMessageIsHeartbeat = false;
+ LOG.info(
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId(),
+ isSnapshotSplit);
+
+ // 2. poll record
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
+ sourceReader.pollRecords(readResult.getSplitState());
- // Use iterators to read and write.
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
- String table = extractTable(element);
- for (String record : serializedRecords) {
- scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
- batchStreamLoad.writeRecord(database, table,
dataBytes);
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
+
+ // Check if should stop
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (shouldStop(
+ isSnapshotSplit,
+ scannedRows > 0,
+ lastMessageIsHeartbeat,
+ elapsedTime,
+ maxIntervalMillis,
+ timeoutReached)) {
+ break;
}
+ continue;
}
- // Check if maxInterval has been exceeded
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis)
{
- LOG.info(
- "Max interval {} seconds reached, stopping data
reading",
- writeRecordRequest.getMaxInterval());
- break;
+
+ while (recordIterator.hasNext()) {
+ SourceRecord element = recordIterator.next();
+
+ // Check if this is a heartbeat message
+ if (isHeartbeatEvent(element)) {
+ heartbeatCount++;
+
+ // Mark last message as heartbeat (only for binlog
split)
+ if (!isSnapshotSplit) {
+ lastMessageIsHeartbeat = true;
+ }
+
+ // If already timeout, stop immediately when heartbeat
received
+ long elapsedTime = System.currentTimeMillis() -
startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (!isSnapshotSplit && timeoutReached) {
+ LOG.info(
+ "Binlog split max interval reached and
heartbeat received, stopping data reading");
+ shouldStop = true;
+ break;
+ }
+ // Skip heartbeat messages during normal processing
+ continue;
+ }
+
+ // Process data messages
+ List<String> serializedRecords =
+
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+ if (!CollectionUtils.isEmpty(serializedRecords)) {
+ String database = writeRecordRequest.getTargetDb();
+ String table = extractTable(element);
+ for (String record : serializedRecords) {
+ scannedRows++;
+ byte[] dataBytes = record.getBytes();
+ scannedBytes += dataBytes.length;
+ batchStreamLoad.writeRecord(database, table,
dataBytes);
+ }
+ // Mark last message as data (not heartbeat)
+ lastMessageIsHeartbeat = false;
+ }
}
}
+ LOG.info(
+ "Fetched {} records and {} heartbeats in {} ms for
jobId={} taskId={}",
+ scannedRows,
+ heartbeatCount,
+ System.currentTimeMillis() - startTime,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+
} finally {
- if (readResult != null) {
- // The LSN in the commit is the current offset, which is the
offset from the last
- // successful write.
- // Therefore, even if a subsequent write fails, it will not
affect the commit.
- sourceReader.commitSourceOffset(
- writeRecordRequest.getJobId(), readResult.getSplit());
- }
+ cleanupReaderResources(sourceReader,
writeRecordRequest.getJobId(), readResult);
+ }
- // This must be called after commitSourceOffset; otherwise,
- // PG's confirmed lsn will not proceed.
- // This operation must be performed before
batchStreamLoad.commitOffset;
- // otherwise, fe might issue the next task for this job.
- sourceReader.finishSplitRecords();
+ // 3. Extract offset from split state
+ metaResponse = extractOffsetMeta(sourceReader, readResult);
+ // 4. wait all stream load finish
+ batchStreamLoad.forceFlush();
+
+ // 5. request fe api update offset
+ String currentTaskId = batchStreamLoad.getCurrentTaskId();
+ // The offset must be reset before commitOffset to prevent the next
taskId from being create
+ // by the fe.
+ batchStreamLoad.resetTaskId();
+ batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows,
scannedBytes);
+ }
+
+ public static boolean isHeartbeatEvent(SourceRecord record) {
+ Schema valueSchema = record.valueSchema();
+ return valueSchema != null
+ &&
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+ }
+
+ /**
+ * Determine if we should stop polling.
+ *
+ * @param isSnapshotSplit whether this is a snapshot split
+ * @param hasData whether we have received any data
+ * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+ * @param elapsedTime total elapsed time in milliseconds
+ * @param maxIntervalMillis max interval in milliseconds
+ * @param timeoutReached whether timeout is reached
+ * @return true if should stop, false if should continue
+ */
+ private boolean shouldStop(
+ boolean isSnapshotSplit,
+ boolean hasData,
+ boolean lastMessageIsHeartbeat,
+ long elapsedTime,
+ long maxIntervalMillis,
+ boolean timeoutReached) {
+
+ // 1. Snapshot split with data: if no more data in queue, stop
immediately (no need to wait
+ // for timeout)
+ // snapshot split will be written to the debezium queue all at once.
+ if (isSnapshotSplit && hasData) {
+ LOG.info(
+ "Snapshot split finished, no more data available. Total
elapsed: {} ms",
+ elapsedTime);
+ return true;
}
- // get offset from split state
- try {
- if (readResult.getSplitState() != null) {
- // Set meta information for hw
- if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
- metaResponse = offsetRes;
- }
- // set meta for binlog event
- if (sourceReader.isBinlogSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- metaResponse = offsetRes;
- }
- } else {
- throw new RuntimeException("split state is null");
- }
+ // 2. Not timeout yet: continue waiting
+ if (!timeoutReached) {
+ return false;
+ }
- // wait all stream load finish
- batchStreamLoad.forceFlush();
+ // === Below are checks after timeout is reached ===
- // request fe api
- batchStreamLoad.commitOffset(metaResponse, scannedRows,
scannedBytes);
- } finally {
- batchStreamLoad.resetTaskId();
+ // 3. No data received after timeout: stop
+ if (!hasData) {
+ LOG.info("No data received after timeout, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 4. Snapshot split after timeout (should not reach here, but keep as
safety check)
+ if (isSnapshotSplit) {
+ LOG.info("Snapshot split timeout reached, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 5. Binlog split + last message is heartbeat: stop immediately
+ if (lastMessageIsHeartbeat) {
+ LOG.info("Binlog split timeout and last message is heartbeat,
stopping");
+ return true;
+ }
+
+ // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout
protection
+ if (elapsedTime > maxIntervalMillis +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+ LOG.warn(
+ "Binlog split heartbeat wait timeout after {} ms, force
stopping. Total elapsed: {} ms",
+ elapsedTime - maxIntervalMillis,
+ elapsedTime);
+ return true;
}
+
+ // Continue waiting for heartbeat
+ return false;
}
- private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String
targetDb) {
- return batchStreamLoadMap.computeIfAbsent(
- jobId,
- k -> {
- LOG.info("Create DorisBatchStreamLoad for jobId={}",
jobId);
- return new DorisBatchStreamLoad(jobId, targetDb);
- });
+ private synchronized DorisBatchStreamLoad getOrCreateBatchStreamLoad(
+ WriteRecordRequest writeRecordRequest) {
+ DorisBatchStreamLoad batchStreamLoad =
+ batchStreamLoadMap.computeIfAbsent(
+ writeRecordRequest.getJobId(),
+ k -> {
+ LOG.info(
+ "Create DorisBatchStreamLoad for jobId={}",
+ writeRecordRequest.getJobId());
+ return new DorisBatchStreamLoad(
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTargetDb());
+ });
+ batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
+
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
+ batchStreamLoad.setToken(writeRecordRequest.getToken());
+ return batchStreamLoad;
Review Comment:
The method getOrCreateBatchStreamLoad is now synchronized, which helps thread
safety. However, after creating or retrieving the DorisBatchStreamLoad, the
code sets mutable properties (currentTaskId, frontendAddress, token) on lines
443-445. If multiple threads call writeRecords concurrently for the same
jobId, these properties could be overwritten by another thread between the
time they are set and the time writeRecords uses them. Consider either 1)
passing these values as parameters rather than storing them as instance state,
or 2) documenting that writeRecords must not be called concurrently for the
same jobId.
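For option 1, a minimal sketch of what passing the values through could look
like; the TaskContext holder and the commitOffset overload shown here are
hypothetical, not existing APIs:

```java
// Hypothetical holder for per-task values, so they are not stored as mutable
// state on the shared DorisBatchStreamLoad instance.
final class TaskContext {
    final String taskId;
    final String frontendAddress;
    final String token;

    TaskContext(String taskId, String frontendAddress, String token) {
        this.taskId = taskId;
        this.frontendAddress = frontendAddress;
        this.token = token;
    }
}

// In writeRecords (sketch): build the context from the request and pass it to
// the calls that need it instead of mutating the shared instance.
// TaskContext ctx = new TaskContext(
//         writeRecordRequest.getTaskId(),
//         writeRecordRequest.getFrontendAddress(),
//         writeRecordRequest.getToken());
// batchStreamLoad.commitOffset(ctx, metaResponse, scannedRows, scannedBytes);
```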
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,73 @@ public List<AbstractSourceSplit>
getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq)
throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq)
throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
- LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/stream split based on offset and
start the reader.
- SourceSplitBase split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- Fetcher<SourceRecords, SourceSplitBase> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader for job {}",
- baseReq.isReload(),
- baseReq.getJobId());
- // build split
- Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof
IncrementalSourceStreamFetcher) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ LOG.info("Job {} prepare and submit split with offset: {}",
baseReq.getJobId(), offsetMeta);
+ // Build split
+ Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+
+ // Create reader based on split type
+ if (this.currentSplit.isSnapshotSplit()) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit.isStreamSplit()) {
+ this.currentReader = getBinlogSplitReader(baseReq);
} else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ throw new IllegalStateException(
+ "Unknown split type: " +
this.currentSplit.getClass().getName());
}
- // build response with iterator
- SplitReadResult result = new SplitReadResult();
- SourceSplitState currentSplitState = null;
- SourceSplitBase currentSplit = this.getCurrentSplit();
- if (currentSplit.isSnapshotSplit()) {
- currentSplitState = new
SnapshotSplitState(currentSplit.asSnapshotSplit());
+ // Submit split
+ FetchTask<SourceSplitBase> splitFetchTask =
+ createFetchTaskFromSplit(baseReq, this.currentSplit);
+ this.currentReader.submitTask(splitFetchTask);
+ this.setCurrentFetchTask(splitFetchTask);
+
+ // Create split state
+ SourceSplitState currentSplitState;
+ if (this.currentSplit.isSnapshotSplit()) {
+ currentSplitState = new
SnapshotSplitState(this.currentSplit.asSnapshotSplit());
} else {
- currentSplitState = new
StreamSplitState(currentSplit.asStreamSplit());
+ currentSplitState = new
StreamSplitState(this.currentSplit.asStreamSplit());
}
- Iterator<SourceRecord> filteredIterator =
- new FilteredRecordIterator(currentSplitRecords,
currentSplitState);
-
- result.setRecordIterator(filteredIterator);
+ // Return result without iterator
+ SplitReadResult result = new SplitReadResult();
+ result.setSplit(this.currentSplit);
result.setSplitState(currentSplitState);
- result.setSplit(split);
return result;
}
+ @Override
+ public Iterator<SourceRecord> pollRecords(Object splitState) throws
InterruptedException {
+ Preconditions.checkState(this.currentReader != null, "currentReader is
null");
+ Preconditions.checkNotNull(splitState, "splitState is null");
+ Preconditions.checkState(
+ splitState instanceof SourceSplitState,
+ "splitState type is invalid " + splitState.getClass());
+
+ // Poll data from Debezium queue
+ Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+ if (dataIt == null || !dataIt.hasNext()) {
+ return Collections.emptyIterator(); // No data available
+ }
+
+ SourceRecords sourceRecords = dataIt.next();
+ SplitRecords splitRecords =
+ new SplitRecords(this.currentSplit.splitId(),
sourceRecords.iterator());
+ if (!sourceRecords.getSourceRecordList().isEmpty()) {
+ LOG.info("{} Records received.",
sourceRecords.getSourceRecordList().size());
+ }
+
+ // Return filtered iterator
+ return new FilteredRecordIterator(splitRecords, (SourceSplitState)
splitState);
+ }
Review Comment:
Similar to MySqlSourceReader, if pollRecords is called multiple times on the
same split, a new FilteredRecordIterator is created each time while reusing the
underlying currentSplit and currentReader. This could lead to state
inconsistencies if the iterator is not fully consumed before the next call to
pollRecords.
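One possible guard, sketched under the assumption that a new pendingIterator
field can be added to the reader; everything else reuses names from the diff
above:

```java
// Hypothetical field: remembers the iterator handed out by the previous call.
private Iterator<SourceRecord> pendingIterator;

@Override
public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
    // If the previous batch was not fully drained, hand it back instead of
    // pulling a fresh SourceRecords batch from the Debezium queue.
    if (pendingIterator != null && pendingIterator.hasNext()) {
        return pendingIterator;
    }
    Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
    if (dataIt == null || !dataIt.hasNext()) {
        return Collections.emptyIterator();
    }
    SourceRecords sourceRecords = dataIt.next();
    pendingIterator =
            new FilteredRecordIterator(
                    new SplitRecords(currentSplit.splitId(), sourceRecords.iterator()),
                    (SourceSplitState) splitState);
    return pendingIterator;
}
```

Alternatively, call sites could be required to drain each returned iterator
before calling pollRecords again; the PipelineCoordinator loop mostly does
this already, except when it breaks early on a heartbeat after the timeout.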
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -83,67 +87,109 @@ public PipelineCoordinator() {
public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest)
throws Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(fetchRecordRequest);
- SplitReadResult readResult =
sourceReader.readSplitRecords(fetchRecordRequest);
+ SplitReadResult readResult =
sourceReader.prepareAndSubmitSplit(fetchRecordRequest);
return buildRecordResponse(sourceReader, fetchRecordRequest,
readResult);
}
- /** build RecordWithMeta */
+ /**
+ * Build RecordWithMeta response
+ *
+ * <p>This method polls records until: 1. Data is received AND heartbeat
is received (normal
+ * case) 2. Timeout is reached (with heartbeat wait protection)
+ */
private RecordWithMeta buildRecordResponse(
SourceReader sourceReader, FetchRecordRequest fetchRecord,
SplitReadResult readResult)
throws Exception {
RecordWithMeta recordResponse = new RecordWithMeta();
- SourceSplit split = readResult.getSplit();
- int count = 0;
try {
- // Serialize records and add them to the response (collect from
iterator)
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
- sourceReader.deserialize(fetchRecord.getConfig(),
element);
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- recordResponse.getRecords().addAll(serializedRecords);
- count += serializedRecords.size();
- if (sourceReader.isBinlogSplit(split)) {
- // put offset for event
- Map<String, String> lastMeta =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- recordResponse.setMeta(lastMeta);
+ boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
+ long startTime = System.currentTimeMillis();
+ boolean shouldStop = false;
+ boolean hasReceivedData = false;
+ boolean lastMessageIsHeartbeat = false;
+ int heartbeatCount = 0;
+ int recordCount = 0;
+ LOG.info(
+ "Start fetching records for jobId={}, isSnapshotSplit={}",
+ fetchRecord.getJobId(),
+ isSnapshotSplit);
+
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
+ sourceReader.pollRecords(readResult.getSplitState());
+
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
Review Comment:
When an InterruptedException occurs during Thread.sleep(100), the exception
is thrown but the thread's interrupted status is not preserved. This violates
the standard practice for handling InterruptedException. Consider either
catching and re-interrupting the thread, or propagating the
InterruptedException properly. For example: catch (InterruptedException e) {
Thread.currentThread().interrupt(); throw e; }
```suggestion
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
}
```
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,64 +168,62 @@ public List<AbstractSourceSplit>
getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq)
throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq)
throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/binlog split based on offset and
start the reader.
- MySqlSplit split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- DebeziumReader<SourceRecords, MySqlSplit> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader",
- baseReq.isReload());
- // build split
- Tuple2<MySqlSplit, Boolean> splitFlag =
createMySqlSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // reset binlog reader
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof BinlogSplitReader) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ // build split
+ Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta,
baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+ if (this.currentSplit instanceof MySqlSnapshotSplit) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+ this.currentReader = getBinlogSplitReader(baseReq);
} else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ throw new IllegalStateException(
+ "Unsupported MySqlSplit type: " +
this.currentSplit.getClass().getName());
}
- // build response with iterator
- SplitReadResult result = new SplitReadResult();
+ this.currentReader.submitSplit(this.currentSplit);
MySqlSplitState currentSplitState = null;
- MySqlSplit currentSplit = this.getCurrentSplit();
- if (currentSplit.isSnapshotSplit()) {
- currentSplitState = new
MySqlSnapshotSplitState(currentSplit.asSnapshotSplit());
+ if (this.currentSplit.isSnapshotSplit()) {
+ currentSplitState = new
MySqlSnapshotSplitState(this.currentSplit.asSnapshotSplit());
} else {
- currentSplitState = new
MySqlBinlogSplitState(currentSplit.asBinlogSplit());
+ currentSplitState = new
MySqlBinlogSplitState(this.currentSplit.asBinlogSplit());
}
-
- Iterator<SourceRecord> filteredIterator =
- new FilteredRecordIterator(currentSplitRecords,
currentSplitState);
-
- result.setRecordIterator(filteredIterator);
+ SplitReadResult result = new SplitReadResult();
+ result.setSplit(this.currentSplit);
result.setSplitState(currentSplitState);
- result.setSplit(split);
return result;
}
+ @Override
+ public Iterator<SourceRecord> pollRecords(Object splitState) throws
InterruptedException {
+ Preconditions.checkState(this.currentReader != null, "currentReader is
null");
+ Preconditions.checkNotNull(splitState, "splitState is null");
+ Preconditions.checkState(
+ splitState instanceof MySqlSplitState,
+ "splitState type is invalid " + splitState.getClass());
+
+ // Poll data from Debezium queue
+ Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+ if (dataIt == null || !dataIt.hasNext()) {
+ return Collections.emptyIterator(); // No data available
+ }
+
+ SourceRecords sourceRecords = dataIt.next();
+ SplitRecords splitRecords =
+ new SplitRecords(this.currentSplit.splitId(),
sourceRecords.iterator());
+ if (!sourceRecords.getSourceRecordList().isEmpty()) {
+ LOG.info("{} Records received",
sourceRecords.getSourceRecordList().size());
+ }
+
+ return new FilteredRecordIterator(splitRecords, (MySqlSplitState)
splitState);
+ }
Review Comment:
The SourceReader instance is shared per jobId (from Env.getReader), and the
currentReader/currentSplit fields are instance variables. If
prepareAndSubmitSplit or pollRecords is called concurrently from multiple
threads for the same job (e.g., via the executor in PipelineCoordinator), one
thread's split/reader could overwrite another's. Consider synchronizing
prepareAndSubmitSplit and pollRecords, or documenting that these methods must
not be called concurrently on the same SourceReader instance.
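A small self-contained sketch of the locking pattern the comment suggests; the
class and field types below are placeholders, not the real MySqlSourceReader
API:

```java
// Placeholder types stand in for MySqlSplit and the Debezium reader; only the
// locking pattern is the point here.
final class GuardedReaderState {
    private final Object readerLock = new Object();
    private Object currentSplit;
    private Object currentReader;

    void prepareAndSubmitSplit(Object split, Object reader) {
        synchronized (readerLock) {
            this.currentSplit = split;
            this.currentReader = reader;
        }
    }

    Object pollRecords() {
        synchronized (readerLock) {
            // currentReader is read under the same lock, so a concurrent
            // prepareAndSubmitSplit cannot swap it mid-poll.
            return this.currentReader;
        }
    }
}
```

Marking both methods synchronized on the reader instance would achieve the
same effect with less code.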
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,73 @@ public List<AbstractSourceSplit>
getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq)
throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq)
throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
- LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/stream split based on offset and
start the reader.
- SourceSplitBase split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- Fetcher<SourceRecords, SourceSplitBase> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader for job {}",
- baseReq.isReload(),
- baseReq.getJobId());
- // build split
- Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof
IncrementalSourceStreamFetcher) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ LOG.info("Job {} prepare and submit split with offset: {}",
baseReq.getJobId(), offsetMeta);
+ // Build split
+ Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+
+ // Create reader based on split type
+ if (this.currentSplit.isSnapshotSplit()) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit.isStreamSplit()) {
+ this.currentReader = getBinlogSplitReader(baseReq);
} else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ throw new IllegalStateException(
+ "Unknown split type: " +
this.currentSplit.getClass().getName());
}
- // build response with iterator
- SplitReadResult result = new SplitReadResult();
- SourceSplitState currentSplitState = null;
- SourceSplitBase currentSplit = this.getCurrentSplit();
- if (currentSplit.isSnapshotSplit()) {
- currentSplitState = new
SnapshotSplitState(currentSplit.asSnapshotSplit());
+ // Submit split
+ FetchTask<SourceSplitBase> splitFetchTask =
+ createFetchTaskFromSplit(baseReq, this.currentSplit);
+ this.currentReader.submitTask(splitFetchTask);
+ this.setCurrentFetchTask(splitFetchTask);
+
+ // Create split state
+ SourceSplitState currentSplitState;
+ if (this.currentSplit.isSnapshotSplit()) {
+ currentSplitState = new
SnapshotSplitState(this.currentSplit.asSnapshotSplit());
} else {
- currentSplitState = new
StreamSplitState(currentSplit.asStreamSplit());
+ currentSplitState = new
StreamSplitState(this.currentSplit.asStreamSplit());
}
- Iterator<SourceRecord> filteredIterator =
- new FilteredRecordIterator(currentSplitRecords,
currentSplitState);
-
- result.setRecordIterator(filteredIterator);
+ // Return result without iterator
+ SplitReadResult result = new SplitReadResult();
+ result.setSplit(this.currentSplit);
result.setSplitState(currentSplitState);
- result.setSplit(split);
return result;
}
+ @Override
+ public Iterator<SourceRecord> pollRecords(Object splitState) throws
InterruptedException {
+ Preconditions.checkState(this.currentReader != null, "currentReader is
null");
+ Preconditions.checkNotNull(splitState, "splitState is null");
+ Preconditions.checkState(
+ splitState instanceof SourceSplitState,
+ "splitState type is invalid " + splitState.getClass());
+
+ // Poll data from Debezium queue
+ Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+ if (dataIt == null || !dataIt.hasNext()) {
+ return Collections.emptyIterator(); // No data available
+ }
+
+ SourceRecords sourceRecords = dataIt.next();
+ SplitRecords splitRecords =
+ new SplitRecords(this.currentSplit.splitId(),
sourceRecords.iterator());
+ if (!sourceRecords.getSourceRecordList().isEmpty()) {
+ LOG.info("{} Records received.",
sourceRecords.getSourceRecordList().size());
+ }
+
+ // Return filtered iterator
+ return new FilteredRecordIterator(splitRecords, (SourceSplitState)
splitState);
+ }
Review Comment:
Similar to MySqlSourceReader, the SourceReader instance is shared per jobId,
and currentReader/currentSplit are instance variables. Concurrent calls to
prepareAndSubmitSplit or pollRecords for the same job could cause race
conditions. Consider adding synchronization.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,73 @@ public List<AbstractSourceSplit>
getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq)
throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq)
throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
- LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/stream split based on offset and
start the reader.
- SourceSplitBase split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- Fetcher<SourceRecords, SourceSplitBase> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader for job {}",
- baseReq.isReload(),
- baseReq.getJobId());
- // build split
- Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof
IncrementalSourceStreamFetcher) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ LOG.info("Job {} prepare and submit split with offset: {}",
baseReq.getJobId(), offsetMeta);
+ // Build split
+ Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+
+ // Create reader based on split type
+ if (this.currentSplit.isSnapshotSplit()) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit.isStreamSplit()) {
+ this.currentReader = getBinlogSplitReader(baseReq);
} else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ throw new IllegalStateException(
+ "Unknown split type: " +
this.currentSplit.getClass().getName());
}
Review Comment:
Similar to MySqlSourceReader, a new reader is created every time
prepareAndSubmitSplit is called (lines 163 or 165), but the old currentReader
is not closed before being replaced. This could lead to resource leaks.
Consider closing the existing currentReader before creating a new one.
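A sketch of the cleanup the comment asks for, assuming the fetcher type
exposes a close() method; the helper name is hypothetical:

```java
// Hypothetical helper: close and drop the previous reader before a new split
// is submitted, so repeated prepareAndSubmitSplit calls do not leak resources.
private void closeCurrentReaderQuietly() {
    if (this.currentReader != null) {
        try {
            this.currentReader.close();
        } catch (Exception e) {
            LOG.warn("Failed to close previous reader, continuing with a new one", e);
        }
        this.currentReader = null;
    }
}
```

prepareAndSubmitSplit would call this helper before choosing between
getSnapshotSplitReader and getBinlogSplitReader.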
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +223,227 @@ public CompletableFuture<Void>
writeRecordsAsync(WriteRecordRequest writeRecordR
executor);
}
- /** Read data from SourceReader and write it to Doris, while returning
meta information. */
+ /**
+ * Read data from SourceReader and write it to Doris, while returning meta
information.
+ *
+ * <p>Snapshot split: Returns immediately after reading; otherwise,
returns after the
+ * maxInterval.
+ *
+ * <p>Binlog split: Fetches data at the maxInterval. Returns immediately
if no data is found; if
+ * found, checks if the last record is a heartbeat record. If it is,
returns immediately;
+ * otherwise, fetches again until the heartbeat deadline.
+ *
+ * <p>Heartbeat events will carry the latest offset.
+ */
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
Map<String, String> metaResponse = new HashMap<>();
long scannedRows = 0L;
long scannedBytes = 0L;
+ int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
- readResult = sourceReader.readSplitRecords(writeRecordRequest);
- batchStreamLoad =
- getOrCreateBatchStreamLoad(
- writeRecordRequest.getJobId(),
writeRecordRequest.getTargetDb());
- batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
- batchStreamLoad.setToken(writeRecordRequest.getToken());
-
- // Record start time for maxInterval check
+ // 1. submit split async
+ readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+ batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+ boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ boolean shouldStop = false;
+ boolean lastMessageIsHeartbeat = false;
+ LOG.info(
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId(),
+ isSnapshotSplit);
+
+ // 2. poll record
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
+ sourceReader.pollRecords(readResult.getSplitState());
- // Use iterators to read and write.
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
- String table = extractTable(element);
- for (String record : serializedRecords) {
- scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
- batchStreamLoad.writeRecord(database, table,
dataBytes);
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
+
+ // Check if should stop
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (shouldStop(
+ isSnapshotSplit,
+ scannedRows > 0,
+ lastMessageIsHeartbeat,
+ elapsedTime,
+ maxIntervalMillis,
+ timeoutReached)) {
+ break;
}
+ continue;
}
- // Check if maxInterval has been exceeded
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis)
{
- LOG.info(
- "Max interval {} seconds reached, stopping data
reading",
- writeRecordRequest.getMaxInterval());
- break;
+
+ while (recordIterator.hasNext()) {
+ SourceRecord element = recordIterator.next();
+
+ // Check if this is a heartbeat message
+ if (isHeartbeatEvent(element)) {
+ heartbeatCount++;
+
+ // Mark last message as heartbeat (only for binlog
split)
+ if (!isSnapshotSplit) {
+ lastMessageIsHeartbeat = true;
+ }
+
+ // If already timeout, stop immediately when heartbeat
received
+ long elapsedTime = System.currentTimeMillis() -
startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (!isSnapshotSplit && timeoutReached) {
+ LOG.info(
+ "Binlog split max interval reached and
heartbeat received, stopping data reading");
+ shouldStop = true;
+ break;
+ }
+ // Skip heartbeat messages during normal processing
+ continue;
+ }
+
+ // Process data messages
+ List<String> serializedRecords =
+
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+ if (!CollectionUtils.isEmpty(serializedRecords)) {
+ String database = writeRecordRequest.getTargetDb();
+ String table = extractTable(element);
+ for (String record : serializedRecords) {
+ scannedRows++;
+ byte[] dataBytes = record.getBytes();
+ scannedBytes += dataBytes.length;
+ batchStreamLoad.writeRecord(database, table,
dataBytes);
+ }
+ // Mark last message as data (not heartbeat)
+ lastMessageIsHeartbeat = false;
+ }
}
}
+ LOG.info(
+ "Fetched {} records and {} heartbeats in {} ms for
jobId={} taskId={}",
+ scannedRows,
+ heartbeatCount,
+ System.currentTimeMillis() - startTime,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+
} finally {
- if (readResult != null) {
- // The LSN in the commit is the current offset, which is the
offset from the last
- // successful write.
- // Therefore, even if a subsequent write fails, it will not
affect the commit.
- sourceReader.commitSourceOffset(
- writeRecordRequest.getJobId(), readResult.getSplit());
- }
+ cleanupReaderResources(sourceReader,
writeRecordRequest.getJobId(), readResult);
+ }
- // This must be called after commitSourceOffset; otherwise,
- // PG's confirmed lsn will not proceed.
- // This operation must be performed before
batchStreamLoad.commitOffset;
- // otherwise, fe might issue the next task for this job.
- sourceReader.finishSplitRecords();
+ // 3. Extract offset from split state
+ metaResponse = extractOffsetMeta(sourceReader, readResult);
+ // 4. wait all stream load finish
+ batchStreamLoad.forceFlush();
+
+ // 5. request fe api update offset
+ String currentTaskId = batchStreamLoad.getCurrentTaskId();
+ // The offset must be reset before commitOffset to prevent the next
taskId from being create
+ // by the fe.
+ batchStreamLoad.resetTaskId();
+ batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows,
scannedBytes);
+ }
+
+ public static boolean isHeartbeatEvent(SourceRecord record) {
+ Schema valueSchema = record.valueSchema();
+ return valueSchema != null
+ &&
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+ }
+
+ /**
+ * Determine if we should stop polling.
+ *
+ * @param isSnapshotSplit whether this is a snapshot split
+ * @param hasData whether we have received any data
+ * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+ * @param elapsedTime total elapsed time in milliseconds
+ * @param maxIntervalMillis max interval in milliseconds
+ * @param timeoutReached whether timeout is reached
+ * @return true if should stop, false if should continue
+ */
+ private boolean shouldStop(
+ boolean isSnapshotSplit,
+ boolean hasData,
+ boolean lastMessageIsHeartbeat,
+ long elapsedTime,
+ long maxIntervalMillis,
+ boolean timeoutReached) {
+
+ // 1. Snapshot split with data: if no more data in queue, stop
immediately (no need to wait
+ // for timeout)
+ // snapshot split will be written to the debezium queue all at once.
+ if (isSnapshotSplit && hasData) {
+ LOG.info(
+ "Snapshot split finished, no more data available. Total
elapsed: {} ms",
+ elapsedTime);
+ return true;
}
- // get offset from split state
- try {
- if (readResult.getSplitState() != null) {
- // Set meta information for hw
- if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
- metaResponse = offsetRes;
- }
- // set meta for binlog event
- if (sourceReader.isBinlogSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- metaResponse = offsetRes;
- }
- } else {
- throw new RuntimeException("split state is null");
- }
+ // 2. Not timeout yet: continue waiting
+ if (!timeoutReached) {
+ return false;
+ }
- // wait all stream load finish
- batchStreamLoad.forceFlush();
+ // === Below are checks after timeout is reached ===
- // request fe api
- batchStreamLoad.commitOffset(metaResponse, scannedRows,
scannedBytes);
- } finally {
- batchStreamLoad.resetTaskId();
+ // 3. No data received after timeout: stop
+ if (!hasData) {
+ LOG.info("No data received after timeout, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 4. Snapshot split after timeout (should not reach here, but keep as
safety check)
+ if (isSnapshotSplit) {
+ LOG.info("Snapshot split timeout reached, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 5. Binlog split + last message is heartbeat: stop immediately
+ if (lastMessageIsHeartbeat) {
+ LOG.info("Binlog split timeout and last message is heartbeat,
stopping");
+ return true;
+ }
+
+ // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout
protection
+ if (elapsedTime > maxIntervalMillis +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+ LOG.warn(
+ "Binlog split heartbeat wait timeout after {} ms, force
stopping. Total elapsed: {} ms",
+ elapsedTime - maxIntervalMillis,
+ elapsedTime);
+ return true;
Review Comment:
The heartbeat wait protection at line 418 uses "maxIntervalMillis +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3" as the maximum wait time. With
the new heartbeat interval of 3000ms (3 seconds), this gives 9 extra seconds
beyond the timeout. However, this magic number "3" is not documented. Consider
adding a constant or comment explaining why 3 heartbeat intervals are chosen as
the maximum wait time for the heartbeat protection.
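A sketch of the named constant the comment suggests; the constant name and its
placement are assumptions:

```java
// Hypothetical constant: allow up to three missed heartbeats beyond
// maxInterval before force-stopping the binlog poll loop.
private static final int HEARTBEAT_WAIT_MULTIPLIER = 3;

// Usage in shouldStop (sketch):
// long heartbeatDeadline = maxIntervalMillis
//         + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * HEARTBEAT_WAIT_MULTIPLIER;
// if (elapsedTime > heartbeatDeadline) { /* force stop */ }
```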
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -83,67 +87,109 @@ public PipelineCoordinator() {
public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest)
throws Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(fetchRecordRequest);
- SplitReadResult readResult =
sourceReader.readSplitRecords(fetchRecordRequest);
+ SplitReadResult readResult =
sourceReader.prepareAndSubmitSplit(fetchRecordRequest);
return buildRecordResponse(sourceReader, fetchRecordRequest,
readResult);
}
- /** build RecordWithMeta */
+ /**
+ * Build RecordWithMeta response
+ *
+ * <p>This method polls records until: 1. Data is received AND heartbeat
is received (normal
+ * case) 2. Timeout is reached (with heartbeat wait protection)
+ */
private RecordWithMeta buildRecordResponse(
SourceReader sourceReader, FetchRecordRequest fetchRecord,
SplitReadResult readResult)
throws Exception {
RecordWithMeta recordResponse = new RecordWithMeta();
- SourceSplit split = readResult.getSplit();
- int count = 0;
try {
- // Serialize records and add them to the response (collect from
iterator)
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
- sourceReader.deserialize(fetchRecord.getConfig(),
element);
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- recordResponse.getRecords().addAll(serializedRecords);
- count += serializedRecords.size();
- if (sourceReader.isBinlogSplit(split)) {
- // put offset for event
- Map<String, String> lastMeta =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- recordResponse.setMeta(lastMeta);
+ boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
+ long startTime = System.currentTimeMillis();
+ boolean shouldStop = false;
+ boolean hasReceivedData = false;
+ boolean lastMessageIsHeartbeat = false;
+ int heartbeatCount = 0;
+ int recordCount = 0;
+ LOG.info(
+ "Start fetching records for jobId={}, isSnapshotSplit={}",
+ fetchRecord.getJobId(),
+ isSnapshotSplit);
+
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
+ sourceReader.pollRecords(readResult.getSplitState());
+
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
+
+ // Check if should stop
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ boolean timeoutReached = elapsedTime >
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+
+ if (shouldStop(
+ isSnapshotSplit,
+ hasReceivedData,
+ lastMessageIsHeartbeat,
+ elapsedTime,
+ Constants.POLL_SPLIT_RECORDS_TIMEOUTS,
+ timeoutReached)) {
+ break;
}
+ continue;
}
- }
- } finally {
- // The LSN in the commit is the current offset, which is the
offset from the last
- // successful write.
- // Therefore, even if a subsequent write fails, it will not affect
the commit.
- sourceReader.commitSourceOffset(fetchRecord.getJobId(),
readResult.getSplit());
- // This must be called after commitSourceOffset; otherwise,
- // PG's confirmed lsn will not proceed.
- sourceReader.finishSplitRecords();
- }
+ while (recordIterator.hasNext()) {
+ SourceRecord element = recordIterator.next();
- if (readResult.getSplitState() != null) {
- // Set meta information for hw
- if (sourceReader.isSnapshotSplit(split)) {
- Map<String, String> offsetRes =
-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, split.splitId());
- recordResponse.setMeta(offsetRes);
- }
+ // Check if this is a heartbeat message
+ if (isHeartbeatEvent(element)) {
+ heartbeatCount++;
+
+ // Mark last message as heartbeat
+ if (!isSnapshotSplit) {
+ lastMessageIsHeartbeat = true;
+ }
- // set meta for binlog event
- if (sourceReader.isBinlogSplit(split)) {
- Map<String, String> offsetRes =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- recordResponse.setMeta(offsetRes);
+ // If already have data or timeout, stop when
heartbeat received
+ long elapsedTime = System.currentTimeMillis() -
startTime;
+ boolean timeoutReached =
+ elapsedTime >
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+
+ if (hasReceivedData || timeoutReached) {
+ LOG.info(
+ "Heartbeat received after {} data records,
stopping",
+ recordResponse.getRecords().size());
+ shouldStop = true;
+ break;
+ }
Review Comment:
In the fetchRecords flow, if hasReceivedData is true (data was received) or
timeoutReached is true, the loop stops immediately when a heartbeat is
received. However, the condition at line 157 uses OR logic: "if
(hasReceivedData || timeoutReached)". This means that even if we have received
data but timeout hasn't been reached yet, we'll stop on the first heartbeat.
This could result in returning prematurely when more data is still available in
the queue. Consider using AND logic: "if (hasReceivedData && timeoutReached)"
to only stop when both conditions are met.
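A sketch of the suggested condition, mirroring the lines quoted above with
only the operator changed; whether AND is the intended semantics is for the
author to confirm:

```java
// Only stop on a heartbeat once data has been received AND the poll timeout
// has been reached, so a heartbeat arriving early does not end the fetch.
if (hasReceivedData && timeoutReached) {
    LOG.info(
            "Heartbeat received after {} data records, stopping",
            recordResponse.getRecords().size());
    shouldStop = true;
    break;
}
```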
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SplitReadResult.java:
##########
@@ -18,16 +18,12 @@
package org.apache.doris.cdcclient.source.reader;
import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.kafka.connect.source.SourceRecord;
-
-import java.util.Iterator;
import lombok.Data;
/** The result of reading a split with iterator. */
Review Comment:
The class comment still says "The result of reading a split with iterator"
but the recordIterator field has been removed as part of this refactoring. The
comment should be updated to reflect that this class now only contains the
split and splitState, and that iteration is now handled separately via the
pollRecords method.
```suggestion
/**
* Container for a source split and its associated state.
* Iteration over records for this split is handled separately (for example
via pollRecords).
*/
```
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +223,227 @@ public CompletableFuture<Void>
writeRecordsAsync(WriteRecordRequest writeRecordR
executor);
}
- /** Read data from SourceReader and write it to Doris, while returning
meta information. */
+ /**
+ * Read data from SourceReader and write it to Doris, while returning meta
information.
+ *
+ * <p>Snapshot split: Returns immediately after reading; otherwise,
returns after the
+ * maxInterval.
+ *
+ * <p>Binlog split: Fetches data at the maxInterval. Returns immediately
if no data is found; if
+ * found, checks if the last record is a heartbeat record. If it is,
returns immediately;
+ * otherwise, fetches again until the heartbeat deadline.
+ *
+ * <p>Heartbeat events will carry the latest offset.
+ */
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
Map<String, String> metaResponse = new HashMap<>();
long scannedRows = 0L;
long scannedBytes = 0L;
+ int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
- readResult = sourceReader.readSplitRecords(writeRecordRequest);
- batchStreamLoad =
- getOrCreateBatchStreamLoad(
- writeRecordRequest.getJobId(),
writeRecordRequest.getTargetDb());
- batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
- batchStreamLoad.setToken(writeRecordRequest.getToken());
-
- // Record start time for maxInterval check
+ // 1. submit split async
+ readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+ batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+ boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ boolean shouldStop = false;
+ boolean lastMessageIsHeartbeat = false;
+ LOG.info(
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId(),
+ isSnapshotSplit);
+
+ // 2. poll record
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
+ sourceReader.pollRecords(readResult.getSplitState());
- // Use iterators to read and write.
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
- String table = extractTable(element);
- for (String record : serializedRecords) {
- scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
- batchStreamLoad.writeRecord(database, table,
dataBytes);
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
+
+ // Check if should stop
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (shouldStop(
+ isSnapshotSplit,
+ scannedRows > 0,
+ lastMessageIsHeartbeat,
+ elapsedTime,
+ maxIntervalMillis,
+ timeoutReached)) {
+ break;
}
+ continue;
}
- // Check if maxInterval has been exceeded
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis)
{
- LOG.info(
- "Max interval {} seconds reached, stopping data
reading",
- writeRecordRequest.getMaxInterval());
- break;
+
+ while (recordIterator.hasNext()) {
+ SourceRecord element = recordIterator.next();
+
+ // Check if this is a heartbeat message
+ if (isHeartbeatEvent(element)) {
+ heartbeatCount++;
+
+ // Mark last message as heartbeat (only for binlog
split)
+ if (!isSnapshotSplit) {
+ lastMessageIsHeartbeat = true;
+ }
+
+ // If already timeout, stop immediately when heartbeat
received
+ long elapsedTime = System.currentTimeMillis() -
startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (!isSnapshotSplit && timeoutReached) {
+ LOG.info(
+ "Binlog split max interval reached and
heartbeat received, stopping data reading");
+ shouldStop = true;
+ break;
+ }
+ // Skip heartbeat messages during normal processing
+ continue;
+ }
+
+ // Process data messages
+ List<String> serializedRecords =
+ sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+ if (!CollectionUtils.isEmpty(serializedRecords)) {
+ String database = writeRecordRequest.getTargetDb();
+ String table = extractTable(element);
+ for (String record : serializedRecords) {
+ scannedRows++;
+ byte[] dataBytes = record.getBytes();
+ scannedBytes += dataBytes.length;
+ batchStreamLoad.writeRecord(database, table, dataBytes);
+ }
+ // Mark last message as data (not heartbeat)
+ lastMessageIsHeartbeat = false;
+ }
}
}
+ LOG.info(
+ "Fetched {} records and {} heartbeats in {} ms for
jobId={} taskId={}",
+ scannedRows,
+ heartbeatCount,
+ System.currentTimeMillis() - startTime,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+
} finally {
- if (readResult != null) {
- // The LSN in the commit is the current offset, which is the offset from the last
- // successful write.
- // Therefore, even if a subsequent write fails, it will not affect the commit.
- sourceReader.commitSourceOffset(
- writeRecordRequest.getJobId(), readResult.getSplit());
- }
+ cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult);
+ }
- // This must be called after commitSourceOffset; otherwise,
- // PG's confirmed lsn will not proceed.
- // This operation must be performed before batchStreamLoad.commitOffset;
- // otherwise, fe might issue the next task for this job.
- sourceReader.finishSplitRecords();
+ // 3. Extract offset from split state
+ metaResponse = extractOffsetMeta(sourceReader, readResult);
+ // 4. wait all stream load finish
+ batchStreamLoad.forceFlush();
+
+ // 5. request fe api update offset
+ String currentTaskId = batchStreamLoad.getCurrentTaskId();
+ // The offset must be reset before commitOffset to prevent the next taskId from being create
+ // by the fe.
+ batchStreamLoad.resetTaskId();
+ batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows, scannedBytes);
+ }
+
+ public static boolean isHeartbeatEvent(SourceRecord record) {
+ Schema valueSchema = record.valueSchema();
+ return valueSchema != null
+ && SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+ }
+
+ /**
+ * Determine if we should stop polling.
+ *
+ * @param isSnapshotSplit whether this is a snapshot split
+ * @param hasData whether we have received any data
+ * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+ * @param elapsedTime total elapsed time in milliseconds
+ * @param maxIntervalMillis max interval in milliseconds
+ * @param timeoutReached whether timeout is reached
+ * @return true if should stop, false if should continue
+ */
+ private boolean shouldStop(
+ boolean isSnapshotSplit,
+ boolean hasData,
+ boolean lastMessageIsHeartbeat,
+ long elapsedTime,
+ long maxIntervalMillis,
+ boolean timeoutReached) {
+
+ // 1. Snapshot split with data: if no more data in queue, stop immediately (no need to wait
+ // for timeout)
+ // snapshot split will be written to the debezium queue all at once.
+ if (isSnapshotSplit && hasData) {
+ LOG.info(
+ "Snapshot split finished, no more data available. Total
elapsed: {} ms",
+ elapsedTime);
+ return true;
Review Comment:
The shouldStop logic for snapshot splits at lines 385-389 returns true when
"isSnapshotSplit && hasData" without checking if there's more data in the
iterator. This is based on the assumption that "snapshot split will be written
to the debezium queue all at once" (line 384). However, the check happens when
"!recordIterator.hasNext()" (line 121), which means we've already exhausted the
current batch from the queue. For snapshot splits, this is correct behavior,
but the comment at line 384 could be misleading if Debezium actually streams
snapshot data in multiple batches. Consider verifying this assumption or adding
more defensive checks.
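If a more defensive check is wanted, a minimal sketch of the idea (the `isSplitFinished` call is hypothetical, purely for illustration, and would need to be threaded into `shouldStop` or checked at its call site):
```java
// Sketch only: stop a snapshot split when the reader explicitly reports the split
// as finished, instead of inferring completion from a single empty poll.
// isSplitFinished(...) is a hypothetical method, not part of this PR.
if (isSnapshotSplit && hasData && sourceReader.isSplitFinished(readResult.getSplit())) {
    LOG.info("Snapshot split finished, queue drained after {} ms", elapsedTime);
    return true;
}
```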
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -796,6 +717,8 @@ public boolean hasNext() {
BinlogOffset position =
RecordUtils.getBinlogPosition(element);
splitState.asBinlogSplitState().setStartingOffset(position);
}
+ nextRecord = element;
+ return true;
Review Comment:
FilteredRecordIterator now returns heartbeat events (sets nextRecord and
returns true), but PipelineCoordinator also checks for heartbeat events
separately. This creates double processing of heartbeat events. The
FilteredRecordIterator should either filter out heartbeat events completely
(not return them) or PipelineCoordinator should not do separate heartbeat
checking. Consider either:
1. Removing lines 720-721 so heartbeat events are filtered out in the
iterator (like watermark events), OR
2. Updating PipelineCoordinator so it no longer does its own heartbeat checking,
since the iterator already handles offset updates
```suggestion
```
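If option 1 is taken, the loop body could update state and then skip the record, roughly as sketched below (this mirrors the watermark handling shown above; `isHeartbeatEvent` is assumed to be a helper equivalent to the one this PR adds to PipelineCoordinator):
```java
// Sketch: advance the binlog offset from the heartbeat, then skip the record
// instead of setting nextRecord and returning true.
if (isHeartbeatEvent(element)) {
    BinlogOffset position = RecordUtils.getBinlogPosition(element);
    splitState.asBinlogSplitState().setStartingOffset(position);
    continue;
}
```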
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,64 +168,62 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/binlog split based on offset and start the reader.
- MySqlSplit split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split reader",
- baseReq.isReload());
- // build split
- Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // reset binlog reader
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof BinlogSplitReader) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ // build split
+ Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+ if (this.currentSplit instanceof MySqlSnapshotSplit) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+ this.currentReader = getBinlogSplitReader(baseReq);
} else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ throw new IllegalStateException(
+ "Unsupported MySqlSplit type: " +
this.currentSplit.getClass().getName());
}
Review Comment:
In prepareAndSubmitSplit, a new reader is created every time (either
SnapshotSplitReader or BinlogSplitReader at lines 182 or 184), but the old
currentReader is not closed before being replaced. This could lead to resource
leaks (unclosed database connections, threads, etc.). Before creating a new
reader, you should check if currentReader exists and close it if necessary,
similar to how the old code called closeCurrentReader().
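A minimal sketch of that guard, assuming the reader exposes a `close()` method (the exact cleanup required by SnapshotSplitReader/BinlogSplitReader may differ):
```java
// Sketch: release the previous reader before swapping in a new one so that
// connections and fetcher threads are not leaked across tasks.
if (this.currentReader != null) {
    try {
        this.currentReader.close();
    } catch (Exception e) {
        LOG.warn("Failed to close previous reader before creating a new one", e);
    }
    this.currentReader = null;
}
```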
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,64 +168,62 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/binlog split based on offset and start the reader.
- MySqlSplit split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split reader",
- baseReq.isReload());
- // build split
- Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // reset binlog reader
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof BinlogSplitReader) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ // build split
+ Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+ if (this.currentSplit instanceof MySqlSnapshotSplit) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+ this.currentReader = getBinlogSplitReader(baseReq);
} else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ throw new IllegalStateException(
+ "Unsupported MySqlSplit type: " +
this.currentSplit.getClass().getName());
}
- // build response with iterator
- SplitReadResult result = new SplitReadResult();
+ this.currentReader.submitSplit(this.currentSplit);
MySqlSplitState currentSplitState = null;
- MySqlSplit currentSplit = this.getCurrentSplit();
- if (currentSplit.isSnapshotSplit()) {
- currentSplitState = new MySqlSnapshotSplitState(currentSplit.asSnapshotSplit());
+ if (this.currentSplit.isSnapshotSplit()) {
+ currentSplitState = new MySqlSnapshotSplitState(this.currentSplit.asSnapshotSplit());
} else {
- currentSplitState = new MySqlBinlogSplitState(currentSplit.asBinlogSplit());
+ currentSplitState = new MySqlBinlogSplitState(this.currentSplit.asBinlogSplit());
}
-
- Iterator<SourceRecord> filteredIterator =
- new FilteredRecordIterator(currentSplitRecords, currentSplitState);
-
- result.setRecordIterator(filteredIterator);
+ SplitReadResult result = new SplitReadResult();
+ result.setSplit(this.currentSplit);
result.setSplitState(currentSplitState);
- result.setSplit(split);
return result;
}
+ @Override
+ public Iterator<SourceRecord> pollRecords(Object splitState) throws InterruptedException {
+ Preconditions.checkState(this.currentReader != null, "currentReader is null");
+ Preconditions.checkNotNull(splitState, "splitState is null");
+ Preconditions.checkState(
+ splitState instanceof MySqlSplitState,
+ "splitState type is invalid " + splitState.getClass());
+
+ // Poll data from Debezium queue
+ Iterator<SourceRecords> dataIt = currentReader.pollSplitRecords();
+ if (dataIt == null || !dataIt.hasNext()) {
+ return Collections.emptyIterator(); // No data available
+ }
+
+ SourceRecords sourceRecords = dataIt.next();
+ SplitRecords splitRecords =
+ new SplitRecords(this.currentSplit.splitId(), sourceRecords.iterator());
+ if (!sourceRecords.getSourceRecordList().isEmpty()) {
+ LOG.info("{} Records received",
sourceRecords.getSourceRecordList().size());
+ }
+
+ return new FilteredRecordIterator(splitRecords, (MySqlSplitState) splitState);
+ }
Review Comment:
The method signature change removes the recordIterator field from
SplitReadResult, but the Iterator is still being created and used in
FilteredRecordIterator. However, there's a logical issue: if pollRecords is
called multiple times on the same split, a new FilteredRecordIterator is
created each time, but the underlying currentSplit and currentReader are
reused. This could lead to state inconsistencies if the iterator is not fully
consumed before the next call to pollRecords. Consider adding validation to
ensure the previous iterator was fully consumed or reset the reader state
appropriately.
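One possible defensive check, sketched below (the `lastReturnedIterator` field is illustrative and not part of this PR):
```java
// Sketch: remember the iterator handed out by the previous pollRecords call and
// refuse to build a new one until it has been fully drained.
if (lastReturnedIterator != null && lastReturnedIterator.hasNext()) {
    throw new IllegalStateException(
            "Previous record iterator for split " + this.currentSplit.splitId()
                    + " was not fully consumed before the next pollRecords call");
}
lastReturnedIterator = new FilteredRecordIterator(splitRecords, (MySqlSplitState) splitState);
return lastReturnedIterator;
```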
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -672,6 +594,8 @@ public boolean hasNext() {
Offset position = createOffset(element.sourceOffset());
splitState.asStreamSplitState().setStartingOffset(position);
}
+ nextRecord = element;
+ return true;
Review Comment:
Similar to MySqlSourceReader, FilteredRecordIterator now returns heartbeat
events (sets nextRecord and returns true at lines 597-598), but
PipelineCoordinator also checks for heartbeat events separately. This creates
double processing. The iterator should either filter out heartbeat events
completely or PipelineCoordinator should not do separate heartbeat checking.
```suggestion
// Do not emit heartbeat events as records; only update state.
continue;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]