This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f8aed3b5547 branch-4.0: [Fix](StreamingJob) fix postgres consumer data in multi backend #59798 (#59841)
f8aed3b5547 is described below

commit f8aed3b55472658823d123d05aef2e142f859b6e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 14 10:01:14 2026 +0800

    branch-4.0: [Fix](StreamingJob) fix postgres consumer data in multi backend #59798 (#59841)
    
    Cherry-picked from #59798
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingMultiTblTask.java    |  3 +-
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  | 55 ++++++++++++++++++++++
 .../cdcclient/controller/ClientController.java     | 14 ++++++
 .../source/reader/JdbcIncrementalSourceReader.java |  5 +-
 .../source/reader/mysql/MySqlSourceReader.java     |  5 +-
 .../reader/postgres/PostgresSourceReader.java      |  3 ++
 .../cdc/test_streaming_mysql_job_priv.groovy       | 22 ++++++++-
 .../cdc/test_streaming_postgres_job_priv.groovy    | 52 +++++++++++---------
 8 files changed, 133 insertions(+), 26 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 08f1bb5ccaf..1f955f0a2c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -108,12 +108,13 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
             log.info("task has been canceled, task id is {}", getTaskId());
             return;
         }
-        log.info("start to run streaming multi task, offset is {}", runningOffset.toString());
         sendWriteRequest();
     }
 
     private void sendWriteRequest() throws JobException {
         Backend backend = StreamingJobUtils.selectBackend();
+        log.info("start to run streaming multi task {} in backend {}/{}, 
offset is {}",
+                taskId, backend.getId(), backend.getHost(), 
runningOffset.toString());
         this.runningBackendId = backend.getId();
         WriteRecordRequest params = buildRequestParams();
         InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 560887d61ad..0c114ae8e64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -18,6 +18,7 @@
 package org.apache.doris.job.offset.jdbc;
 
 import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.httpv2.rest.RestApiStatusCode;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -430,6 +431,10 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
             this.remainingSplits = tableSplits.values().stream()
                     .flatMap(List::stream)
                     .collect(Collectors.toList());
+        } else {
+            // The source reader is automatically initialized when a split is obtained.
+            // In latest mode, a separate init of the source reader is required.
+            initSourceReader();
         }
     }
 
@@ -490,6 +495,56 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
         return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode);
     }
 
+    /**
+     * The source reader needs to be initialized here.
+     * For example, PG replication slots need to be created first;
+     * otherwise, conflicts will occur in multi-backend scenarios.
+     */
+    private void initSourceReader() throws JobException {
+        Backend backend = StreamingJobUtils.selectBackend();
+        JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
+        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+                .setApi("/api/initReader")
+                .setParams(new Gson().toJson(requestParams)).build();
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        InternalService.PRequestCdcClientResult result = null;
+        try {
+            Future<PRequestCdcClientResult> future =
+                    
BackendServiceProxy.getInstance().requestCdcClient(address, request);
+            result = future.get();
+            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                log.warn("Failed to init job {} reader, {}", getJobId(), 
result.getStatus().getErrorMsgs(0));
+                throw new JobException(
+                        "Failed to init source reader," + 
result.getStatus().getErrorMsgs(0) + ", response: "
+                                + result.getResponse());
+            }
+            String response = result.getResponse();
+            try {
+                ResponseBody<String> responseObj = objectMapper.readValue(
+                        response,
+                        new TypeReference<ResponseBody<String>>() {
+                        }
+                );
+                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
+                    log.info("Init {} source reader successfully, response: 
{}", getJobId(), responseObj.getData());
+                    return;
+                } else {
+                    throw new JobException("Failed to init source reader, 
error: " + responseObj.getData());
+                }
+            } catch (JobException jobex) {
+                log.warn("Failed to init {} source reader, {}", getJobId(), response);
+                throw new JobException(jobex.getMessage());
+            } catch (Exception e) {
+                log.warn("Failed to init {} source reader, {}", getJobId(), response);
+                throw new JobException("Failed to init source reader, cause " + e.getMessage());
+            }
+        } catch (ExecutionException | InterruptedException ex) {
+            log.warn("init source reader: ", ex);
+            throw new JobException(ex);
+        }
+    }
+
     public void cleanMeta(Long jobId) throws JobException {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
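
To spell out the control flow the new else-branch creates: in "initial" mode, fetching the table splits initializes the source reader on the backend as a side effect, while in "latest" mode nothing touches the reader before the first poll, so the provider must init it explicitly. A rough sketch under that reading; isInitialMode() and fetchTableSplits() are stand-ins for the provider's actual helpers, not names from the patch:

    // Sketch only (assumed helper names), not the provider's literal code.
    public void prepareSplits() throws JobException {
        if (isInitialMode()) {
            // Snapshot mode: fetching splits already initializes the reader.
            this.remainingSplits = fetchTableSplits();
        } else {
            // Latest mode: init the reader explicitly on one selected backend,
            // so e.g. the Postgres replication slot is created exactly once
            // instead of several backends racing to create it.
            initSourceReader();
        }
    }
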
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 2ca45ad2474..b3302e2c785 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -27,6 +27,8 @@ import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
 import org.apache.doris.job.cdc.request.JobBaseConfig;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -44,6 +46,18 @@ public class ClientController {
 
     @Autowired private PipelineCoordinator pipelineCoordinator;
 
+    /** init source reader */
+    @RequestMapping(path = "/api/initReader", method = RequestMethod.POST)
+    public Object initSourceReader(@RequestBody JobBaseConfig jobConfig) {
+        try {
+            SourceReader reader = Env.getCurrentEnv().getReader(jobConfig);
+            return RestResponse.success("Source reader initialized successfully");
+        } catch (Exception ex) {
+            LOG.error("Failed to create reader, jobId={}", jobConfig.getJobId(), ex);
+            return RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex));
+        }
+    }
+
     /** Fetch source splits for snapshot */
     @RequestMapping(path = "/api/fetchSplits", method = RequestMethod.POST)
     public Object fetchSplits(@RequestBody FetchTableSplitsRequest ftsReq) {
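
Since /api/initReader is a plain Spring POST endpoint taking JobBaseConfig as a JSON body, it can also be exercised directly against a cdc client for debugging, if the client's HTTP port is reachable. A hypothetical smoke test; the host, port, and JSON field names here are assumptions, not taken from the patch:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class InitReaderSmokeTest {
        public static void main(String[] args) throws Exception {
            // Assumed body shape: whatever JobBaseConfig serializes to.
            String body = "{\"jobId\":12345,\"sourceType\":\"POSTGRES\",\"sourceProperties\":{}}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://127.0.0.1:9096/api/initReader"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            // Expect a success ResponseBody, or internalError with the root cause.
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
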
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 541e3354828..70ab3961acc 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -172,7 +172,7 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
                 // build split
                 Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
                 split = splitFlag.f0;
-                closeBinlogReader();
+                // closeBinlogReader();
                 currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
                 this.setCurrentSplitRecords(currentSplitRecords);
                 this.setCurrentSplit(split);
@@ -616,6 +616,9 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
     @Override
     public void finishSplitRecords() {
         this.setCurrentSplitRecords(null);
+        // Close after each read; otherwise the binlog client keeps occupying the connection.
+        closeBinlogReader();
+        this.setCurrentReader(null);
     }
 
     private Map<TableId, TableChanges.TableChange> 
getTableSchemas(JobBaseConfig config) {
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 27fbf3be88b..a3f14a953b6 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -188,7 +188,7 @@ public class MySqlSourceReader implements SourceReader {
                 Tuple2<MySqlSplit, Boolean> splitFlag = 
createMySqlSplit(offsetMeta, baseReq);
                 split = splitFlag.f0;
                 // reset binlog reader
-                closeBinlogReader();
+                // closeBinlogReader();
                 currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
                 this.setCurrentSplitRecords(currentSplitRecords);
                 this.setCurrentSplit(split);
@@ -718,6 +718,9 @@ public class MySqlSourceReader implements SourceReader {
     @Override
     public void finishSplitRecords() {
         this.setCurrentSplitRecords(null);
+        // Close after each read; otherwise the binlog client keeps occupying the connection.
+        closeBinlogReader();
+        this.setCurrentReader(null);
     }
 
     @Override
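
The two reader hunks above make the same change: closeBinlogReader() is no longer called eagerly when building a split, and instead runs in finishSplitRecords() once the batch has been consumed. The net effect is a close-after-each-read lifecycle, which matters in multi-backend mode because the binlog client would otherwise keep a server connection pinned to whichever backend ran last. A schematic of the intended lifecycle; createSplitFromOffset() and writeToDoris() are hypothetical placeholders, not methods from the patch:

    // Schematic only, not the readers' literal code.
    void runOneTask(JobBaseConfig config) throws Exception {
        SourceSplitBase split = createSplitFromOffset(config);  // hypothetical
        // Reuse the open reader if there is one; no eager close anymore.
        var records = pollSplitRecordsWithSplit(split, config);
        try {
            writeToDoris(records);                              // hypothetical
        } finally {
            // New behavior: release the connection as soon as the batch is
            // done, so the next task can run on a different backend.
            finishSplitRecords();  // clears state and closes the binlog reader
        }
    }
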
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 52c3674444b..53b648bf38a 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -107,6 +107,9 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
                             postgresDialect.getSlotName(), postgresDialect.getPluginName());
             // skip creating the replication slot when the slot exists.
             if (slotInfo != null) {
+                LOG.info(
+                        "The replication slot {} already exists, skip creating it.",
+                        postgresDialect.getSlotName());
                 return;
             }
             PostgresReplicationConnection replicationConnection =
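
The slot-exists guard above is what makes reader initialization idempotent: whichever backend handles the init call first creates the slot, and later calls just log and return. Outside Debezium's connection classes, the same check can be expressed directly against PostgreSQL's catalog; a minimal JDBC sketch, with slot and plugin names as placeholders:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public final class SlotInit {
        // Requires a role with the REPLICATION attribute.
        public static void ensureSlot(Connection conn, String slot, String plugin) throws Exception {
            try (PreparedStatement check = conn.prepareStatement(
                    "SELECT 1 FROM pg_replication_slots WHERE slot_name = ?")) {
                check.setString(1, slot);
                try (ResultSet rs = check.executeQuery()) {
                    if (rs.next()) {
                        return; // slot already exists, skip creating it
                    }
                }
            }
            try (PreparedStatement create = conn.prepareStatement(
                    "SELECT pg_create_logical_replication_slot(?, ?)")) {
                create.setString(1, slot);
                create.setString(2, plugin); // e.g. "pgoutput"
                create.execute();
            }
        }
    }

Note that a check-then-create pair like this still races if two nodes run it concurrently, which is exactly why the patch funnels initialization through a single /api/initReader call issued by the FE.
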
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index 23f3ce7f679..d16bc57e73e 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -197,12 +197,30 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern
             sql """FLUSH PRIVILEGES"""
         }
 
-        sleep(30000)
+        def jobSucceedTaskCnt = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+        log.info("jobSucceedTaskCnt: " + jobSucceedTaskCnt)
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobStatus: " + jobStatus)
+                        // check that the job is RUNNING and one more task has succeeded
+                        jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1) > jobSucceedTaskCnt.get(0).get(0)
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
 
         // check incremental data
         qt_select """ SELECT * FROM ${tableName} order by name asc """
 
-
         sql """DROP USER IF EXISTS '${user}'"""
         sql """
         DROP JOB IF EXISTS where jobname =  '${jobName}'
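
The test change above replaces a fixed sleep(30000) with polling until SucceedTaskCount grows, which is both faster on success and less flaky under load. The same pattern in plain Java with Awaitility; jobSucceededTasks() is a hypothetical stand-in for the jobs("type"="insert") query used in the suite:

    import static org.awaitility.Awaitility.await;
    import java.time.Duration;

    // Wait until at least one more streaming task has succeeded.
    long before = jobSucceededTasks();   // hypothetical query helper
    await().atMost(Duration.ofSeconds(300))
            .pollInterval(Duration.ofSeconds(1))
            .until(() -> jobSucceededTasks() > before);
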
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 682e575596e..9c0cd6a464c 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -68,8 +68,9 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
             sql """GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA ${pgSchema} TO ${newPgUser}"""
         }
 
-        // create job by new user
-        sql """CREATE JOB ${jobName}
+        test {
+            // create job by new user
+            sql """CREATE JOB ${jobName}
                 ON STREAMING
                 FROM POSTGRES (
                     "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
@@ -86,24 +87,7 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
                   "table.create.properties.replication_num" = "1"
                 )
             """
-
-        // check job running
-        try {
-            Awaitility.await().atMost(300, SECONDS)
-                    .pollInterval(1, SECONDS).until(
-                    {
-                        def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
-                        log.info("jobStatus: " + jobStatus)
-                        // check job status
-                        jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
-                    }
-            )
-        } catch (Exception ex){
-            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
-            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
-            log.info("show job: " + showjob)
-            log.info("show task: " + showtask)
-            throw ex;
+            exception "Failed to init source reader"
         }
 
         // grant replication to user
@@ -112,6 +96,25 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
         }
 
 
+        // create job by new user
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${newPgUser}",
+                    "password" = "${newPgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${tableName}", 
+                    "offset" = "latest"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
         Awaitility.await().atMost(300, SECONDS)
                 .pollInterval(3, SECONDS).until(
                 {
@@ -135,7 +138,14 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
             sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age) 
VALUES ('Doris',18);"""
         }
 
-        sleep(30000)
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(3, SECONDS).until(
+                {
+                    def jobSucceedTaskCount = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                    log.info("jobSucceedTaskCount: " + jobSucceedTaskCount)
+                    jobSucceedTaskCount.size() == 1 && jobSucceedTaskCount.get(0).get(0) >= '2'
+                }
+        )
 
         // check incremental data
         qt_select """ SELECT * FROM ${tableName} order by name asc """
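
For completeness: the privilege this test toggles is PostgreSQL's REPLICATION role attribute, and with the new init step its absence now fails CREATE JOB immediately with "Failed to init source reader" instead of letting the job pause later with "Failed to fetch meta". A small JDBC check for the attribute, with the role name as a placeholder:

    // Sketch: verify a role can open replication connections before creating the job.
    static boolean hasReplicationPrivilege(java.sql.Connection conn, String role) throws Exception {
        try (java.sql.PreparedStatement ps = conn.prepareStatement(
                "SELECT rolreplication FROM pg_roles WHERE rolname = ?")) {
            ps.setString(1, role);
            try (java.sql.ResultSet rs = ps.executeQuery()) {
                return rs.next() && rs.getBoolean(1);
            }
        }
    }
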


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
