Copilot commented on code in PR #60181:
URL: https://github.com/apache/doris/pull/60181#discussion_r2720364429
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,62 +169,56 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/binlog split based on offset and
start the reader.
- MySqlSplit split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- DebeziumReader<SourceRecords, MySqlSplit> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader",
- baseReq.isReload());
- // build split
- Tuple2<MySqlSplit, Boolean> splitFlag =
createMySqlSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // reset binlog reader
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof BinlogSplitReader) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ // build split
+ Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta,
baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+ if (this.currentSplit instanceof MySqlSnapshotSplit) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+ this.currentReader = getBinlogSplitReader(baseReq);
+ }
+ this.currentReader.submitSplit(this.currentSplit);
Review Comment:
Potential resource leak: if prepareAndSubmitSplit is called on a
SourceReader that already has a currentReader, the old reader will be
overwritten without being closed. Since SourceReader instances are reused
across multiple requests for the same jobId (as seen in Env.getOrCreateReader),
this could happen if finishSplitRecords fails or if there's an exception before
cleanup. Consider closing the existing currentReader before creating a new one,
or add a check to ensure currentReader is null before proceeding.
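A minimal sketch of the first option, assuming `DebeziumReader` exposes a `close()` method (as implied by the commented-out `closeBinlogReader()` in this hunk); field and log names follow the snippet above, and this is illustrative rather than the PR's implementation:
```java
// Illustrative only: close any reader left over from a previous request before
// creating the new one, so a reused SourceReader does not leak the old reader.
if (this.currentReader != null) {
    LOG.warn("Job {} still has an active reader, closing it before submitting a new split",
            baseReq.getJobId());
    try {
        this.currentReader.close();
    } catch (Exception e) {
        LOG.warn("Failed to close previous reader, continuing", e);
    }
    this.currentReader = null;
}
```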
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +215,134 @@ public CompletableFuture<Void> writeRecordsAsync(WriteRecordRequest writeRecordR
executor);
}
- /** Read data from SourceReader and write it to Doris, while returning
meta information. */
+ /**
+ * Read data from SourceReader and write it to Doris, while returning meta
information. 1. poll
+ * interval record. 2. Try to retrieve the heartbeat message, as it
returns the latest offset,
+ * preventing the next task from having to skip a large number of records.
+ */
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
Map<String, String> metaResponse = new HashMap<>();
long scannedRows = 0L;
long scannedBytes = 0L;
+ int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
- readResult = sourceReader.readSplitRecords(writeRecordRequest);
- batchStreamLoad =
- getOrCreateBatchStreamLoad(
- writeRecordRequest.getJobId(),
writeRecordRequest.getTargetDb());
- batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
- batchStreamLoad.setToken(writeRecordRequest.getToken());
-
- // Record start time for maxInterval check
+ // 1. submit split async
+ readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+ batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ boolean shouldStop = false;
- // Use iterators to read and write.
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
- String table = extractTable(element);
- for (String record : serializedRecords) {
- scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
- batchStreamLoad.writeRecord(database, table,
dataBytes);
- }
- }
- // Check if maxInterval has been exceeded
+ // 2. poll record
+ while (!shouldStop) {
long elapsedTime = System.currentTimeMillis() - startTime;
- if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis)
{
- LOG.info(
- "Max interval {} seconds reached, stopping data
reading",
- writeRecordRequest.getMaxInterval());
+ boolean timeoutReached = maxIntervalMillis > 0 && elapsedTime
>= maxIntervalMillis;
+
+ // Timeout protection: force stop if waiting for heartbeat
exceeds threshold
+ // After reaching maxInterval, wait at most
DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3 for
+ // heartbeat
+ if (timeoutReached
+ && elapsedTime
+ > maxIntervalMillis
+ +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+ LOG.warn(
+ "Heartbeat wait timeout after {} ms since timeout
reached, force stopping. "
+ + "Total elapsed: {} ms",
+ elapsedTime - maxIntervalMillis,
+ elapsedTime);
break;
}
- }
- } finally {
- if (readResult != null) {
- // The LSN in the commit is the current offset, which is the
offset from the last
- // successful write.
- // Therefore, even if a subsequent write fails, it will not
affect the commit.
- sourceReader.commitSourceOffset(
- writeRecordRequest.getJobId(), readResult.getSplit());
- }
- // This must be called after commitSourceOffset; otherwise,
- // PG's confirmed lsn will not proceed.
- // This operation must be performed before
batchStreamLoad.commitOffset;
- // otherwise, fe might issue the next task for this job.
- sourceReader.finishSplitRecords();
- }
- // get offset from split state
- try {
- if (readResult.getSplitState() != null) {
- // Set meta information for hw
- if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
- metaResponse = offsetRes;
+ Iterator<SourceRecord> recordIterator =
+ sourceReader.pollRecords(readResult.getSplitState());
+
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
Review Comment:
A hard-coded 100 ms sleep in this polling loop amounts to busy-waiting: the thread keeps cycling between short sleeps and empty polls, adding wake-up overhead and latency, and the magic number cannot be tuned. Make the interval configurable or derive it from the existing DEBEZIUM_POLL_INTERVAL_MS constant; better yet, use a blocking queue or a proper wait/notify mechanism if the underlying Debezium reader supports one, to avoid polling altogether.
```suggestion
TimeUnit.MILLISECONDS.sleep(Constants.DEBEZIUM_POLL_INTERVAL_MS);
```
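For the blocking alternative, a rough sketch of what a poll with timeout could look like in this loop; `pollRecordsBlocking` is purely hypothetical and not part of the current SourceReader API:
```java
// Hypothetical blocking variant: the reader waits up to DEBEZIUM_POLL_INTERVAL_MS for new
// records instead of the caller sleep-looping. pollRecordsBlocking(...) does not exist yet.
Iterator<SourceRecord> recordIterator =
        sourceReader.pollRecordsBlocking(
                readResult.getSplitState(),
                Constants.DEBEZIUM_POLL_INTERVAL_MS,
                TimeUnit.MILLISECONDS);
if (!recordIterator.hasNext()) {
    continue; // timed out waiting; loop back and re-check the stop conditions
}
```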
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java:
##########
@@ -22,7 +22,7 @@ public class Constants {
public static final long POLL_SPLIT_RECORDS_TIMEOUTS = 15000L;
// Debezium default properties
- public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 10000L;
+ public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L;
Review Comment:
The heartbeat interval was reduced from 10000ms to 3000ms. While this is a
reasonable optimization for faster offset updates and connection issue
detection, it will result in more frequent heartbeat messages and potentially
increased network traffic and CPU usage. Consider making this value
configurable through a system property or configuration file, allowing
administrators to tune it based on their specific workload and network
characteristics.
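One way to keep the faster default while leaving it tunable is to read the value from a system property; the property name below is only a placeholder, not an existing option:
```java
// Sketch: keep 3000 ms as the default but allow overriding via
// -Ddoris.cdc.debezium.heartbeat.interval.ms=... (hypothetical property name).
public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS =
        Long.getLong("doris.cdc.debezium.heartbeat.interval.ms", 3000L);
```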
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -83,67 +87,101 @@ public PipelineCoordinator() {
public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest) throws Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(fetchRecordRequest);
- SplitReadResult readResult =
sourceReader.readSplitRecords(fetchRecordRequest);
+ SplitReadResult readResult =
sourceReader.prepareAndSubmitSplit(fetchRecordRequest);
return buildRecordResponse(sourceReader, fetchRecordRequest,
readResult);
}
- /** build RecordWithMeta */
+ /**
+ * Build RecordWithMeta response
+ *
+ * <p>This method polls records until: 1. Data is received AND heartbeat
is received (normal
+ * case) 2. Timeout is reached (with heartbeat wait protection)
+ */
private RecordWithMeta buildRecordResponse(
SourceReader sourceReader, FetchRecordRequest fetchRecord,
SplitReadResult readResult)
throws Exception {
RecordWithMeta recordResponse = new RecordWithMeta();
- SourceSplit split = readResult.getSplit();
- int count = 0;
try {
- // Serialize records and add them to the response (collect from
iterator)
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
- sourceReader.deserialize(fetchRecord.getConfig(),
element);
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- recordResponse.getRecords().addAll(serializedRecords);
- count += serializedRecords.size();
- if (sourceReader.isBinlogSplit(split)) {
- // put offset for event
- Map<String, String> lastMeta =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- recordResponse.setMeta(lastMeta);
+ long startTime = System.currentTimeMillis();
+ boolean shouldStop = false;
+ boolean hasReceivedData = false;
+ int heartbeatCount = 0;
+ int recordCount = 0;
+
+ while (!shouldStop) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ boolean timeoutReached = elapsedTime >
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+
+ // Timeout protection: force stop if waiting for heartbeat
exceeds threshold
+ // After reaching timeout, wait at most
DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3 for
+ // heartbeat
+ if (timeoutReached
+ && elapsedTime
+ > Constants.POLL_SPLIT_RECORDS_TIMEOUTS
+ +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+ LOG.warn(
+ "Heartbeat wait timeout after {} ms since timeout
reached, force stopping. "
+ + "Total elapsed: {} ms",
+ elapsedTime -
Constants.POLL_SPLIT_RECORDS_TIMEOUTS,
+ elapsedTime);
+ break;
+ }
+
+ Iterator<SourceRecord> recordIterator =
+ sourceReader.pollRecords(readResult.getSplitState());
+
+ if (!recordIterator.hasNext()) {
+ // If we have data and reached timeout, continue waiting
for heartbeat
+ if (hasReceivedData && timeoutReached) {
+ Thread.sleep(100);
+ continue;
}
+ // Otherwise, just wait for more data
+ Thread.sleep(100);
Review Comment:
A hard-coded 100 ms sleep in this polling loop amounts to busy-waiting: the thread keeps cycling between short sleeps and empty polls, adding wake-up overhead and latency, and the magic number cannot be tuned. Make the interval configurable or derive it from the existing DEBEZIUM_POLL_INTERVAL_MS constant; better yet, use a blocking queue or a proper wait/notify mechanism if the underlying Debezium reader supports one, to avoid polling altogether.
```suggestion
Thread.sleep(Constants.DEBEZIUM_POLL_INTERVAL_MS);
continue;
}
// Otherwise, just wait for more data
Thread.sleep(Constants.DEBEZIUM_POLL_INTERVAL_MS);
```
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +215,134 @@ public CompletableFuture<Void> writeRecordsAsync(WriteRecordRequest writeRecordR
executor);
}
- /** Read data from SourceReader and write it to Doris, while returning
meta information. */
+ /**
+ * Read data from SourceReader and write it to Doris, while returning meta
information. 1. poll
+ * interval record. 2. Try to retrieve the heartbeat message, as it
returns the latest offset,
+ * preventing the next task from having to skip a large number of records.
+ */
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
Map<String, String> metaResponse = new HashMap<>();
long scannedRows = 0L;
long scannedBytes = 0L;
+ int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
- readResult = sourceReader.readSplitRecords(writeRecordRequest);
- batchStreamLoad =
- getOrCreateBatchStreamLoad(
- writeRecordRequest.getJobId(),
writeRecordRequest.getTargetDb());
- batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
- batchStreamLoad.setToken(writeRecordRequest.getToken());
-
- // Record start time for maxInterval check
+ // 1. submit split async
+ readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+ batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ boolean shouldStop = false;
- // Use iterators to read and write.
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
- String table = extractTable(element);
- for (String record : serializedRecords) {
- scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
- batchStreamLoad.writeRecord(database, table,
dataBytes);
- }
- }
- // Check if maxInterval has been exceeded
+ // 2. poll record
+ while (!shouldStop) {
long elapsedTime = System.currentTimeMillis() - startTime;
- if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis)
{
- LOG.info(
- "Max interval {} seconds reached, stopping data
reading",
- writeRecordRequest.getMaxInterval());
+ boolean timeoutReached = maxIntervalMillis > 0 && elapsedTime
>= maxIntervalMillis;
+
+ // Timeout protection: force stop if waiting for heartbeat
exceeds threshold
+ // After reaching maxInterval, wait at most
DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3 for
+ // heartbeat
+ if (timeoutReached
+ && elapsedTime
+ > maxIntervalMillis
+ +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+ LOG.warn(
+ "Heartbeat wait timeout after {} ms since timeout
reached, force stopping. "
+ + "Total elapsed: {} ms",
+ elapsedTime - maxIntervalMillis,
+ elapsedTime);
break;
}
- }
- } finally {
- if (readResult != null) {
- // The LSN in the commit is the current offset, which is the
offset from the last
- // successful write.
- // Therefore, even if a subsequent write fails, it will not
affect the commit.
- sourceReader.commitSourceOffset(
- writeRecordRequest.getJobId(), readResult.getSplit());
- }
- // This must be called after commitSourceOffset; otherwise,
- // PG's confirmed lsn will not proceed.
- // This operation must be performed before
batchStreamLoad.commitOffset;
- // otherwise, fe might issue the next task for this job.
- sourceReader.finishSplitRecords();
- }
- // get offset from split state
- try {
- if (readResult.getSplitState() != null) {
- // Set meta information for hw
- if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
- metaResponse = offsetRes;
+ Iterator<SourceRecord> recordIterator =
+ sourceReader.pollRecords(readResult.getSplitState());
+
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
+ continue;
}
- // set meta for binlog event
- if (sourceReader.isBinlogSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- metaResponse = offsetRes;
+ while (recordIterator.hasNext()) {
+ SourceRecord element = recordIterator.next();
+ // Check if this is a heartbeat message
+ if (isHeartbeatEvent(element)) {
+ heartbeatCount++;
+ if (timeoutReached) {
+ LOG.info(
+ "Max interval reached and heartbeat
received, stopping data reading");
+ shouldStop = true;
+ break;
+ }
+ // Skip heartbeat messages during normal processing
+ continue;
+ }
+
+ // Process data messages
+ List<String> serializedRecords =
+
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+ if (!CollectionUtils.isEmpty(serializedRecords)) {
+ String database = writeRecordRequest.getTargetDb();
+ String table = extractTable(element);
+ for (String record : serializedRecords) {
+ scannedRows++;
+ byte[] dataBytes = record.getBytes();
+ scannedBytes += dataBytes.length;
+ batchStreamLoad.writeRecord(database, table,
dataBytes);
+ }
+ }
}
- } else {
- throw new RuntimeException("split state is null");
}
+ LOG.info(
+ "Fetched {} records and {} heartbeats in {} ms for
jobId={} taskId={}",
+ scannedRows,
+ heartbeatCount,
+ System.currentTimeMillis() - startTime,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+
+ } finally {
+ cleanupReaderResources(sourceReader,
writeRecordRequest.getJobId(), readResult);
+ }
- // wait all stream load finish
+ // 3. Extract offset from split state
+ metaResponse = extractOffsetMeta(sourceReader, readResult);
+ try {
+ // 4. wait all stream load finish
batchStreamLoad.forceFlush();
- // request fe api
+ // 5. request fe api update offset
batchStreamLoad.commitOffset(metaResponse, scannedRows,
scannedBytes);
} finally {
batchStreamLoad.resetTaskId();
}
}
- private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String
targetDb) {
- return batchStreamLoadMap.computeIfAbsent(
- jobId,
- k -> {
- LOG.info("Create DorisBatchStreamLoad for jobId={}",
jobId);
- return new DorisBatchStreamLoad(jobId, targetDb);
- });
+ public static boolean isHeartbeatEvent(SourceRecord record) {
+ Schema valueSchema = record.valueSchema();
+ return valueSchema != null
+ &&
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+ }
Review Comment:
The heartbeat detection method here may be inconsistent with the one used in
FilteredRecordIterator classes. For MySQL, RecordUtils.isHeartbeatEvent is
used, while for Postgres/JDBC sources, SourceRecordUtils.isHeartbeatEvent is
used. This PipelineCoordinator.isHeartbeatEvent implementation manually checks
the schema name. Consider using the same utility methods (RecordUtils or
SourceRecordUtils) that are used in the FilteredRecordIterator to ensure
consistency across the codebase. If the implementations differ, heartbeat
events may not be detected correctly, leading to infinite waiting or incorrect
offset management.
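If `SourceRecordUtils.isHeartbeatEvent(SourceRecord)` (referenced above for the Postgres/JDBC FilteredRecordIterator) is importable from PipelineCoordinator, the coordinator-side check could simply delegate to it; a sketch under that assumption:
```java
// Sketch: reuse the shared utility instead of re-implementing the schema-name check,
// so heartbeat detection stays consistent with FilteredRecordIterator.
public static boolean isHeartbeatEvent(SourceRecord record) {
    return SourceRecordUtils.isHeartbeatEvent(record);
}
```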
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,62 +169,56 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/binlog split based on offset and
start the reader.
- MySqlSplit split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- DebeziumReader<SourceRecords, MySqlSplit> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader",
- baseReq.isReload());
- // build split
- Tuple2<MySqlSplit, Boolean> splitFlag =
createMySqlSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // reset binlog reader
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof BinlogSplitReader) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ // build split
+ Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta,
baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+ if (this.currentSplit instanceof MySqlSnapshotSplit) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit instanceof MySqlBinlogSplit) {
+ this.currentReader = getBinlogSplitReader(baseReq);
Review Comment:
Missing else clause for invalid split type. If the split is neither a
snapshot split nor a binlog split, currentReader will not be initialized,
leading to a NullPointerException on line 187. Add an else clause that throws
an IllegalStateException with a descriptive message to handle this edge case
explicitly.
```suggestion
            this.currentReader = getBinlogSplitReader(baseReq);
        } else {
            throw new IllegalStateException(
                    "Unsupported MySqlSplit type: " + this.currentSplit.getClass().getName());
```
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,70 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
- LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/stream split based on offset and
start the reader.
- SourceSplitBase split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- Fetcher<SourceRecords, SourceSplitBase> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader for job {}",
- baseReq.isReload(),
- baseReq.getJobId());
- // build split
- Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof
IncrementalSourceStreamFetcher) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
- } else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ LOG.info("Job {} prepare and submit split with offset: {}",
baseReq.getJobId(), offsetMeta);
+ // Build split
+ Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+
+ // Create reader based on split type
+ if (this.currentSplit.isSnapshotSplit()) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit.isStreamSplit()) {
+ this.currentReader = getBinlogSplitReader(baseReq);
}
Review Comment:
Potential resource leak: if prepareAndSubmitSplit is called on a
SourceReader that already has a currentReader, the old reader will be
overwritten without being closed. Since SourceReader instances are reused
across multiple requests for the same jobId (as seen in Env.getOrCreateReader),
this could happen if finishSplitRecords fails or if there's an exception before
cleanup. Consider closing the existing currentReader before creating a new one,
or add a check to ensure currentReader is null before proceeding.
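The other option mentioned above, failing fast instead of silently replacing an active reader, could look roughly like this (illustrative sketch against the fields shown in this hunk):
```java
// Illustrative guard: refuse to overwrite a reader that was never cleaned up, so the
// leak surfaces as an error instead of an orphaned fetcher.
if (this.currentReader != null) {
    throw new IllegalStateException(
            "Job " + baseReq.getJobId()
                    + " already has an active reader; finishSplitRecords must complete "
                    + "before a new split is submitted");
}
```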
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -149,64 +147,70 @@ public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq) throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
- LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
-
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/stream split based on offset and
start the reader.
- SourceSplitBase split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- Fetcher<SourceRecords, SourceSplitBase> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader for job {}",
- baseReq.isReload(),
- baseReq.getJobId());
- // build split
- Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof
IncrementalSourceStreamFetcher) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
- } else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ LOG.info("Job {} prepare and submit split with offset: {}",
baseReq.getJobId(), offsetMeta);
+ // Build split
+ Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
+ this.currentSplit = splitFlag.f0;
+ LOG.info("Get a split: {}", this.currentSplit.toString());
+
+ // Create reader based on split type
+ if (this.currentSplit.isSnapshotSplit()) {
+ this.currentReader = getSnapshotSplitReader(baseReq);
+ } else if (this.currentSplit.isStreamSplit()) {
+ this.currentReader = getBinlogSplitReader(baseReq);
Review Comment:
Missing else clause for invalid split type. If the split is neither a
snapshot split nor a stream split, currentReader will not be initialized,
leading to a NullPointerException on line 171. Add an else clause that throws
an IllegalStateException with a descriptive message to handle this edge case
explicitly.
```suggestion
            this.currentReader = getBinlogSplitReader(baseReq);
        } else {
            throw new IllegalStateException(
                    "Unsupported split type: " + this.currentSplit + ". Expected snapshot or stream split.");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]