This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f8aed3b5547 branch-4.0: [Fix](StreamingJob) fix postgres consumer data in multi backend #59798 (#59841)
f8aed3b5547 is described below
commit f8aed3b55472658823d123d05aef2e142f859b6e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 14 10:01:14 2026 +0800
    branch-4.0: [Fix](StreamingJob) fix postgres consumer data in multi backend #59798 (#59841)
Cherry-picked from #59798
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingMultiTblTask.java | 3 +-
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 55 ++++++++++++++++++++++
.../cdcclient/controller/ClientController.java | 14 ++++++
.../source/reader/JdbcIncrementalSourceReader.java | 5 +-
.../source/reader/mysql/MySqlSourceReader.java | 5 +-
.../reader/postgres/PostgresSourceReader.java | 3 ++
.../cdc/test_streaming_mysql_job_priv.groovy | 22 ++++++++-
.../cdc/test_streaming_postgres_job_priv.groovy | 52 +++++++++++---------
8 files changed, 133 insertions(+), 26 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 08f1bb5ccaf..1f955f0a2c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -108,12 +108,13 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
- log.info("start to run streaming multi task, offset is {}",
runningOffset.toString());
sendWriteRequest();
}
private void sendWriteRequest() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
+ log.info("start to run streaming multi task {} in backend {}/{},
offset is {}",
+ taskId, backend.getId(), backend.getHost(),
runningOffset.toString());
this.runningBackendId = backend.getId();
WriteRecordRequest params = buildRequestParams();
        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
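
StreamingJobUtils.selectBackend() itself is not part of this diff; for illustration only, a hypothetical round-robin selection over alive backends could look like the sketch below (Backend here is a stand-in record, not the Doris catalog class):

    // Hypothetical sketch; the real StreamingJobUtils.selectBackend() logic
    // is not shown in this change.
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    public class SelectBackendDemo {
        record Backend(long id, String host) { }

        private static final AtomicLong COUNTER = new AtomicLong();

        // Simple round-robin over the alive backends.
        static Backend selectBackend(List<Backend> alive) {
            int idx = (int) (COUNTER.getAndIncrement() % alive.size());
            return alive.get(idx);
        }

        public static void main(String[] args) {
            List<Backend> alive = List.of(new Backend(1, "be1"), new Backend(2, "be2"));
            Backend backend = selectBackend(alive);
            System.out.printf("run streaming multi task in backend %d/%s%n",
                    backend.id(), backend.host());
        }
    }
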
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 560887d61ad..0c114ae8e64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -18,6 +18,7 @@
package org.apache.doris.job.offset.jdbc;
import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.CompareOffsetRequest;
import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -430,6 +431,10 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
this.remainingSplits = tableSplits.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
+        } else {
+            // The source reader is automatically initialized when the split is obtained.
+            // In latest mode, a separate init is required.
+            initSourceReader();
}
}
@@ -490,6 +495,56 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode);
}
+ /**
+ * Source reader needs to be initialized here.
+ * For example, PG slots need to be created first;
+     * otherwise, conflicts will occur in multi-backend scenarios.
+ */
+ private void initSourceReader() throws JobException {
+ Backend backend = StreamingJobUtils.selectBackend();
+        JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
+        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+                .setApi("/api/initReader")
+                .setParams(new Gson().toJson(requestParams)).build();
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        InternalService.PRequestCdcClientResult result = null;
+        try {
+            Future<PRequestCdcClientResult> future =
+                    BackendServiceProxy.getInstance().requestCdcClient(address, request);
+            result = future.get();
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                log.warn("Failed to init job {} reader, {}", getJobId(), result.getStatus().getErrorMsgs(0));
+                throw new JobException(
+                        "Failed to init source reader," + result.getStatus().getErrorMsgs(0) + ", response: "
+                                + result.getResponse());
+            }
+ String response = result.getResponse();
+ try {
+ ResponseBody<String> responseObj = objectMapper.readValue(
+ response,
+ new TypeReference<ResponseBody<String>>() {
+ }
+ );
+                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
+                    log.info("Init {} source reader successfully, response: {}", getJobId(), responseObj.getData());
+                    return;
+                } else {
+                    throw new JobException("Failed to init source reader, error: " + responseObj.getData());
+                }
+            } catch (JobException jobex) {
+                log.warn("Failed to init {} source reader, {}", getJobId(), response);
+                throw new JobException(jobex.getMessage());
+            } catch (Exception e) {
+                log.warn("Failed to init {} source reader, {}", getJobId(), response);
+                throw new JobException("Failed to init source reader, cause " + e.getMessage());
+            }
+ } catch (ExecutionException | InterruptedException ex) {
+ log.warn("init source reader: ", ex);
+ throw new JobException(ex);
+ }
+ }
+
public void cleanMeta(Long jobId) throws JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
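
The new initSourceReader() is a bRPC round trip to the CDC client followed by JSON decoding of the returned body. A minimal standalone sketch of just that decoding step, assuming a simplified ResponseBody with only code and data fields and code 0 meaning OK (the real org.apache.doris.httpv2.entity.ResponseBody carries more state):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class InitReaderResponseDemo {
        // Simplified stand-in for org.apache.doris.httpv2.entity.ResponseBody.
        static class ResponseBody<T> {
            public int code;
            public T data;
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper objectMapper = new ObjectMapper();
            String response = "{\"code\":0,\"data\":\"Source reader initialized successfully\"}";
            ResponseBody<String> body = objectMapper.readValue(
                    response, new TypeReference<ResponseBody<String>>() {});
            // Assumption: code 0 corresponds to RestApiStatusCode.OK.
            if (body.code == 0) {
                System.out.println("init ok: " + body.data);
            } else {
                throw new IllegalStateException("Failed to init source reader, error: " + body.data);
            }
        }
    }
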
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 2ca45ad2474..b3302e2c785 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -27,6 +27,8 @@ import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
import java.util.List;
import org.slf4j.Logger;
@@ -44,6 +46,18 @@ public class ClientController {
@Autowired private PipelineCoordinator pipelineCoordinator;
+ /** init source reader */
+    @RequestMapping(path = "/api/initReader", method = RequestMethod.POST)
+    public Object initSourceReader(@RequestBody JobBaseConfig jobConfig) {
+        try {
+            SourceReader reader = Env.getCurrentEnv().getReader(jobConfig);
+            return RestResponse.success("Source reader initialized successfully");
+        } catch (Exception ex) {
+            LOG.error("Failed to create reader, jobId={}", jobConfig.getJobId(), ex);
+            return RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex));
+        }
+    }
+
/** Fetch source splits for snapshot */
@RequestMapping(path = "/api/fetchSplits", method = RequestMethod.POST)
public Object fetchSplits(@RequestBody FetchTableSplitsRequest ftsReq) {
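
Env.getCurrentEnv().getReader(jobConfig) is expected to behave idempotently, so calling /api/initReader before any split is dispatched simply pre-creates the reader. A hypothetical sketch of that kind of per-job registry (the actual Env implementation is not part of this diff):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ReaderRegistryDemo {
        interface SourceReader { }

        private final Map<Long, SourceReader> readers = new ConcurrentHashMap<>();

        // computeIfAbsent guarantees a single reader instance per job id even
        // when several init requests race on the same backend.
        SourceReader getOrInit(long jobId) {
            return readers.computeIfAbsent(jobId, id -> new SourceReader() { });
        }

        public static void main(String[] args) {
            ReaderRegistryDemo registry = new ReaderRegistryDemo();
            SourceReader a = registry.getOrInit(42L);
            SourceReader b = registry.getOrInit(42L);
            System.out.println("same instance: " + (a == b)); // prints true
        }
    }
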
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 541e3354828..70ab3961acc 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -172,7 +172,7 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
// build split
                Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
split = splitFlag.f0;
- closeBinlogReader();
+ // closeBinlogReader();
                currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
this.setCurrentSplitRecords(currentSplitRecords);
this.setCurrentSplit(split);
@@ -616,6 +616,9 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
@Override
public void finishSplitRecords() {
this.setCurrentSplitRecords(null);
+        // Close after each read; otherwise the binlog client keeps holding the connection.
+ closeBinlogReader();
+ this.setCurrentReader(null);
}
    private Map<TableId, TableChanges.TableChange> getTableSchemas(JobBaseConfig config) {
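
Instead of resetting the binlog reader when the next split is built (the commented-out closeBinlogReader() above), the reader is now closed in finishSplitRecords(), right after a batch is drained, so the binlog connection is not held between reads. A stand-in sketch of the pattern (BinlogClient is hypothetical, for illustration only):

    public class CloseAfterReadDemo {
        // Hypothetical stand-in for a binlog client that holds a server connection.
        static class BinlogClient implements AutoCloseable {
            java.util.List<String> poll() {
                return java.util.List.of("event-1", "event-2");
            }

            @Override
            public void close() {
                System.out.println("connection released");
            }
        }

        public static void main(String[] args) {
            BinlogClient client = new BinlogClient();
            try {
                System.out.println(client.poll());
            } finally {
                // Closing after each read frees the server-side connection so
                // another backend can attach between reads.
                client.close();
            }
        }
    }
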
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 27fbf3be88b..a3f14a953b6 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -188,7 +188,7 @@ public class MySqlSourceReader implements SourceReader {
                Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
split = splitFlag.f0;
// reset binlog reader
- closeBinlogReader();
+ // closeBinlogReader();
                currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
this.setCurrentSplitRecords(currentSplitRecords);
this.setCurrentSplit(split);
@@ -718,6 +718,9 @@ public class MySqlSourceReader implements SourceReader {
@Override
public void finishSplitRecords() {
this.setCurrentSplitRecords(null);
+        // Close after each read; otherwise the binlog client keeps holding the connection.
+ closeBinlogReader();
+ this.setCurrentReader(null);
}
@Override
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 52c3674444b..53b648bf38a 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -107,6 +107,9 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
postgresDialect.getSlotName(),
postgresDialect.getPluginName());
// skip creating the replication slot when the slot exists.
if (slotInfo != null) {
+            LOG.info(
+                    "The replication slot {} already exists, skip creating it.",
+                    postgresDialect.getSlotName());
return;
}
PostgresReplicationConnection replicationConnection =
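
The slot check above goes through the dialect, but the same existence test can be expressed directly against pg_replication_slots with plain JDBC. A sketch, with placeholder connection details:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class SlotExistsDemo {
        static boolean slotExists(Connection conn, String slotName) throws SQLException {
            String sql = "SELECT 1 FROM pg_replication_slots WHERE slot_name = ?";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, slotName);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next();
                }
            }
        }

        public static void main(String[] args) throws SQLException {
            // Placeholder URL and credentials; requires the PostgreSQL JDBC driver.
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")) {
                if (slotExists(conn, "doris_slot")) {
                    System.out.println("slot exists, skip creating it");
                } else {
                    System.out.println("slot missing, create it");
                }
            }
        }
    }
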
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index 23f3ce7f679..d16bc57e73e 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -197,12 +197,30 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern
sql """FLUSH PRIVILEGES"""
}
- sleep(30000)
+    def jobSucceedTaskCnt = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobSucceedTaskCnt: " + jobSucceedTaskCnt)
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+                    def jobStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("jobStatus: " + jobStatus)
+                    // check that the job is RUNNING and the succeeded task count has increased
+                    jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1) > jobSucceedTaskCnt.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
// check incremental data
qt_select """ SELECT * FROM ${tableName} order by name asc """
-
sql """DROP USER IF EXISTS '${user}'"""
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
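
The test replaces the fixed sleep(30000) with Awaitility polling, which returns as soon as the condition holds and only fails after the full timeout. The same pattern in plain Java, with a stand-in counter for SucceedTaskCount:

    import static java.util.concurrent.TimeUnit.SECONDS;
    import static org.awaitility.Awaitility.await;

    import java.util.concurrent.atomic.AtomicInteger;

    public class AwaitDemo {
        public static void main(String[] args) {
            AtomicInteger succeedTaskCount = new AtomicInteger(0);
            // Simulate a task that succeeds after roughly two seconds.
            new Thread(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ignored) {
                }
                succeedTaskCount.incrementAndGet();
            }).start();

            // Poll instead of sleeping a fixed interval: give up at 300s, but
            // return as soon as the count increases.
            await().atMost(300, SECONDS).pollInterval(1, SECONDS)
                    .until(() -> succeedTaskCount.get() > 0);
            System.out.println("task succeeded");
        }
    }
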
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 682e575596e..9c0cd6a464c 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -68,8 +68,9 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
sql """GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA ${pgSchema} TO
${newPgUser}"""
}
- // create job by new user
- sql """CREATE JOB ${jobName}
+ test {
+ // create job by new user
+ sql """CREATE JOB ${jobName}
ON STREAMING
FROM POSTGRES (
"jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
@@ -86,24 +87,7 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
"table.create.properties.replication_num" = "1"
)
"""
-
- // check job running
- try {
- Awaitility.await().atMost(300, SECONDS)
- .pollInterval(1, SECONDS).until(
- {
-                    def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
-                    log.info("jobStatus: " + jobStatus)
-                    // check job status
-                    jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
- }
- )
- } catch (Exception ex){
-        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
-        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
- log.info("show job: " + showjob)
- log.info("show task: " + showtask)
- throw ex;
+ exception "Failed to init source reader"
}
// grant replication to user
@@ -112,6 +96,25 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
}
+ // create job by new user
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${newPgUser}",
+ "password" = "${newPgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${tableName}",
+ "offset" = "latest"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
Awaitility.await().atMost(300, SECONDS)
.pollInterval(3, SECONDS).until(
{
@@ -135,7 +138,14 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age)
VALUES ('Doris',18);"""
}
- sleep(30000)
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(3, SECONDS).until(
+ {
+                    def jobSucceedTaskCount = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                    log.info("jobSucceedTaskCount: " + jobSucceedTaskCount)
+                    jobSucceedTaskCount.size() == 1 && jobSucceedTaskCount.get(0).get(0) >= '2'
+ )
// check incremental data
qt_select """ SELECT * FROM ${tableName} order by name asc """