github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3427813608
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -90,20 +126,55 @@ public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String taskId) {
DataSource ds = resolveDataSource(jobConfig.getDataSource());
String jobId = jobConfig.getJobId();
Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock());
+ SourceReader staleReader = null;
+ JobBaseConfig staleConfig = null;
+ SourceReader reader;
lock.lock();
try {
JobContext context = jobContexts.get(jobId);
+ if (context != null
+ && jobConfig instanceof WriteRecordRequest
+ && ((WriteRecordRequest) jobConfig).isRebuildReader()) {
+ // FE declared the previous task abnormal: swap in a fresh reader instance so the
+ // old task's thread can never reach the new fetcher.
+ LOG.info(
+ "Rebuild reader for job {} on FE request, discard current instance", jobId);
+ jobContexts.remove(jobId);
+ staleReader = context.reader;
+ staleConfig = context.jobConfig != null ? context.jobConfig : jobConfig;
+ context = null;
+ }
if (context == null) {
LOG.info("Creating new reader for job {}, dataSource {}", jobId, ds);
context = new JobContext(jobId, ds, jobConfig.getConfig());
context.initializeReader();
jobContexts.put(jobId, context);
}
context.ownerTaskId = taskId;
- return context.getReader(ds);
+ context.jobConfig = jobConfig;
+ if (jobConfig instanceof WriteRecordRequest) {
+ context.maxIntervalMs = ((WriteRecordRequest) jobConfig).getMaxInterval() * 1000;
+ }
+ context.lastAliveTime = System.currentTimeMillis();
+ reader = context.getReader(ds);
} finally {
lock.unlock();
}
+ if (staleReader != null) {
+ // free the engine/slot connection before the caller submits the new fetcher
+ try {
+ staleReader.release(staleConfig);
Review Comment:
This discards the old `SourceReader` after calling `release()`, but
`release()` only reaches `finishSplitRecords()`. Both JDBC reader
implementations create a fixed `pollExecutor` in `initialize()`, and neither
`finishSplitRecords()` nor `release()` shuts that executor down. A concrete
path is: FE restart or manual pause sets `rebuildReader`, the next
`/api/writeRecords` enters this block, the old context is removed, `release()`
returns, and the next reader creates another `snapshot-reader-*` executor while
the old daemon threads remain alive forever. The idle reaper uses the same
discard pattern. Please add a local destroy/close path that stops the engine
while keeping the PG slot/source resources, and shuts down the per-reader
executor before the discarded reader becomes unreachable.
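
A minimal sketch of what such a local close path could look like, under stated assumptions: `SketchReader`, `submitPoll`, and `isTerminated` are hypothetical stand-ins and not the actual `SourceReader` API in this PR. The only point illustrated is that the per-reader executor is shut down locally, without touching any remote resource such as the PG slot.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a JDBC reader; all names here are illustrative only.
class SketchReader implements AutoCloseable {
    private final ExecutorService pollExecutor;

    SketchReader(String jobId) {
        // Mirrors the fixed per-reader executor described in the comment above.
        this.pollExecutor = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r, "snapshot-reader-" + jobId);
            t.setDaemon(true);
            return t;
        });
    }

    void submitPoll(Runnable task) {
        pollExecutor.submit(task);
    }

    // Local destroy: stops this reader's threads only. It deliberately does
    // not drop remote resources (for example a replication slot).
    @Override
    public void close() {
        pollExecutor.shutdownNow();
        try {
            if (!pollExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                System.err.println("pollExecutor did not terminate in time");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
    }

    boolean isTerminated() {
        return pollExecutor.isTerminated();
    }
}
```

With something along these lines, the rebuild branch and the idle reaper could both call the close path after `release()` and before dropping the last reference to the stale reader.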
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]