Copilot commented on code in PR #60181:
URL: https://github.com/apache/doris/pull/60181#discussion_r2740831427
##########
fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java:
##########
@@ -32,5 +32,6 @@ public class DataSourceConfigKeys {
public static final String OFFSET_INITIAL = "initial";
public static final String OFFSET_EARLIEST = "earliest";
public static final String OFFSET_LATEST = "latest";
Review Comment:
Breaking API change: the configuration key "split_size" has been renamed to
"snapshot_split_size". This will break existing jobs that use the old
configuration key. Consider supporting both keys with a deprecation warning for
backward compatibility, or document this breaking change in the release notes.
```suggestion
public static final String OFFSET_LATEST = "latest";
/**
* Deprecated: use {@link #SNAPSHOT_SPLIT_SIZE} instead.
     * Kept for backward compatibility with existing jobs using the "split_size" key.
*/
@Deprecated
public static final String SPLIT_SIZE = "split_size";
```
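If the backward-compatible route is taken, the code that reads the job properties also needs to honor the old key. A minimal sketch of that fallback, assuming the SNAPSHOT_SPLIT_SIZE constant added in this PR; the helper placement, properties map, and logger are illustrative, not part of the PR:
```java
// Hedged sketch: prefer the new key and fall back to the deprecated one with a warning.
private static String resolveSnapshotSplitSize(Map<String, String> properties, String defaultValue) {
    if (properties.containsKey(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)) {
        return properties.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
    }
    if (properties.containsKey(DataSourceConfigKeys.SPLIT_SIZE)) {
        LOG.warn("Config key 'split_size' is deprecated, use 'snapshot_split_size' instead.");
        return properties.get(DataSourceConfigKeys.SPLIT_SIZE);
    }
    return defaultValue;
}
```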
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -68,6 +69,8 @@
public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
public static final String SPLIT_ID = "splitId";
private static final ObjectMapper objectMapper = new ObjectMapper();
Review Comment:
SNAPSHOT_PARALLELISM_DEFAULT is set to 4, but there is no comment explaining
why that value was chosen. Consider adding a comment that states the rationale
and the conditions under which users should adjust it.
```suggestion
private static final ObjectMapper objectMapper = new ObjectMapper();
    // Default snapshot parallelism used when DataSourceConfigKeys.SNAPSHOT_PARALLELISM is not set.
    // 4 is chosen as a conservative default that provides some parallelism on typical production
    // machines without putting too much load on the source database or the Doris FE. Increase this
    // value if the source database and network can safely handle more concurrent snapshot reads,
    // or decrease it for small / under-provisioned databases or when snapshotting causes high load.
```
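For context, a hedged sketch of how the documented default would typically be applied, assuming the provider reads its settings from a Map<String, String> named config (the accessor is an assumption, not the provider's actual code):
```java
// Hedged sketch: fall back to SNAPSHOT_PARALLELISM_DEFAULT when the key is absent.
int snapshotParallelism = Integer.parseInt(
        config.getOrDefault(
                DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
                String.valueOf(SNAPSHOT_PARALLELISM_DEFAULT)));
```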
##########
regression-test/conf/regression-conf.groovy:
##########
@@ -24,7 +24,7 @@ defaultDb = "regression_test"
// init cmd like: select @@session.tx_read_only
// at each time we connect.
// add allowLoadLocalInfile so that the jdbc can execute mysql load data from client.
-jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round"
+jdbcUrl = "jdbc:mysql://10.16.10.6:29939/?useLocalSessionState=true&allowLoadLocalInfile=true&zeroDateTimeBehavior=round"
Review Comment:
Hardcoded IP address and port in a configuration file. This environment-specific
setting (10.16.10.6:29939) should not be committed to the repository and will
break tests in other environments; revert to the default 127.0.0.1:9030 before merging.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -162,42 +163,54 @@ public void writeRecord(String database, String table,
byte[] record) {
lock.unlock();
}
}
+
+ // Single table flush according to the STREAM_LOAD_MAX_BYTES
+ if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES) {
+ bufferFullFlush(bufferKey);
+ }
+ }
+
+ public void bufferFullFlush(String bufferKey) {
+ doFlush(bufferKey, false, true);
}
- public boolean cacheFullFlush() {
- return doFlush(true, true);
+ public void forceFlush() {
+ doFlush(null, true, false);
}
- public boolean forceFlush() {
- return doFlush(true, false);
+ public void cacheFullFlush() {
+ doFlush(null, true, true);
}
- private synchronized boolean doFlush(boolean waitUtilDone, boolean
cacheFull) {
+
+ private synchronized void doFlush(String bufferKey, boolean waitUtilDone,
boolean bufferFull) {
checkFlushException();
- if (waitUtilDone || cacheFull) {
- return flush(waitUtilDone);
+ if (waitUtilDone || bufferFull) {
+ flush(bufferKey, waitUtilDone);
}
- return false;
}
- private synchronized boolean flush(boolean waitUtilDone) {
+ private synchronized void flush(String bufferKey, boolean waitUtilDone) {
if (!waitUtilDone && bufferMap.isEmpty()) {
// bufferMap may have been flushed by other threads
LOG.info("bufferMap is empty, no need to flush");
- return false;
+ return;
}
- for (String key : bufferMap.keySet()) {
- if (waitUtilDone) {
- // Ensure that the interval satisfies intervalMS
- flushBuffer(key);
+
+ if (null == bufferKey) {
+ for (String key : bufferMap.keySet()) {
+ if (waitUtilDone) {
+ flushBuffer(key);
+ }
}
- }
- if (!waitUtilDone) {
- return false;
+ } else if (bufferMap.containsKey(bufferKey)) {
+ flushBuffer(bufferKey);
} else {
+ LOG.warn("buffer not found for key: {}, may be already flushed.",
bufferKey);
+ }
+ if (waitUtilDone) {
waitAsyncLoadFinish();
}
- return true;
}
Review Comment:
The flush method signature has changed from returning boolean to void, but
the return value was used by callers to determine if flush was successful. This
is a breaking API change. Verify that all callers have been updated to handle
the new void return type appropriately.
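If any caller still relied on the boolean result, one hedged option (names illustrative, not part of this PR) is a thin boolean-returning wrapper around the new void API:
```java
// Hypothetical compatibility wrapper: preserves the old boolean contract for callers
// that branch on whether the flush completed, delegating to the new void forceFlush().
public boolean tryForceFlush() {
    try {
        forceFlush();
        return true;
    } catch (RuntimeException e) {
        LOG.warn("forceFlush failed", e);
        return false;
    }
}
```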
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,62 +178,234 @@ public List<AbstractSourceSplit>
getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq)
throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq)
throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
+
LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/binlog split based on offset and
start the reader.
- MySqlSplit split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- DebeziumReader<SourceRecords, MySqlSplit> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader",
- baseReq.isReload());
- // build split
- Tuple2<MySqlSplit, Boolean> splitFlag =
createMySqlSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // reset binlog reader
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof BinlogSplitReader) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ String splitId = String.valueOf(offsetMeta.get(SPLIT_ID));
+ if (BINLOG_SPLIT_ID.equals(splitId)){
+ // Binlog split mode
+ return prepareBinlogSplit(offsetMeta, baseReq);
} else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ // Extract snapshot split list
+ List<MySqlSnapshotSplit> snapshotSplits =
extractSnapshotSplits(offsetMeta, baseReq);
+ return prepareSnapshotSplits(snapshotSplits, baseReq);
}
-
- // build response with iterator
+ }
+
+ /**
+ * Extract snapshot splits from meta.
+ * Only supports format: {"splits": [{"splitId": "xxx", ...}, ...]}
+ *
+ * @return List of snapshot splits, or null if it's a binlog split
+ */
+ private List<MySqlSnapshotSplit> extractSnapshotSplits(
+ Map<String, Object> offsetMeta,
+ JobBaseRecordRequest baseReq) throws JsonProcessingException {
+
+ // Check if it contains "splits" array
+ Object splitsObj = offsetMeta.get("splits");
+ if (splitsObj == null) {
+ throw new RuntimeException("Invalid meta format: missing 'splits'
array");
+ }
+
+ if (!(splitsObj instanceof List)) {
+ throw new RuntimeException("Invalid meta format: 'splits' must be
an array");
+ }
+
+ // Parse splits array
+ List<Map<String, Object>> splitMetaList = (List<Map<String, Object>>)
splitsObj;
+ if (splitMetaList.isEmpty()) {
+ throw new RuntimeException("Invalid meta format: 'splits' array is
empty");
+ }
+
+ List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
+ for (Map<String, Object> splitMeta : splitMetaList) {
+ MySqlSnapshotSplit split = createSnapshotSplit(splitMeta, baseReq);
+ snapshotSplits.add(split);
+ }
+
+ LOG.info("Extracted {} snapshot split(s) from meta",
snapshotSplits.size());
+ return snapshotSplits;
+ }
+
+ /**
+ * Prepare snapshot splits (unified handling for single or multiple splits)
+ */
+ private SplitReadResult prepareSnapshotSplits(
+ List<MySqlSnapshotSplit> splits,
+ JobBaseRecordRequest baseReq) throws Exception {
+
+ LOG.info("Preparing {} snapshot split(s) for reading", splits.size());
+
+ // Clear previous contexts
+ this.snapshotReaderContexts.clear();
+ this.currentReaderIndex = 0;
+
+ // Create reader for each split and submit
+ for (int i = 0; i < splits.size(); i++) {
+ MySqlSnapshotSplit split = splits.get(i);
+
+ // Create independent reader (each has its own Debezium queue)
+ SnapshotSplitReader reader = getSnapshotSplitReader(baseReq, i);
+
+ // Submit split (triggers async reading, data goes into reader's
Debezium queue)
+ reader.submitSplit(split);
+
+ // Create split state
+ MySqlSnapshotSplitState splitState = new
MySqlSnapshotSplitState(split);
+
+ // Save context using generic SnapshotReaderContext
+ SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader,
MySqlSnapshotSplitState> context =
+ new SnapshotReaderContext<>(split, reader, splitState);
+ snapshotReaderContexts.add(context);
+
+ LOG.info("Created reader {}/{} and submitted split: {} (table:
{})",
+ i + 1, splits.size(), split.splitId(),
split.getTableId().identifier());
Review Comment:
Potential resource leak: if an exception occurs after some snapshot readers
are created but before all are added to snapshotReaderContexts, the
already-created readers will not be closed. Consider using try-catch to ensure
cleanup, or add all contexts first with null readers and then initialize them.
```suggestion
        // Track created readers so we can clean them up on failure
        List<SnapshotSplitReader> createdReaders = new ArrayList<>();
        try {
            // Create reader for each split and submit
            for (int i = 0; i < splits.size(); i++) {
                MySqlSnapshotSplit split = splits.get(i);
                // Create independent reader (each has its own Debezium queue)
                SnapshotSplitReader reader = getSnapshotSplitReader(baseReq, i);
                // Add to tracking list immediately after creation
                createdReaders.add(reader);
                // Submit split (triggers async reading, data goes into reader's Debezium queue)
                reader.submitSplit(split);
                // Create split state
                MySqlSnapshotSplitState splitState = new MySqlSnapshotSplitState(split);
                // Save context using generic SnapshotReaderContext
                SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState> context =
                        new SnapshotReaderContext<>(split, reader, splitState);
                snapshotReaderContexts.add(context);
                LOG.info("Created reader {}/{} and submitted split: {} (table: {})",
                        i + 1, splits.size(), split.splitId(), split.getTableId().identifier());
            }
        } catch (Exception e) {
            // Best-effort cleanup of any readers that were created before the failure
            for (SnapshotSplitReader reader : createdReaders) {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (Exception closeEx) {
                        LOG.warn("Error closing snapshot split reader during cleanup", closeEx);
                    }
                }
            }
            throw e;
```
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -162,42 +163,54 @@ public void writeRecord(String database, String table,
byte[] record) {
lock.unlock();
}
}
+
+ // Single table flush according to the STREAM_LOAD_MAX_BYTES
+ if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES) {
+ bufferFullFlush(bufferKey);
+ }
+ }
+
+ public void bufferFullFlush(String bufferKey) {
+ doFlush(bufferKey, false, true);
}
Review Comment:
The new bufferFullFlush method flushes as soon as a buffer is full, but the size
check in writeRecord happens outside any lock. If multiple threads call
writeRecord concurrently, each can observe the same full buffer and trigger a
redundant flush before the first one clears it. Consider re-checking the buffer
size inside the synchronized doFlush/flush path, or performing the check and
flush under the existing write lock, so only one flush is issued per full buffer.
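A hedged sketch of that re-check inside the already-synchronized doFlush; the buffer value type, named BatchRecordBuffer here, is an assumption, while bufferMap, getBufferSizeBytes(), and STREAM_LOAD_MAX_BYTES are taken from the hunk above:
```java
private synchronized void doFlush(String bufferKey, boolean waitUtilDone, boolean bufferFull) {
    checkFlushException();
    if (bufferFull && bufferKey != null) {
        BatchRecordBuffer buffer = bufferMap.get(bufferKey); // value type assumed
        // Another thread may have flushed this buffer between the size check in
        // writeRecord and entering this synchronized method; skip the duplicate flush.
        if (buffer == null || buffer.getBufferSizeBytes() < STREAM_LOAD_MAX_BYTES) {
            return;
        }
    }
    if (waitUtilDone || bufferFull) {
        flush(bufferKey, waitUtilDone);
    }
}
```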
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -796,6 +895,8 @@ public boolean hasNext() {
BinlogOffset position =
RecordUtils.getBinlogPosition(element);
splitState.asBinlogSplitState().setStartingOffset(position);
}
Review Comment:
The control flow in hasNext() after a heartbeat event is easy to misread: the
code sets 'nextRecord = element' and returns true without reaching the
isDataChangeRecord check. The behavior appears correct, but a comment making it
explicit that heartbeat handling does not fall through to the data-change path
would make the flow clearer.
```suggestion
}
                        // Heartbeat events are exposed as the next record and stop iteration
                        // for this hasNext() call; we do not fall through to data-change handling.
```
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -165,62 +178,234 @@ public List<AbstractSourceSplit>
getSourceSplits(FetchTableSplitsRequest ftsReq)
}
@Override
- public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq)
throws Exception {
+ public SplitReadResult prepareAndSubmitSplit(JobBaseRecordRequest baseReq)
throws Exception {
Map<String, Object> offsetMeta = baseReq.getMeta();
if (offsetMeta == null || offsetMeta.isEmpty()) {
throw new RuntimeException("miss meta offset");
}
+
LOG.info("Job {} read split records with offset: {}",
baseReq.getJobId(), offsetMeta);
- // If there is an active split being consumed, reuse it directly;
- // Otherwise, create a new snapshot/binlog split based on offset and
start the reader.
- MySqlSplit split = null;
- SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
- if (currentSplitRecords == null) {
- DebeziumReader<SourceRecords, MySqlSplit> currentReader =
this.getCurrentReader();
- if (baseReq.isReload() || currentReader == null) {
- LOG.info(
- "No current reader or reload {}, create new split
reader",
- baseReq.isReload());
- // build split
- Tuple2<MySqlSplit, Boolean> splitFlag =
createMySqlSplit(offsetMeta, baseReq);
- split = splitFlag.f0;
- // reset binlog reader
- // closeBinlogReader();
- currentSplitRecords = pollSplitRecordsWithSplit(split,
baseReq);
- this.setCurrentSplitRecords(currentSplitRecords);
- this.setCurrentSplit(split);
- } else if (currentReader instanceof BinlogSplitReader) {
- LOG.info("Continue poll records with current binlog reader");
- // only for binlog reader
- currentSplitRecords =
pollSplitRecordsWithCurrentReader(currentReader);
- split = this.getCurrentSplit();
- } else {
- throw new RuntimeException("Should not happen");
- }
+ String splitId = String.valueOf(offsetMeta.get(SPLIT_ID));
+ if (BINLOG_SPLIT_ID.equals(splitId)){
+ // Binlog split mode
+ return prepareBinlogSplit(offsetMeta, baseReq);
} else {
- LOG.info(
- "Continue read records with current split records,
splitId: {}",
- currentSplitRecords.getSplitId());
+ // Extract snapshot split list
+ List<MySqlSnapshotSplit> snapshotSplits =
extractSnapshotSplits(offsetMeta, baseReq);
+ return prepareSnapshotSplits(snapshotSplits, baseReq);
}
-
- // build response with iterator
+ }
+
+ /**
+ * Extract snapshot splits from meta.
+ * Only supports format: {"splits": [{"splitId": "xxx", ...}, ...]}
+ *
+ * @return List of snapshot splits, or null if it's a binlog split
+ */
+ private List<MySqlSnapshotSplit> extractSnapshotSplits(
+ Map<String, Object> offsetMeta,
+ JobBaseRecordRequest baseReq) throws JsonProcessingException {
+
+ // Check if it contains "splits" array
+ Object splitsObj = offsetMeta.get("splits");
+ if (splitsObj == null) {
+ throw new RuntimeException("Invalid meta format: missing 'splits'
array");
+ }
+
+ if (!(splitsObj instanceof List)) {
+ throw new RuntimeException("Invalid meta format: 'splits' must be
an array");
+ }
+
+ // Parse splits array
+ List<Map<String, Object>> splitMetaList = (List<Map<String, Object>>)
splitsObj;
+ if (splitMetaList.isEmpty()) {
+ throw new RuntimeException("Invalid meta format: 'splits' array is
empty");
+ }
+
+ List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
+ for (Map<String, Object> splitMeta : splitMetaList) {
+ MySqlSnapshotSplit split = createSnapshotSplit(splitMeta, baseReq);
+ snapshotSplits.add(split);
+ }
+
+ LOG.info("Extracted {} snapshot split(s) from meta",
snapshotSplits.size());
+ return snapshotSplits;
+ }
+
+ /**
+ * Prepare snapshot splits (unified handling for single or multiple splits)
+ */
+ private SplitReadResult prepareSnapshotSplits(
+ List<MySqlSnapshotSplit> splits,
+ JobBaseRecordRequest baseReq) throws Exception {
+
+ LOG.info("Preparing {} snapshot split(s) for reading", splits.size());
+
+ // Clear previous contexts
+ this.snapshotReaderContexts.clear();
+ this.currentReaderIndex = 0;
+
+ // Create reader for each split and submit
+ for (int i = 0; i < splits.size(); i++) {
+ MySqlSnapshotSplit split = splits.get(i);
+
+ // Create independent reader (each has its own Debezium queue)
+ SnapshotSplitReader reader = getSnapshotSplitReader(baseReq, i);
+
+ // Submit split (triggers async reading, data goes into reader's
Debezium queue)
+ reader.submitSplit(split);
+
+ // Create split state
+ MySqlSnapshotSplitState splitState = new
MySqlSnapshotSplitState(split);
+
+ // Save context using generic SnapshotReaderContext
+ SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader,
MySqlSnapshotSplitState> context =
+ new SnapshotReaderContext<>(split, reader, splitState);
+ snapshotReaderContexts.add(context);
+
+ LOG.info("Created reader {}/{} and submitted split: {} (table:
{})",
+ i + 1, splits.size(), split.splitId(),
split.getTableId().identifier());
+ }
+
+ // Construct return result with all splits and states
+ SplitReadResult result = new SplitReadResult();
+
+ List<SourceSplit> allSplits = new ArrayList<>();
+ Map<String, Object> allStates = new HashMap<>();
+
+ for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader,
MySqlSnapshotSplitState> context
+ : snapshotReaderContexts) {
+ MySqlSnapshotSplit split = context.getSplit();
+ allSplits.add(split);
+ allStates.put(split.splitId(), context.getSplitState());
+ }
+
+ result.setSplits(allSplits);
+ result.setSplitStates(allStates);
+
+ return result;
+ }
+
+ /**
+ * Prepare binlog split
+ */
+ private SplitReadResult prepareBinlogSplit(
+ Map<String, Object> offsetMeta,
+ JobBaseRecordRequest baseReq) throws Exception {
+ Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta,
baseReq);
+ this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
+ this.binlogReader = getBinlogSplitReader(baseReq);
+
+ LOG.info("Prepare binlog split: {}", this.binlogSplit.toString());
+
+ this.binlogReader.submitSplit(this.binlogSplit);
+ this.binlogSplitState = new MySqlBinlogSplitState(this.binlogSplit);
+
SplitReadResult result = new SplitReadResult();
- MySqlSplitState currentSplitState = null;
- MySqlSplit currentSplit = this.getCurrentSplit();
- if (currentSplit.isSnapshotSplit()) {
- currentSplitState = new
MySqlSnapshotSplitState(currentSplit.asSnapshotSplit());
+ result.setSplits(Collections.singletonList(this.binlogSplit));
+
+ Map<String, Object> statesMap = new HashMap<>();
+ statesMap.put(this.binlogSplit.splitId(), this.binlogSplitState);
+ result.setSplitStates(statesMap);
+
+ return result;
+ }
+
+ @Override
+ public Iterator<SourceRecord> pollRecords() throws InterruptedException {
+ if (!snapshotReaderContexts.isEmpty()) {
+ // Snapshot split mode
+ return pollRecordsFromSnapshotReaders();
+ } else if (binlogReader != null) {
+ // Binlog split mode
+ return pollRecordsFromBinlogReader();
} else {
- currentSplitState = new
MySqlBinlogSplitState(currentSplit.asBinlogSplit());
+ throw new RuntimeException("No active snapshot or binlog reader
available");
}
+ }
+
+ /**
+ * Poll records from multiple snapshot readers sequentially.
+ * Directly pulls data from each reader's Debezium queue without extra
queue.
+ *
+ * <p>This implementation loops through all readers in a single call.
+ * If a reader has no data, it automatically tries the next one.
+ * Each snapshot split is read only once.
+ */
+ private Iterator<SourceRecord> pollRecordsFromSnapshotReaders()
+ throws InterruptedException {
+ if (snapshotReaderContexts.isEmpty()) {
+ return Collections.emptyIterator();
+ }
+
+ // Try all remaining readers (starting from currentReaderIndex)
+ int totalReaders = snapshotReaderContexts.size();
+ int attempts = totalReaders - currentReaderIndex;
+
+ for (int i = 0; i < attempts; i++) {
+ // Check if we've processed all splits
+ if (currentReaderIndex >= totalReaders) {
+ LOG.info("All {} snapshot splits have been processed",
totalReaders);
+ return Collections.emptyIterator();
+ }
+
+ // Get current reader context
+ SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader,
MySqlSnapshotSplitState> context =
+ snapshotReaderContexts.get(currentReaderIndex);
+
+ // Poll data directly from this reader's Debezium queue
+ Iterator<SourceRecords> dataIt =
context.getReader().pollSplitRecords();
+
+ if (dataIt == null || !dataIt.hasNext()) {
+ // This reader has no data currently, move to next reader and
continue
+ LOG.debug("Split {} has no data, trying next split",
context.getSplit().splitId());
+ currentReaderIndex++;
+ continue;
+ }
+
+ // Has data, process and return
+ SourceRecords sourceRecords = dataIt.next();
- Iterator<SourceRecord> filteredIterator =
- new FilteredRecordIterator(currentSplitRecords,
currentSplitState);
+ // Move to next reader for next call
+ currentReaderIndex++;
Review Comment:
The currentReaderIndex is incremented before checking if we've received
valid data. If the iterator returns null or an empty iterator, we've already
moved to the next reader. This could cause issues in retry scenarios or if the
same reader should be polled again. Consider incrementing the index only after
successfully processing data.
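A hedged sketch of that change inside pollRecordsFromSnapshotReaders(): only advance past a reader once it reports it is done. isFinished() is an assumed helper on the reader, not something this PR provides:
```java
Iterator<SourceRecords> dataIt = context.getReader().pollSplitRecords();
if (dataIt == null || !dataIt.hasNext()) {
    // Only skip this split if its reader has really finished; otherwise poll it
    // again on the next call instead of permanently moving past it.
    if (context.getReader().isFinished()) { // assumed helper, not in this PR
        currentReaderIndex++;
    }
    continue;
}
// Data available: process it and leave currentReaderIndex unchanged so the next
// pollRecords() call drains the remainder of this split before moving on.
SourceRecords sourceRecords = dataIt.next();
```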
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -633,15 +744,16 @@ public List<String> deserialize(Map<String, String>
config, SourceRecord element
/**
* Filtered record iterator that only returns data change records,
filtering out watermark,
- * heartbeat and other events. This is a private inner class that
encapsulates record filtering
- * logic, making the main method cleaner.
+ * heartbeat and other events. This is a private static inner class that
encapsulates record
+ * filtering logic, making the main method cleaner.
*/
private class FilteredRecordIterator implements Iterator<SourceRecord> {
private final Iterator<SourceRecord> sourceIterator;
private final SourceSplitState splitState;
private SourceRecord nextRecord;
- FilteredRecordIterator(SplitRecords currentSplitRecords,
SourceSplitState splitState) {
+ FilteredRecordIterator(SplitRecords currentSplitRecords,
SourceSplitState splitState,
+ JdbcIncrementalSourceReader reader) {
Review Comment:
The FilteredRecordIterator constructor signature has changed to require a
JdbcIncrementalSourceReader parameter, but this parameter is not used in the
constructor or stored as a field. This is inconsistent with the MySQL
implementation where the parameter is not required.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +224,226 @@ public CompletableFuture<Void>
writeRecordsAsync(WriteRecordRequest writeRecordR
executor);
}
- /** Read data from SourceReader and write it to Doris, while returning
meta information. */
+ /**
+ * Read data from SourceReader and write it to Doris, while returning meta
information.
+ *
+ * <p>Snapshot split: Returns immediately after reading; otherwise,
returns after the
+ * maxInterval.
+ *
+ * <p>Binlog split: Fetches data at the maxInterval. Returns immediately
if no data is found; if
+ * found, checks if the last record is a heartbeat record. If it is,
returns immediately;
+ * otherwise, fetches again until the heartbeat deadline.
+ *
+ * <p>Heartbeat events will carry the latest offset.
+ */
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
- Map<String, String> metaResponse = new HashMap<>();
long scannedRows = 0L;
long scannedBytes = 0L;
+ int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
- readResult = sourceReader.readSplitRecords(writeRecordRequest);
- batchStreamLoad =
- getOrCreateBatchStreamLoad(
- writeRecordRequest.getJobId(),
writeRecordRequest.getTargetDb());
- batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
- batchStreamLoad.setToken(writeRecordRequest.getToken());
-
- // Record start time for maxInterval check
+ // 1. submit split async
+ readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+ batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+ boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ boolean shouldStop = false;
+ boolean lastMessageIsHeartbeat = false;
+ LOG.info(
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId(),
+ isSnapshotSplit);
+
+ // 2. poll record
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
sourceReader.pollRecords();
- // Use iterators to read and write.
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
- String table = extractTable(element);
- for (String record : serializedRecords) {
- scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
- batchStreamLoad.writeRecord(database, table,
dataBytes);
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
+
+ // Check if should stop
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (shouldStop(
+ isSnapshotSplit,
+ scannedRows > 0,
+ lastMessageIsHeartbeat,
+ elapsedTime,
+ maxIntervalMillis,
+ timeoutReached)) {
+ break;
}
+ continue;
}
- // Check if maxInterval has been exceeded
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis)
{
- LOG.info(
- "Max interval {} seconds reached, stopping data
reading",
- writeRecordRequest.getMaxInterval());
- break;
+
+ while (recordIterator.hasNext()) {
+ SourceRecord element = recordIterator.next();
+
+ // Check if this is a heartbeat message
+ if (isHeartbeatEvent(element)) {
+ heartbeatCount++;
+
+ // Mark last message as heartbeat (only for binlog
split)
+ if (!isSnapshotSplit) {
+ lastMessageIsHeartbeat = true;
+ }
+
+ // If already timeout, stop immediately when heartbeat
received
+ long elapsedTime = System.currentTimeMillis() -
startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (!isSnapshotSplit && timeoutReached) {
+ LOG.info(
+ "Binlog split max interval reached and
heartbeat received, stopping data reading");
+ shouldStop = true;
+ break;
+ }
+ // Skip heartbeat messages during normal processing
+ continue;
+ }
+
+ // Process data messages
+ List<String> serializedRecords =
+
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+ if (!CollectionUtils.isEmpty(serializedRecords)) {
+ String database = writeRecordRequest.getTargetDb();
+ String table = extractTable(element);
+ for (String record : serializedRecords) {
+ scannedRows++;
+ byte[] dataBytes = record.getBytes();
+ scannedBytes += dataBytes.length;
+ batchStreamLoad.writeRecord(database, table,
dataBytes);
+ }
+ // Mark last message as data (not heartbeat)
+ lastMessageIsHeartbeat = false;
+ }
}
}
+ LOG.info(
+ "Fetched {} records and {} heartbeats in {} ms for
jobId={} taskId={}",
+ scannedRows,
+ heartbeatCount,
+ System.currentTimeMillis() - startTime,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+
} finally {
- if (readResult != null) {
- // The LSN in the commit is the current offset, which is the
offset from the last
- // successful write.
- // Therefore, even if a subsequent write fails, it will not
affect the commit.
- sourceReader.commitSourceOffset(
- writeRecordRequest.getJobId(), readResult.getSplit());
- }
+ cleanupReaderResources(sourceReader,
writeRecordRequest.getJobId(), readResult);
+ }
- // This must be called after commitSourceOffset; otherwise,
- // PG's confirmed lsn will not proceed.
- // This operation must be performed before
batchStreamLoad.commitOffset;
- // otherwise, fe might issue the next task for this job.
- sourceReader.finishSplitRecords();
+ // 3. Extract offset from split state
+ List<Map<String, String>> metaResponse =
extractOffsetMeta(sourceReader, readResult);
+ // 4. wait all stream load finish
+ batchStreamLoad.forceFlush();
+
+ // 5. request fe api update offset
+ String currentTaskId = batchStreamLoad.getCurrentTaskId();
+ // The offset must be reset before commitOffset to prevent the next
taskId from being create
+ // by the fe.
+ batchStreamLoad.resetTaskId();
+ batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows,
scannedBytes);
+ }
+
+ public static boolean isHeartbeatEvent(SourceRecord record) {
+ Schema valueSchema = record.valueSchema();
+ return valueSchema != null
+ &&
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+ }
+
+ /**
+ * Determine if we should stop polling.
+ *
+ * @param isSnapshotSplit whether this is a snapshot split
+ * @param hasData whether we have received any data
+ * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+ * @param elapsedTime total elapsed time in milliseconds
+ * @param maxIntervalMillis max interval in milliseconds
+ * @param timeoutReached whether timeout is reached
+ * @return true if should stop, false if should continue
+ */
+ private boolean shouldStop(
+ boolean isSnapshotSplit,
+ boolean hasData,
+ boolean lastMessageIsHeartbeat,
+ long elapsedTime,
+ long maxIntervalMillis,
+ boolean timeoutReached) {
+
+ // 1. Snapshot split with data: if no more data in queue, stop
immediately (no need to wait
+ // for timeout)
+ // snapshot split will be written to the debezium queue all at once.
+ // multiple snapshot splits are handled in the source reader.
+ if (isSnapshotSplit && hasData) {
+ LOG.info(
+ "Snapshot split finished, no more data available. Total
elapsed: {} ms",
+ elapsedTime);
+ return true;
}
- // get offset from split state
- try {
- if (readResult.getSplitState() != null) {
- // Set meta information for hw
- if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
- metaResponse = offsetRes;
- }
- // set meta for binlog event
- if (sourceReader.isBinlogSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- metaResponse = offsetRes;
- }
- } else {
- throw new RuntimeException("split state is null");
- }
+ // 2. Not timeout yet: continue waiting
+ if (!timeoutReached) {
+ return false;
+ }
- // wait all stream load finish
- batchStreamLoad.forceFlush();
+ // === Below are checks after timeout is reached ===
- // request fe api
- batchStreamLoad.commitOffset(metaResponse, scannedRows,
scannedBytes);
- } finally {
- batchStreamLoad.resetTaskId();
+ // 3. No data received after timeout: stop
+ if (!hasData) {
+ LOG.info("No data received after timeout, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 4. Snapshot split after timeout (should not reach here, but keep as
safety check)
+ if (isSnapshotSplit) {
+ LOG.info("Snapshot split timeout reached, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 5. Binlog split + last message is heartbeat: stop immediately
+ if (lastMessageIsHeartbeat) {
+ LOG.info("Binlog split timeout and last message is heartbeat,
stopping");
+ return true;
+ }
+
+ // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout
protection
+ if (elapsedTime > maxIntervalMillis +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+ LOG.warn(
+ "Binlog split heartbeat wait timeout after {} ms, force
stopping. Total elapsed: {} ms",
+ elapsedTime - maxIntervalMillis,
+ elapsedTime);
+ return true;
}
+
+ // Continue waiting for heartbeat
+ return false;
}
Review Comment:
The method shouldStop has complex nested conditions that make it difficult
to understand and maintain. Consider extracting some of the conditions into
well-named helper methods (e.g., 'shouldStopForSnapshotSplit',
'shouldStopForBinlogSplit') to improve readability and maintainability.
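A hedged sketch of that decomposition, intended to be behaviorally equivalent to the checks above but with the snapshot and binlog cases separated (logging omitted for brevity; method names are illustrative):
```java
private boolean shouldStop(boolean isSnapshotSplit, boolean hasData,
        boolean lastMessageIsHeartbeat, long elapsedTime,
        long maxIntervalMillis, boolean timeoutReached) {
    return isSnapshotSplit
            ? shouldStopForSnapshotSplit(hasData, timeoutReached)
            : shouldStopForBinlogSplit(hasData, lastMessageIsHeartbeat,
                    elapsedTime, maxIntervalMillis, timeoutReached);
}

private boolean shouldStopForSnapshotSplit(boolean hasData, boolean timeoutReached) {
    // Snapshot data is enqueued in one shot: once data has been seen and the queue is
    // drained there is nothing left to wait for; otherwise stop only on timeout.
    return hasData || timeoutReached;
}

private boolean shouldStopForBinlogSplit(boolean hasData, boolean lastMessageIsHeartbeat,
        long elapsedTime, long maxIntervalMillis, boolean timeoutReached) {
    if (!timeoutReached) {
        return false; // keep polling until maxInterval
    }
    if (!hasData || lastMessageIsHeartbeat) {
        return true; // nothing to flush, or the offset already advanced via heartbeat
    }
    // Data was read but no trailing heartbeat yet: wait a bounded extra period.
    return elapsedTime > maxIntervalMillis + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3;
}
```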
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +224,226 @@ public CompletableFuture<Void>
writeRecordsAsync(WriteRecordRequest writeRecordR
executor);
}
- /** Read data from SourceReader and write it to Doris, while returning
meta information. */
+ /**
+ * Read data from SourceReader and write it to Doris, while returning meta
information.
+ *
+ * <p>Snapshot split: Returns immediately after reading; otherwise,
returns after the
+ * maxInterval.
+ *
+ * <p>Binlog split: Fetches data at the maxInterval. Returns immediately
if no data is found; if
+ * found, checks if the last record is a heartbeat record. If it is,
returns immediately;
+ * otherwise, fetches again until the heartbeat deadline.
+ *
+ * <p>Heartbeat events will carry the latest offset.
+ */
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
- Map<String, String> metaResponse = new HashMap<>();
long scannedRows = 0L;
long scannedBytes = 0L;
+ int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
- readResult = sourceReader.readSplitRecords(writeRecordRequest);
- batchStreamLoad =
- getOrCreateBatchStreamLoad(
- writeRecordRequest.getJobId(),
writeRecordRequest.getTargetDb());
- batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
- batchStreamLoad.setToken(writeRecordRequest.getToken());
-
- // Record start time for maxInterval check
+ // 1. submit split async
+ readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+ batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+ boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ boolean shouldStop = false;
+ boolean lastMessageIsHeartbeat = false;
+ LOG.info(
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId(),
+ isSnapshotSplit);
+
+ // 2. poll record
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
sourceReader.pollRecords();
- // Use iterators to read and write.
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
- String table = extractTable(element);
- for (String record : serializedRecords) {
- scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
- batchStreamLoad.writeRecord(database, table,
dataBytes);
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
+
+ // Check if should stop
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (shouldStop(
+ isSnapshotSplit,
+ scannedRows > 0,
+ lastMessageIsHeartbeat,
+ elapsedTime,
+ maxIntervalMillis,
+ timeoutReached)) {
+ break;
}
+ continue;
}
- // Check if maxInterval has been exceeded
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis)
{
- LOG.info(
- "Max interval {} seconds reached, stopping data
reading",
- writeRecordRequest.getMaxInterval());
- break;
+
+ while (recordIterator.hasNext()) {
+ SourceRecord element = recordIterator.next();
+
+ // Check if this is a heartbeat message
+ if (isHeartbeatEvent(element)) {
+ heartbeatCount++;
+
+ // Mark last message as heartbeat (only for binlog
split)
+ if (!isSnapshotSplit) {
+ lastMessageIsHeartbeat = true;
+ }
+
+ // If already timeout, stop immediately when heartbeat
received
+ long elapsedTime = System.currentTimeMillis() -
startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (!isSnapshotSplit && timeoutReached) {
+ LOG.info(
+ "Binlog split max interval reached and
heartbeat received, stopping data reading");
+ shouldStop = true;
+ break;
+ }
+ // Skip heartbeat messages during normal processing
+ continue;
+ }
+
+ // Process data messages
+ List<String> serializedRecords =
+
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+ if (!CollectionUtils.isEmpty(serializedRecords)) {
+ String database = writeRecordRequest.getTargetDb();
+ String table = extractTable(element);
+ for (String record : serializedRecords) {
+ scannedRows++;
+ byte[] dataBytes = record.getBytes();
+ scannedBytes += dataBytes.length;
+ batchStreamLoad.writeRecord(database, table,
dataBytes);
+ }
+ // Mark last message as data (not heartbeat)
+ lastMessageIsHeartbeat = false;
+ }
}
}
+ LOG.info(
+ "Fetched {} records and {} heartbeats in {} ms for
jobId={} taskId={}",
+ scannedRows,
+ heartbeatCount,
+ System.currentTimeMillis() - startTime,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+
} finally {
- if (readResult != null) {
- // The LSN in the commit is the current offset, which is the
offset from the last
- // successful write.
- // Therefore, even if a subsequent write fails, it will not
affect the commit.
- sourceReader.commitSourceOffset(
- writeRecordRequest.getJobId(), readResult.getSplit());
- }
+ cleanupReaderResources(sourceReader,
writeRecordRequest.getJobId(), readResult);
+ }
- // This must be called after commitSourceOffset; otherwise,
- // PG's confirmed lsn will not proceed.
- // This operation must be performed before
batchStreamLoad.commitOffset;
- // otherwise, fe might issue the next task for this job.
- sourceReader.finishSplitRecords();
+ // 3. Extract offset from split state
+ List<Map<String, String>> metaResponse =
extractOffsetMeta(sourceReader, readResult);
+ // 4. wait all stream load finish
+ batchStreamLoad.forceFlush();
+
+ // 5. request fe api update offset
+ String currentTaskId = batchStreamLoad.getCurrentTaskId();
+ // The offset must be reset before commitOffset to prevent the next
taskId from being create
+ // by the fe.
+ batchStreamLoad.resetTaskId();
+ batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows,
scannedBytes);
+ }
+
+ public static boolean isHeartbeatEvent(SourceRecord record) {
+ Schema valueSchema = record.valueSchema();
+ return valueSchema != null
+ &&
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+ }
+
+ /**
+ * Determine if we should stop polling.
+ *
+ * @param isSnapshotSplit whether this is a snapshot split
+ * @param hasData whether we have received any data
+ * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+ * @param elapsedTime total elapsed time in milliseconds
+ * @param maxIntervalMillis max interval in milliseconds
+ * @param timeoutReached whether timeout is reached
+ * @return true if should stop, false if should continue
+ */
+ private boolean shouldStop(
+ boolean isSnapshotSplit,
+ boolean hasData,
+ boolean lastMessageIsHeartbeat,
+ long elapsedTime,
+ long maxIntervalMillis,
+ boolean timeoutReached) {
+
+ // 1. Snapshot split with data: if no more data in queue, stop
immediately (no need to wait
+ // for timeout)
+ // snapshot split will be written to the debezium queue all at once.
+ // multiple snapshot splits are handled in the source reader.
+ if (isSnapshotSplit && hasData) {
+ LOG.info(
+ "Snapshot split finished, no more data available. Total
elapsed: {} ms",
+ elapsedTime);
+ return true;
}
- // get offset from split state
- try {
- if (readResult.getSplitState() != null) {
- // Set meta information for hw
- if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
- metaResponse = offsetRes;
- }
- // set meta for binlog event
- if (sourceReader.isBinlogSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- metaResponse = offsetRes;
- }
- } else {
- throw new RuntimeException("split state is null");
- }
+ // 2. Not timeout yet: continue waiting
+ if (!timeoutReached) {
+ return false;
+ }
- // wait all stream load finish
- batchStreamLoad.forceFlush();
+ // === Below are checks after timeout is reached ===
- // request fe api
- batchStreamLoad.commitOffset(metaResponse, scannedRows,
scannedBytes);
- } finally {
- batchStreamLoad.resetTaskId();
+ // 3. No data received after timeout: stop
+ if (!hasData) {
+ LOG.info("No data received after timeout, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 4. Snapshot split after timeout (should not reach here, but keep as
safety check)
+ if (isSnapshotSplit) {
+ LOG.info("Snapshot split timeout reached, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 5. Binlog split + last message is heartbeat: stop immediately
+ if (lastMessageIsHeartbeat) {
+ LOG.info("Binlog split timeout and last message is heartbeat,
stopping");
+ return true;
+ }
+
+ // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout
protection
+ if (elapsedTime > maxIntervalMillis +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
Review Comment:
The condition 'elapsedTime > maxIntervalMillis +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3' uses a magic number (3). This
represents the timeout protection multiplier for waiting for heartbeat.
Consider extracting this as a named constant (e.g.,
HEARTBEAT_WAIT_TIMEOUT_MULTIPLIER) to improve code clarity.
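A hedged sketch of the extracted constant; the helper method is illustrative, the named multiplier is the point:
```java
// How many heartbeat intervals to wait beyond maxInterval before force-stopping a
// binlog poll that has read data but has not yet seen a trailing heartbeat.
private static final int HEARTBEAT_WAIT_TIMEOUT_MULTIPLIER = 3;

private boolean heartbeatWaitTimedOut(long elapsedTime, long maxIntervalMillis) {
    long deadline = maxIntervalMillis
            + Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * HEARTBEAT_WAIT_TIMEOUT_MULTIPLIER;
    return elapsedTime > deadline;
}
```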
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -177,107 +224,226 @@ public CompletableFuture<Void>
writeRecordsAsync(WriteRecordRequest writeRecordR
executor);
}
- /** Read data from SourceReader and write it to Doris, while returning
meta information. */
+ /**
+ * Read data from SourceReader and write it to Doris, while returning meta
information.
+ *
+ * <p>Snapshot split: Returns immediately after reading; otherwise,
returns after the
+ * maxInterval.
+ *
+ * <p>Binlog split: Fetches data at the maxInterval. Returns immediately
if no data is found; if
+ * found, checks if the last record is a heartbeat record. If it is,
returns immediately;
+ * otherwise, fetches again until the heartbeat deadline.
+ *
+ * <p>Heartbeat events will carry the latest offset.
+ */
public void writeRecords(WriteRecordRequest writeRecordRequest) throws
Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
- Map<String, String> metaResponse = new HashMap<>();
long scannedRows = 0L;
long scannedBytes = 0L;
+ int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
- readResult = sourceReader.readSplitRecords(writeRecordRequest);
- batchStreamLoad =
- getOrCreateBatchStreamLoad(
- writeRecordRequest.getJobId(),
writeRecordRequest.getTargetDb());
- batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
-
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
- batchStreamLoad.setToken(writeRecordRequest.getToken());
-
- // Record start time for maxInterval check
+ // 1. submit split async
+ readResult =
sourceReader.prepareAndSubmitSplit(writeRecordRequest);
+ batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest);
+
+ boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
+ boolean shouldStop = false;
+ boolean lastMessageIsHeartbeat = false;
+ LOG.info(
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId(),
+ isSnapshotSplit);
+
+ // 2. poll record
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
sourceReader.pollRecords();
- // Use iterators to read and write.
- Iterator<SourceRecord> iterator = readResult.getRecordIterator();
- while (iterator != null && iterator.hasNext()) {
- SourceRecord element = iterator.next();
- List<String> serializedRecords =
-
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
-
- if (!CollectionUtils.isEmpty(serializedRecords)) {
- String database = writeRecordRequest.getTargetDb();
- String table = extractTable(element);
- for (String record : serializedRecords) {
- scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
- batchStreamLoad.writeRecord(database, table,
dataBytes);
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
+
+ // Check if should stop
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (shouldStop(
+ isSnapshotSplit,
+ scannedRows > 0,
+ lastMessageIsHeartbeat,
+ elapsedTime,
+ maxIntervalMillis,
+ timeoutReached)) {
+ break;
}
+ continue;
}
- // Check if maxInterval has been exceeded
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (maxIntervalMillis > 0 && elapsedTime >= maxIntervalMillis)
{
- LOG.info(
- "Max interval {} seconds reached, stopping data
reading",
- writeRecordRequest.getMaxInterval());
- break;
+
+ while (recordIterator.hasNext()) {
+ SourceRecord element = recordIterator.next();
+
+ // Check if this is a heartbeat message
+ if (isHeartbeatEvent(element)) {
+ heartbeatCount++;
+
+ // Mark last message as heartbeat (only for binlog
split)
+ if (!isSnapshotSplit) {
+ lastMessageIsHeartbeat = true;
+ }
+
+ // If already timeout, stop immediately when heartbeat
received
+ long elapsedTime = System.currentTimeMillis() -
startTime;
+ boolean timeoutReached =
+ maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+
+ if (!isSnapshotSplit && timeoutReached) {
+ LOG.info(
+ "Binlog split max interval reached and
heartbeat received, stopping data reading");
+ shouldStop = true;
+ break;
+ }
+ // Skip heartbeat messages during normal processing
+ continue;
+ }
+
+ // Process data messages
+ List<String> serializedRecords =
+
sourceReader.deserialize(writeRecordRequest.getConfig(), element);
+
+ if (!CollectionUtils.isEmpty(serializedRecords)) {
+ String database = writeRecordRequest.getTargetDb();
+ String table = extractTable(element);
+ for (String record : serializedRecords) {
+ scannedRows++;
+ byte[] dataBytes = record.getBytes();
+ scannedBytes += dataBytes.length;
+ batchStreamLoad.writeRecord(database, table,
dataBytes);
+ }
+ // Mark last message as data (not heartbeat)
+ lastMessageIsHeartbeat = false;
+ }
}
}
+ LOG.info(
+ "Fetched {} records and {} heartbeats in {} ms for
jobId={} taskId={}",
+ scannedRows,
+ heartbeatCount,
+ System.currentTimeMillis() - startTime,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+
} finally {
- if (readResult != null) {
- // The LSN in the commit is the current offset, which is the
offset from the last
- // successful write.
- // Therefore, even if a subsequent write fails, it will not
affect the commit.
- sourceReader.commitSourceOffset(
- writeRecordRequest.getJobId(), readResult.getSplit());
- }
+ cleanupReaderResources(sourceReader,
writeRecordRequest.getJobId(), readResult);
+ }
- // This must be called after commitSourceOffset; otherwise,
- // PG's confirmed lsn will not proceed.
- // This operation must be performed before
batchStreamLoad.commitOffset;
- // otherwise, fe might issue the next task for this job.
- sourceReader.finishSplitRecords();
+ // 3. Extract offset from split state
+ List<Map<String, String>> metaResponse =
extractOffsetMeta(sourceReader, readResult);
+ // 4. wait all stream load finish
+ batchStreamLoad.forceFlush();
+
+ // 5. request fe api update offset
+ String currentTaskId = batchStreamLoad.getCurrentTaskId();
+ // The offset must be reset before commitOffset to prevent the next
taskId from being create
+ // by the fe.
+ batchStreamLoad.resetTaskId();
+ batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows,
scannedBytes);
+ }
+
+ public static boolean isHeartbeatEvent(SourceRecord record) {
+ Schema valueSchema = record.valueSchema();
+ return valueSchema != null
+ &&
SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
+ }
+
+ /**
+ * Determine if we should stop polling.
+ *
+ * @param isSnapshotSplit whether this is a snapshot split
+ * @param hasData whether we have received any data
+ * @param lastMessageIsHeartbeat whether the last message is a heartbeat
+ * @param elapsedTime total elapsed time in milliseconds
+ * @param maxIntervalMillis max interval in milliseconds
+ * @param timeoutReached whether timeout is reached
+ * @return true if should stop, false if should continue
+ */
+ private boolean shouldStop(
+ boolean isSnapshotSplit,
+ boolean hasData,
+ boolean lastMessageIsHeartbeat,
+ long elapsedTime,
+ long maxIntervalMillis,
+ boolean timeoutReached) {
+
+ // 1. Snapshot split with data: if no more data in queue, stop
immediately (no need to wait
+ // for timeout)
+ // snapshot split will be written to the debezium queue all at once.
+ // multiple snapshot splits are handled in the source reader.
+ if (isSnapshotSplit && hasData) {
+ LOG.info(
+ "Snapshot split finished, no more data available. Total
elapsed: {} ms",
+ elapsedTime);
+ return true;
}
- // get offset from split state
- try {
- if (readResult.getSplitState() != null) {
- // Set meta information for hw
- if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
- metaResponse = offsetRes;
- }
- // set meta for binlog event
- if (sourceReader.isBinlogSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- metaResponse = offsetRes;
- }
- } else {
- throw new RuntimeException("split state is null");
- }
+ // 2. Not timeout yet: continue waiting
+ if (!timeoutReached) {
+ return false;
+ }
- // wait all stream load finish
- batchStreamLoad.forceFlush();
+ // === Below are checks after timeout is reached ===
- // request fe api
- batchStreamLoad.commitOffset(metaResponse, scannedRows,
scannedBytes);
- } finally {
- batchStreamLoad.resetTaskId();
+ // 3. No data received after timeout: stop
+ if (!hasData) {
+ LOG.info("No data received after timeout, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 4. Snapshot split after timeout (should not reach here, but keep as
safety check)
+ if (isSnapshotSplit) {
+ LOG.info("Snapshot split timeout reached, stopping. Elapsed: {}
ms", elapsedTime);
+ return true;
+ }
+
+ // 5. Binlog split + last message is heartbeat: stop immediately
+ if (lastMessageIsHeartbeat) {
+ LOG.info("Binlog split timeout and last message is heartbeat,
stopping");
+ return true;
+ }
+
+ // 6. Binlog split + no heartbeat yet: wait for heartbeat with timeout
protection
+ if (elapsedTime > maxIntervalMillis +
Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS * 3) {
+ LOG.warn(
+ "Binlog split heartbeat wait timeout after {} ms, force
stopping. Total elapsed: {} ms",
+ elapsedTime - maxIntervalMillis,
+ elapsedTime);
+ return true;
}
+
+ // Continue waiting for heartbeat
+ return false;
}
- private DorisBatchStreamLoad getOrCreateBatchStreamLoad(Long jobId, String
targetDb) {
- return batchStreamLoadMap.computeIfAbsent(
- jobId,
- k -> {
- LOG.info("Create DorisBatchStreamLoad for jobId={}",
jobId);
- return new DorisBatchStreamLoad(jobId, targetDb);
- });
+ private synchronized DorisBatchStreamLoad getOrCreateBatchStreamLoad(
+ WriteRecordRequest writeRecordRequest) {
+ DorisBatchStreamLoad batchStreamLoad =
+ batchStreamLoadMap.computeIfAbsent(
+ writeRecordRequest.getJobId(),
+ k -> {
+ LOG.info(
+ "Create DorisBatchStreamLoad for jobId={}",
+ writeRecordRequest.getJobId());
+ return new DorisBatchStreamLoad(
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTargetDb());
+ });
+ batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
+
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
+ batchStreamLoad.setToken(writeRecordRequest.getToken());
+ return batchStreamLoad;
Review Comment:
Possible lost updates in getOrCreateBatchStreamLoad. The method is synchronized,
so the setter calls themselves are serialized, but the DorisBatchStreamLoad
instance is shared per jobId: if multiple threads call this method concurrently
for the same job, the later caller overwrites the currentTaskId,
frontendAddress, and token set by the earlier one, so a task may commit with
another task's values. Consider passing these per-task values to the methods
that need them instead of storing them on the shared instance.
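One hedged way to do that; the signatures below are illustrative sketches, not the PR's API:
```java
// Resolve the shared per-job loader without mutating per-task state on it.
private DorisBatchStreamLoad getOrCreateBatchStreamLoad(WriteRecordRequest req) {
    return batchStreamLoadMap.computeIfAbsent(
            req.getJobId(),
            k -> {
                LOG.info("Create DorisBatchStreamLoad for jobId={}", req.getJobId());
                return new DorisBatchStreamLoad(req.getJobId(), req.getTargetDb());
            });
}

// Per-task values (taskId, frontendAddress, token) would then travel with the request,
// e.g. an illustrative commitOffset overload instead of reading instance fields:
// batchStreamLoad.commitOffset(req.getTaskId(), req.getFrontendAddress(), req.getToken(),
//         metaResponse, scannedRows, scannedBytes);
```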
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SplitReadResult.java:
##########
@@ -18,18 +18,40 @@
package org.apache.doris.cdcclient.source.reader;
import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.kafka.connect.source.SourceRecord;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import lombok.Data;
-/** The result of reading a split with iterator. */
+/**
+ * Container for source splits and their associated states.
+ * Supports both single split (binlog) and multiple splits (snapshot).
+ * Iteration over records for these splits is handled separately (for example
via pollRecords).
+ */
@Data
public class SplitReadResult {
- private Iterator<SourceRecord> recordIterator;
- // MySqlSplitState, SourceSplitState
- private Object splitState;
- // MySqlSplit SourceSplitBase
- private SourceSplit split;
+ // List of splits (size=1 for binlog, size>=1 for snapshot)
+ private List<SourceSplit> splits;
+
+ // Map of split states (key: splitId, value: state)
+ private Map<String, Object> splitStates;
+
+ /**
+ * Get the first split ( The types in `splits` are the same.)
+ */
+ public SourceSplit getSplit() {
+ return splits != null && !splits.isEmpty() ? splits.get(0) : null;
+ }
Review Comment:
getSplit() already guards against a null or empty splits list, but it does so by
silently returning null. Callers such as
sourceReader.isSnapshotSplit(readResult.getSplit()) will then fail with a
NullPointerException far from the cause. Consider documenting the null return,
or failing fast with an explicit exception when the result contains no splits.
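If failing fast is preferred, a minimal sketch:
```java
// Sketch: surface an empty result at its source instead of as a later NullPointerException.
public SourceSplit getSplit() {
    if (splits == null || splits.isEmpty()) {
        throw new IllegalStateException("SplitReadResult contains no splits");
    }
    return splits.get(0);
}
```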
##########
fs_brokers/cdc_client/pom.xml:
##########
@@ -209,6 +209,9 @@ under the License.
<version>${spotless.version}</version>
<configuration>
<java>
+ <includes>
+
<include>src/main/java/org/apache/doris/*.java</include>
Review Comment:
The Spotless configuration now only includes specific paths under
'src/main/java/org/apache/doris/*.java' which will exclude subdirectories. This
means files in subdirectories like 'org/apache/doris/cdcclient/' will not be
formatted. The include pattern should likely be
'src/main/java/org/apache/doris/**/*.java' to include all subdirectories.
```suggestion
<include>src/main/java/org/apache/doris/**/*.java</include>
```
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -481,13 +494,14 @@ public void resetTaskId() {
}
/** commit offfset to frontends. */
- public void commitOffset(Map<String, String> meta, long scannedRows, long
scannedBytes) {
+ public void commitOffset(
+ String taskId, List<Map<String, String>> meta, long scannedRows,
long scannedBytes) {
try {
String url = String.format(COMMIT_URL_PATTERN, frontendAddress,
targetDb);
Map<String, Object> commitParams = new HashMap<>();
commitParams.put("offset", OBJECT_MAPPER.writeValueAsString(meta));
commitParams.put("jobId", jobId);
- commitParams.put("taskId", currentTaskId);
+ commitParams.put("taskId", taskId);
Review Comment:
The commitOffset method signature now takes taskId as a parameter, but the
currentTaskId field is still set via a setter, which makes it unclear which
value is authoritative. Consider using only the parameter or only the field, not
both, or document why both are needed (for example, the caller capturing the
task id before resetTaskId()).
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -162,42 +163,54 @@ public void writeRecord(String database, String table,
byte[] record) {
lock.unlock();
}
}
+
+ // Single table flush according to the STREAM_LOAD_MAX_BYTES
+ if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES) {
+ bufferFullFlush(bufferKey);
+ }
+ }
+
+ public void bufferFullFlush(String bufferKey) {
Review Comment:
Typo in variable name 'buffferKey' should be 'bufferKey'.
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -306,7 +319,7 @@ public String getTimeoutReason() {
log.warn("Failed to get task timeout reason, response: {}",
response);
}
} catch (ExecutionException | InterruptedException ex) {
- log.error("Send get task fail reason request failed: ", ex);
+ log.error("Send get fail reason request failed: ", ex);
Review Comment:
The log message was changed from 'Send get task fail reason request failed' to
'Send get fail reason request failed', dropping the word 'task'. Restore the
original wording so the message stays descriptive and matches the context.
```suggestion
log.error("Send get task fail reason request failed: ", ex);
```
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java:
##########
@@ -65,9 +65,9 @@ public Object fetchSplits(@RequestBody
FetchTableSplitsRequest ftsReq) {
SourceReader reader = Env.getCurrentEnv().getReader(ftsReq);
List splits = reader.getSourceSplits(ftsReq);
return RestResponse.success(splits);
- } catch (IllegalArgumentException ex) {
+ } catch (Exception ex) {
LOG.error("Failed to fetch splits, jobId={}", ftsReq.getJobId(),
ex);
- return RestResponse.internalError(ex.getMessage());
+ return
RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex));
}
Review Comment:
Exception handling has been broadened from catching IllegalArgumentException to
catching all exceptions. While this prevents unhandled errors from escaping the
controller, it also blurs the distinction between bad requests and genuine
server-side failures. Consider keeping a separate branch for expected validation
errors (IllegalArgumentException) and reserving the generic catch, with its
root-cause message, for unexpected exceptions.
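A hedged sketch keeping both branches; only calls already visible in this hunk are reused, and whether RestResponse offers a dedicated bad-request factory is not assumed:
```java
} catch (IllegalArgumentException ex) {
    // Expected validation failure: log less noisily and return the message as-is.
    LOG.warn("Invalid fetch splits request, jobId={}", ftsReq.getJobId(), ex);
    return RestResponse.internalError(ex.getMessage());
} catch (Exception ex) {
    // Unexpected failure: keep the full stack trace and surface the root cause.
    LOG.error("Failed to fetch splits, jobId={}", ftsReq.getJobId(), ex);
    return RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex));
}
```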
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.