This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new fe860299e7c branch-4.1: [Feature](streamjob) Streaming job support
cdc_stream TVF #61826 (#61970)
fe860299e7c is described below
commit fe860299e7c4e514f6d61661662732413551c448
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 1 13:30:16 2026 +0800
branch-4.1: [Feature](streamjob) Streaming job support cdc_stream TVF
#61826 (#61970)
Cherry-picked from #61826
Co-authored-by: wudi <[email protected]>
---
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 1 +
.../insert/streaming/AbstractStreamingTask.java | 11 +
.../insert/streaming/StreamingInsertJob.java | 50 ++-
.../insert/streaming/StreamingInsertTask.java | 11 +-
.../insert/streaming/StreamingMultiTblTask.java | 2 +
.../doris/job/offset/SourceOffsetProvider.java | 42 +-
.../job/offset/SourceOffsetProviderFactory.java | 2 +
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 42 +-
.../offset/jdbc/JdbcTvfSourceOffsetProvider.java | 425 +++++++++++++++++++++
.../job/offset/s3/S3SourceOffsetProvider.java | 3 +-
.../trees/plans/commands/AlterJobCommand.java | 16 +
.../java/org/apache/doris/planner/ScanNode.java | 4 +
.../main/java/org/apache/doris/qe/Coordinator.java | 12 +
.../CdcStreamTableValuedFunction.java | 35 +-
.../cdcclient/controller/ClientController.java | 2 +-
.../cdcclient/service/PipelineCoordinator.java | 95 +++--
.../source/reader/mysql/MySqlSourceReader.java | 17 +
.../reader/postgres/PostgresSourceReader.java | 4 +
.../src/main/resources/application.properties | 3 +-
.../tvf/test_streaming_job_cdc_stream_mysql.out | 11 +
.../tvf/test_streaming_job_cdc_stream_postgres.out | 11 +
...g_job_cdc_stream_postgres_latest_alter_cred.out | 11 +
...eaming_job_cdc_stream_postgres_pause_resume.out | 13 +
.../tvf/test_streaming_job_cdc_stream_mysql.groovy | 128 +++++++
.../test_streaming_job_cdc_stream_postgres.groovy | 131 +++++++
...ob_cdc_stream_postgres_latest_alter_cred.groovy | 249 ++++++++++++
...ing_job_cdc_stream_postgres_pause_resume.groovy | 238 ++++++++++++
27 files changed, 1497 insertions(+), 72 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 9d2d5034eab..d1efc0b4c88 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -35,6 +35,7 @@ public class DataSourceConfigKeys {
public static final String OFFSET_LATEST = "latest";
public static final String OFFSET_SNAPSHOT = "snapshot";
public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
+ public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key";
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
public static final String SSL_MODE = "ssl_mode";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
index 2618d8a122c..25da9ee90a6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
@@ -33,6 +33,8 @@ import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@Log4j2
@@ -68,6 +70,15 @@ public abstract class AbstractStreamingTask {
public abstract void run() throws JobException;
+ /**
+ * Returns the IDs of backends that ran the scan node for this task.
+ * Subclasses backed by a TVF query (e.g. StreamingInsertTask) override
this
+ * to return the actual scan backend IDs from the coordinator.
+ */
+ public List<Long> getScanBackendIds() {
+ return Collections.emptyList();
+ }
+
public abstract boolean onSuccess() throws JobException;
public abstract void closeOrReleaseResources();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c999b9d99e3..cc908a5aa1c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -51,6 +51,7 @@ import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
+import org.apache.doris.job.offset.jdbc.JdbcTvfSourceOffsetProvider;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
@@ -72,6 +73,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.transaction.TransactionException;
@@ -300,8 +302,11 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
this.tvfType = currentTvf.getFunctionName();
this.originTvfProps = currentTvf.getProperties().getMap();
this.offsetProvider =
SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
- // validate offset props
- if (jobProperties.getOffsetProperty() != null) {
+ this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
+ this.offsetProvider.initOnCreate();
+ // validate offset props, only for s3 cause s3 tvf no offset prop
+ if (jobProperties.getOffsetProperty() != null
+ && S3TableValuedFunction.NAME.equalsIgnoreCase(tvfType)) {
Offset offset =
validateOffset(jobProperties.getOffsetProperty());
this.offsetProvider.updateOffset(offset);
}
@@ -536,6 +541,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (originTvfProps == null) {
this.originTvfProps =
getCurrentTvf().getProperties().getMap();
}
+ // when fe restart, offsetProvider.jobId may be null
+ offsetProvider.ensureInitialized(getJobId(), originTvfProps);
offsetProvider.fetchRemoteMeta(originTvfProps);
} else {
offsetProvider.fetchRemoteMeta(new HashMap<>());
@@ -653,7 +660,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() +
attachment.getLoadBytes());
this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() +
attachment.getNumFiles());
this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() +
attachment.getFileBytes());
-
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+ Offset endOffset =
offsetProvider.deserializeOffset(attachment.getOffset());
+ offsetProvider.updateOffset(endOffset);
+ if (!isReplay) {
+ offsetProvider.onTaskCommitted(attachment.getScannedRows(),
attachment.getLoadBytes());
+ if (runningStreamTask != null) {
+
offsetProvider.applyEndOffsetToTask(runningStreamTask.getRunningOffset(),
endOffset);
+ }
+ }
//update metric
if (MetricRepo.isInit && !isReplay) {
@@ -757,7 +771,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
*/
private void modifyPropertiesInternal(Map<String, String> inputProperties)
throws AnalysisException, JobException {
StreamingJobProperties inputStreamProps = new
StreamingJobProperties(inputProperties);
- if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty()) &&
this.tvfType != null) {
+ if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())
+ && S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) {
Offset offset =
validateOffset(inputStreamProps.getOffsetProperty());
this.offsetProvider.updateOffset(offset);
if (Config.isCloudMode()) {
@@ -1016,6 +1031,17 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
LoadJob loadJob = loadJobs.get(0);
LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+
+ String offsetJson = offsetProvider.getCommitOffsetJson(
+ runningStreamTask.getRunningOffset(),
+ runningStreamTask.getTaskId(),
+ runningStreamTask.getScanBackendIds());
+
+
+ if (StringUtils.isBlank(offsetJson)) {
+ throw new TransactionException("Cannot find offset for
attachment, load job id is "
+ + runningStreamTask.getTaskId());
+ }
txnState.setTxnCommitAttachment(new
StreamingTaskTxnCommitAttachment(
getJobId(),
runningStreamTask.getTaskId(),
@@ -1023,7 +1049,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
loadStatistic.getLoadBytes(),
loadStatistic.getFileNumber(),
loadStatistic.getTotalFileSizeB(),
-
runningStreamTask.getRunningOffset().toSerializedJson()));
+ offsetJson));
} finally {
if (shouldReleaseLock) {
writeUnlock();
@@ -1186,10 +1212,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
checkDataQuality(offsetRequest);
updateNoTxnJobStatisticAndOffset(offsetRequest);
- if (offsetRequest.getScannedRows() == 0 &&
offsetRequest.getLoadBytes() == 0) {
- JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider)
offsetProvider;
- op.setHasMoreData(false);
- }
+ offsetProvider.onTaskCommitted(offsetRequest.getScannedRows(),
offsetRequest.getLoadBytes());
if (offsetRequest.getTableSchemas() != null) {
JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider)
offsetProvider;
op.setTableSchemas(offsetRequest.getTableSchemas());
@@ -1278,6 +1301,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
*/
public void cleanup() throws JobException {
log.info("cleanup streaming job {}", getJobId());
+
// s3 tvf clean offset
if (tvfType != null && Config.isCloudMode()) {
Cloud.DeleteStreamingJobResponse resp = null;
@@ -1298,6 +1322,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
+ // For TVF path, provider fields may be null after FE restart
+ if (this.offsetProvider instanceof JdbcTvfSourceOffsetProvider) {
+ if (originTvfProps == null) {
+ this.originTvfProps = getCurrentTvf().getProperties().getMap();
+ }
+ offsetProvider.ensureInitialized(getJobId(), originTvfProps);
+ }
+
if (this.offsetProvider instanceof JdbcSourceOffsetProvider) {
// jdbc clean chunk meta table
((JdbcSourceOffsetProvider)
this.offsetProvider).cleanMeta(getJobId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 635c639256c..54832eb6fb1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -43,6 +43,7 @@ import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -98,7 +99,7 @@ public class StreamingInsertTask extends
AbstractStreamingTask {
if (!baseCommand.getParsedPlan().isPresent()) {
throw new JobException("Can not get Parsed plan");
}
- this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand,
runningOffset);
+ this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand,
runningOffset, getTaskId());
this.taskCommand.setLabelName(Optional.of(labelName));
this.stmtExecutor = new StmtExecutor(ctx, new
LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
}
@@ -126,6 +127,14 @@ public class StreamingInsertTask extends
AbstractStreamingTask {
}
}
+ @Override
+ public List<Long> getScanBackendIds() {
+ if (stmtExecutor != null && stmtExecutor.getCoord() != null) {
+ return stmtExecutor.getCoord().getScanBackendIds();
+ }
+ return Collections.emptyList();
+ }
+
@Override
public boolean onSuccess() throws JobException {
if (getIsCanceled().get()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 2c9fbf6fe17..c35c0a27ac2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -192,6 +192,8 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
Map<String, String> props = generateStreamLoadProps();
request.setStreamLoadProps(props);
+ //`meta` refers to the data synchronized by the job in this instance,
+ // while `sourceProperties.offset` is the data entered by the user.
Map<String, Object> splitMeta = offset.generateMeta();
Preconditions.checkArgument(!splitMeta.isEmpty(), "split meta is
empty");
request.setMeta(splitMeta);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 16fb2394fe3..33a24378704 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -22,6 +22,7 @@ import
org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import java.util.List;
import java.util.Map;
/**
@@ -36,6 +37,20 @@ public interface SourceOffsetProvider {
*/
String getSourceType();
+ /**
+ * Initialize the offset provider with job ID and original TVF properties.
+ * Only sets in-memory fields; safe to call on both fresh start and FE
restart.
+ * May perform remote calls (e.g. fetching snapshot splits), so throws
JobException.
+ */
+ default void ensureInitialized(Long jobId, Map<String, String>
originTvfProps) throws JobException {}
+
+ /**
+ * Performs one-time initialization that must run only on fresh job
creation, not on FE restart.
+ * For example, fetching and persisting snapshot splits to the meta table.
+ * Default: no-op (most providers need no extra setup).
+ */
+ default void initOnCreate() throws JobException {}
+
/**
* Get next offset to consume
*
@@ -59,11 +74,12 @@ public interface SourceOffsetProvider {
/**
* Rewrite the TVF parameters in the SQL based on the current offset.
+ * Only implemented by TVF-based providers (e.g. S3, cdc_stream).
*
* @param nextOffset
* @return rewritten InsertIntoTableCommand
*/
- InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand
originCommand, Offset nextOffset);
+ InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand
originCommand, Offset nextOffset, long taskId);
/**
* Update the offset of the source.
@@ -110,6 +126,30 @@ public interface SourceOffsetProvider {
return null;
}
+ /**
+ * Returns the serialized JSON offset to store in txn commit attachment.
+ * Default: serialize running offset directly (e.g. S3 path).
+ * CDC stream TVF overrides to pull actual end offset from BE after
fetchRecordStream completes.
+ * scanBackendIds: IDs of the BEs that ran the TVF scan node, used to
locate taskOffsetCache.
+ */
+ default String getCommitOffsetJson(Offset runningOffset, long taskId,
List<Long> scanBackendIds) {
+ return runningOffset.toSerializedJson();
+ }
+
+ /**
+ * Called after each task is committed. Providers that track data
availability
+ * (e.g. JDBC binlog) can use this to update internal state such as
hasMoreData.
+ * Default: no-op.
+ */
+ default void onTaskCommitted(long scannedRows, long loadBytes) {}
+
+ /**
+ * Applies the end offset from a committed task back onto the running
offset object
+ * in-place, so that showRange() can display the full [start, end]
interval.
+ * Default: no-op (only meaningful for JDBC providers).
+ */
+ default void applyEndOffsetToTask(Offset runningOffset, Offset endOffset)
{}
+
/**
* Returns true if the provider has reached a natural completion point
* and the job should be marked as FINISHED.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
index adc49249eb1..30f9d0edd57 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
@@ -18,6 +18,7 @@
package org.apache.doris.job.offset;
import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.offset.jdbc.JdbcTvfSourceOffsetProvider;
import org.apache.doris.job.offset.s3.S3SourceOffsetProvider;
import lombok.extern.log4j.Log4j2;
@@ -31,6 +32,7 @@ public class SourceOffsetProviderFactory {
static {
map.put("s3", S3SourceOffsetProvider.class);
+ map.put("cdc_stream", JdbcTvfSourceOffsetProvider.class);
}
public static SourceOffsetProvider createSourceOffsetProvider(String
sourceType) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 964659e79f4..245429ac357 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -70,10 +70,10 @@ import java.util.stream.Collectors;
public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
public static final String SPLIT_ID = "splitId";
private static final ObjectMapper objectMapper = new ObjectMapper();
- private final int snapshotParallelism;
- private Long jobId;
- private DataSourceType sourceType;
- private Map<String, String> sourceProperties = new HashMap<>();
+ protected int snapshotParallelism = 1;
+ protected Long jobId;
+ protected DataSourceType sourceType;
+ protected Map<String, String> sourceProperties = new HashMap<>();
List<SnapshotSplit> remainingSplits = new ArrayList<>();
List<SnapshotSplit> finishedSplits = new ArrayList<>();
@@ -92,6 +92,16 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
volatile boolean hasMoreData = true;
+ /**
+ * No-arg constructor for subclass use.
+ */
+ public JdbcSourceOffsetProvider() {
+ this.chunkHighWatermarkMap = new HashMap<>();
+ }
+
+ /**
+ * Constructor for FROM Source TO Database.
+ */
public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType,
Map<String, String> sourceProperties) {
this.jobId = jobId;
this.sourceType = sourceType;
@@ -155,10 +165,13 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
return null;
}
+ /**
+ * Should never call this.
+ */
@Override
- public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand
originCommand, Offset nextOffset) {
- // todo: only for cdc tvf
- return null;
+ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand
originCommand, Offset nextOffset,
+ long taskId) {
+ throw new UnsupportedOperationException("rewriteTvfParams not
supported for " + getClass().getSimpleName());
}
@Override
@@ -347,7 +360,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
@Override
public Offset deserializeOffsetProperty(String offset) {
- // no need
+ // no need cause cdc_stream has offset property
return null;
}
@@ -415,7 +428,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
* Assign the HW value to the synchronized Split,
* and remove the Split from remainSplit and place it in finishedSplit.
*/
- private List<SnapshotSplit> recalculateRemainingSplits(
+ protected List<SnapshotSplit> recalculateRemainingSplits(
Map<String, Map<String, Map<String, String>>>
chunkHighWatermarkMap,
Map<String, List<SnapshotSplit>> snapshotSplits) {
if (this.finishedSplits == null) {
@@ -541,7 +554,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
}
}
- private boolean checkNeedSplitChunks(Map<String, String> sourceProperties)
{
+ protected boolean checkNeedSplitChunks(Map<String, String>
sourceProperties) {
String startMode = sourceProperties.get(DataSourceConfigKeys.OFFSET);
if (startMode == null) {
return false;
@@ -550,11 +563,18 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
||
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startMode);
}
- private boolean isSnapshotOnlyMode() {
+ protected boolean isSnapshotOnlyMode() {
String offset = sourceProperties.get(DataSourceConfigKeys.OFFSET);
return DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset);
}
+ @Override
+ public void onTaskCommitted(long scannedRows, long loadBytes) {
+ if (scannedRows == 0 && loadBytes == 0) {
+ hasMoreData = false;
+ }
+ }
+
@Override
public boolean hasReachedEnd() {
return isSnapshotOnlyMode()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
new file mode 100644
index 00000000000..803deec145a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
@@ -0,0 +1,425 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.jdbc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.split.AbstractSourceSplit;
+import org.apache.doris.job.cdc.split.BinlogSplit;
+import org.apache.doris.job.cdc.split.SnapshotSplit;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * OffsetProvider for cdc_stream TVF path.
+ *
+ * <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
+ * <ul>
+ * <li>offset commit: FE pulls actual end offset from BE via
/api/getTaskOffset/{taskId} in
+ * beforeCommitted, stores in txn attachment (transactionally safe)</li>
+ * <li>cloud mode snapshot: attachment carries cumulative
chunkHighWatermarkMap so that
+ * replayOnCloudMode can recover full state from the single latest
attachment in MS</li>
+ * <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap
populated by
+ * replayOnCommitted/replayOnCloudMode -> updateOffset), not from
EditLog</li>
+ * <li>updateOffset: during replay remainingSplits is empty so removeIf
returns false naturally;
+ * chunkHighWatermarkMap is always updated unconditionally to support
recovery</li>
+ * <li>replayIfNeed: checks currentOffset directly — snapshot triggers
remainingSplits rebuild
+ * from meta + chunkHighWatermarkMap; binlog needs no action
(currentOffset already set)</li>
+ * </ul>
+ */
+@Log4j2
+public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ /**
+ * No-arg constructor required by
SourceOffsetProviderFactory.createSourceOffsetProvider().
+ */
+ public JdbcTvfSourceOffsetProvider() {
+ super();
+ }
+
+ /**
+ * Initializes provider state and fetches snapshot splits from BE.
+ * splitChunks is called here (rather than in StreamingInsertJob) to keep
+ * all cdc_stream-specific init logic inside the provider.
+ */
+ @Override
+ public void ensureInitialized(Long jobId, Map<String, String>
originTvfProps) throws JobException {
+ // Always refresh fields that may be updated via ALTER JOB (e.g.
credentials, parallelism).
+ this.sourceProperties = originTvfProps;
+ this.snapshotParallelism = Integer.parseInt(
+
originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+ DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
+
+ if (this.jobId != null) {
+ return;
+ }
+ // One-time initialization below — safe to skip on FE restart because
the provider
+ // is reconstructed fresh (getPersistInfo returns null), so jobId is
null then too.
+ this.jobId = jobId;
+ this.chunkHighWatermarkMap = new HashMap<>();
+ String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
+ Preconditions.checkArgument(type != null, "type is required");
+ this.sourceType = DataSourceType.valueOf(type.toUpperCase());
+ String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
+ Preconditions.checkArgument(table != null, "table is required for
cdc_stream TVF");
+ }
+
+ /**
+ * Called once on fresh job creation (not on FE restart).
+ * Fetches snapshot splits from BE and persists them to the meta table.
+ */
+ @Override
+ public void initOnCreate() throws JobException {
+ String table = sourceProperties.get(DataSourceConfigKeys.TABLE);
+ splitChunks(Collections.singletonList(table));
+ }
+
+ /**
+ * Rewrites the cdc_stream TVF SQL with current offset meta and taskId,
+ * so the BE knows where to start reading and can report
+ * the end offset back via taskOffsetCache.
+ */
+ @Override
+ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand
originCommand,
+ Offset runningOffset, long taskId) {
+ JdbcOffset offset = (JdbcOffset) runningOffset;
+ Map<String, String> props = new HashMap<>();
+ Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan
-> {
+ if (plan instanceof UnboundTVFRelation) {
+ UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan;
+ props.putAll(originTvfRel.getProperties().getMap());
+ props.put(CdcStreamTableValuedFunction.META_KEY, new
Gson().toJson(offset.generateMeta()));
+ props.put(CdcStreamTableValuedFunction.JOB_ID_KEY,
String.valueOf(jobId));
+ props.put(CdcStreamTableValuedFunction.TASK_ID_KEY,
String.valueOf(taskId));
+ return new UnboundTVFRelation(
+ originTvfRel.getRelationId(),
originTvfRel.getFunctionName(), new Properties(props));
+ }
+ return plan;
+ });
+ InsertIntoTableCommand cmd = new InsertIntoTableCommand((LogicalPlan)
rewritePlan,
+ Optional.empty(), Optional.empty(), Optional.empty(), true,
Optional.empty());
+ cmd.setJobId(originCommand.getJobId());
+ return cmd;
+ }
+
+ /**
+ * Returns the serialized JSON offset to store in txn commit attachment.
+ *
+ * <p>Calls BE /api/getTaskOffset/{taskId} to get the actual end offset
recorded after
+ * fetchRecordStream completes (stored in
PipelineCoordinator.taskOffsetCache).
+ *
+ * <p>For cloud + snapshot: returns cumulative list (all previously
completed chunks +
+ * current task's new splits) so that replayOnCloudMode can recover full
state from latest attachment.
+ * For non-cloud snapshot / binlog: returns only current task's splits.
+ */
+ @Override
+ public String getCommitOffsetJson(Offset runningOffset, long taskId,
List<Long> scanBackendIds) {
+ List<Map<String, String>> currentTaskEndOffset =
fetchTaskEndOffset(taskId, scanBackendIds);
+ if (CollectionUtils.isEmpty(currentTaskEndOffset)) {
+ return "";
+ }
+
+ // Cloud + snapshot: prepend all previously completed chunks so the
attachment is
+ // self-contained for replayOnCloudMode (MS only keeps the latest
attachment)
+ if (Config.isCloudMode() && ((JdbcOffset)
runningOffset).snapshotSplit()) {
+ List<Map<String, String>> cumulative =
buildCumulativeSnapshotOffset(currentTaskEndOffset);
+ return new Gson().toJson(cumulative);
+ }
+ return new Gson().toJson(currentTaskEndOffset);
+ }
+
+ /**
+ * Queries each scan backend in order until one returns a non-empty offset
for this taskId.
+ * Only the BE that ran the cdc_stream TVF scan node will have the entry
in taskOffsetCache.
+ */
+ private List<Map<String, String>> fetchTaskEndOffset(long taskId,
List<Long> scanBackendIds) {
+ InternalService.PRequestCdcClientRequest request =
+ InternalService.PRequestCdcClientRequest.newBuilder()
+ .setApi("/api/getTaskOffset/" + taskId).build();
+ for (Long beId : scanBackendIds) {
+ Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+ if (backend == null) {
+ log.info("Backend {} not found for task {}, skipping", beId,
taskId);
+ continue;
+ }
+ String rawResponse = null;
+ try {
+ TNetworkAddress address = new
TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ Future<PRequestCdcClientResult> future =
+
BackendServiceProxy.getInstance().requestCdcClient(address, request);
+ InternalService.PRequestCdcClientResult result = future.get();
+ TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ log.warn("Failed to get task {} offset from BE {}: {}",
taskId,
+ backend.getHost(),
result.getStatus().getErrorMsgs(0));
+ continue;
+ }
+ rawResponse = result.getResponse();
+ ResponseBody<List<Map<String, String>>> responseObj =
OBJECT_MAPPER.readValue(
+ rawResponse,
+ new TypeReference<ResponseBody<List<Map<String,
String>>>>() {});
+ List<Map<String, String>> data = responseObj.getData();
+ if (!CollectionUtils.isEmpty(data)) {
+ log.info("Fetched task {} offset from BE {}: {}", taskId,
backend.getHost(), data);
+ return data;
+ }
+ } catch (Exception ex) {
+ log.warn("Get task offset error for task {} from BE {}, raw
response: {}",
+ taskId, backend.getHost(), rawResponse, ex);
+ }
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Merges existing chunkHighWatermarkMap (all previously completed chunks)
with
+ * current task's new splits, deduplicating by splitId.
+ */
+ private List<Map<String, String>> buildCumulativeSnapshotOffset(
+ List<Map<String, String>> currentTaskSplits) {
+ Set<String> currentSplitIds = currentTaskSplits.stream()
+ .map(m -> m.get(SPLIT_ID)).collect(Collectors.toSet());
+
+ List<Map<String, String>> result = new ArrayList<>();
+ // Add all previously completed chunks (skip any that overlap with
current task)
+ if (MapUtils.isNotEmpty(chunkHighWatermarkMap)) {
+ for (Map.Entry<String, Map<String, Map<String, String>>> tableEntry
+ : chunkHighWatermarkMap.entrySet()) {
+ for (Map.Entry<String, Map<String, String>> splitEntry
+ : tableEntry.getValue().entrySet()) {
+ if (!currentSplitIds.contains(splitEntry.getKey())) {
+ Map<String, String> map = new
HashMap<>(splitEntry.getValue());
+ map.put(SPLIT_ID, splitEntry.getKey());
+ result.add(map);
+ }
+ }
+ }
+ }
+ result.addAll(currentTaskSplits);
+ return result;
+ }
+
/**
 * TVF path updateOffset.
 *
 * <p>Snapshot: always writes to chunkHighWatermarkMap (needed for cloud cumulative attachment
 * and FE-restart recovery). In normal flow removeIf finds the split in remainingSplits and
 * adds it to finishedSplits. During txn replay remainingSplits is empty so removeIf returns
 * false naturally — chunkHighWatermarkMap is still updated for replayIfNeed to use later.
 *
 * <p>Binlog: currentOffset is set above; no extra state needed for TVF recovery path.
 */
@Override
public void updateOffset(Offset offset) {
    this.currentOffset = (JdbcOffset) offset;
    if (currentOffset.snapshotSplit()) {
        for (AbstractSourceSplit split : currentOffset.getSplits()) {
            SnapshotSplit ss = (SnapshotSplit) split;
            // Copy the pending split's identity fields (table id, split key and
            // boundaries) onto the committed split as a side effect of the scan,
            // then drop the pending entry from remainingSplits.
            boolean removed = remainingSplits.removeIf(v -> {
                if (v.getSplitId().equals(ss.getSplitId())) {
                    ss.setTableId(v.getTableId());
                    ss.setSplitKey(v.getSplitKey());
                    ss.setSplitStart(v.getSplitStart());
                    ss.setSplitEnd(v.getSplitEnd());
                    return true;
                }
                return false;
            });
            if (removed) {
                finishedSplits.add(ss);
            }
            // Record the high watermark unconditionally — during replay removeIf
            // returns false (remainingSplits is empty) but this map must still be
            // populated for replayIfNeed.
            chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k -> new HashMap<>())
                    .put(ss.getSplitId(), ss.getHighWatermark());
        }
    }
    // Binlog: currentOffset is already set; no binlogOffsetPersist needed for TVF path.
}
+
/**
 * TVF path recovery: offsetProviderPersist is always null (no EditLog write).
 * currentOffset is set by replayOnCommitted/replayOnCloudMode -> updateOffset before this runs.
 *
 * <ul>
 *   <li>snapshot: mid-snapshot restart — rebuild remainingSplits from meta + chunkHighWatermarkMap</li>
 *   <li>binlog: currentOffset already correct from updateOffset; nothing to do</li>
 * </ul>
 */
@Override
public void replayIfNeed(StreamingInsertJob job) throws JobException {
    if (currentOffset == null) {
        // No committed txn yet. If snapshot splits exist in the meta table (written by
        // initOnCreate), restore remainingSplits so getNextOffset() returns snapshot splits
        // instead of a BinlogSplit (which would incorrectly skip the snapshot phase).
        Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
        if (MapUtils.isNotEmpty(snapshotSplits)) {
            // Empty watermark map: nothing committed, so every split is still pending.
            recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
            log.info("Replaying TVF offset provider for job {}: no committed txn,"
                    + " restored {} remaining splits from meta", job.getJobId(), remainingSplits.size());
        } else {
            log.info("Replaying TVF offset provider for job {}: no committed txn,"
                    + " no snapshot splits in meta", job.getJobId());
        }
        return;
    }
    if (currentOffset.snapshotSplit()) {
        log.info("Replaying TVF offset provider for job {}: restoring snapshot state from txn replay",
                job.getJobId());
        Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
        if (MapUtils.isNotEmpty(snapshotSplits)) {
            // During replay, buildTableKey() may have stored entries under "null.null"
            // because sourceProperties was not yet initialized. Remap all committed splits
            // under the actual table key so recalculateRemainingSplits can match them.
            Map<String, Map<String, Map<String, String>>> effectiveMap =
                    remapChunkHighWatermarkMap(snapshotSplits);
            List<SnapshotSplit> lastSnapshotSplits =
                    recalculateRemainingSplits(effectiveMap, snapshotSplits);
            if (remainingSplits.isEmpty()) {
                // All splits were committed before the restart: either re-run the last
                // snapshot batch, or (unless snapshot-only) move on to the binlog phase.
                if (!lastSnapshotSplits.isEmpty()) {
                    currentOffset = new JdbcOffset(lastSnapshotSplits);
                } else if (!isSnapshotOnlyMode()) {
                    BinlogSplit binlogSplit = new BinlogSplit();
                    binlogSplit.setFinishedSplits(finishedSplits);
                    currentOffset = new JdbcOffset(Collections.singletonList(binlogSplit));
                }
            }
        }
    } else {
        log.info("Replaying TVF offset provider for job {}: binlog offset already set, nothing to do",
                job.getJobId());
    }
}
+
+ /**
+ * Builds the chunkHighWatermarkMap outer key as schema.table (if schema
is present)
+ * or database.table, matching the format used by snapshotSplits keys in
+ * recalculateRemainingSplits.
+ */
+ private String buildTableKey() {
+ String schema = sourceProperties.get(DataSourceConfigKeys.SCHEMA);
+ String qualifier = (schema != null && !schema.isEmpty())
+ ? schema : sourceProperties.get(DataSourceConfigKeys.DATABASE);
+ return qualifier + "." +
sourceProperties.get(DataSourceConfigKeys.TABLE);
+ }
+
+ /**
+ * Remaps chunkHighWatermarkMap to use the actual table key from
snapshotSplits.
+ *
+ * <p>During FE-restart replay, buildTableKey() produces "null.null"
because sourceProperties
+ * is not yet initialized (ensureInitialized has not been called). This
causes
+ * recalculateRemainingSplits to miss all committed splits (key mismatch)
and re-process
+ * them, leading to data duplication.
+ *
+ * <p>The fix: flatten all inner splitId->highWatermark entries across
every outer key and
+ * re-key them under the actual table name read from the meta table
(snapshotSplits key).
+ * TVF always has exactly one table, so snapshotSplits has exactly one
entry.
+ *
+ * <tableName -> splitId -> chunk of highWatermark>
+ */
+ private Map<String, Map<String, Map<String, String>>>
remapChunkHighWatermarkMap(
+ Map<String, List<SnapshotSplit>> snapshotSplits) {
+ if (MapUtils.isEmpty(chunkHighWatermarkMap)) {
+ return chunkHighWatermarkMap;
+ }
+ // Flatten all committed splitId -> highWatermark entries, ignoring
the (possibly wrong) outer key
+ Map<String, Map<String, String>> flatCommitted = new HashMap<>();
+ for (Map<String, Map<String, String>> inner :
chunkHighWatermarkMap.values()) {
+ if (inner != null) {
+ flatCommitted.putAll(inner);
+ }
+ }
+ if (flatCommitted.isEmpty()) {
+ return chunkHighWatermarkMap;
+ }
+ // Re-key under the actual table name from snapshotSplits (TVF has
exactly one table).
+ // getTableName() in recalculateRemainingSplits splits by "." and
takes the last segment,
+ // so using the plain table name as key (no dots) ensures a direct
match.
+ String actualTableKey = snapshotSplits.keySet().iterator().next();
+ Map<String, Map<String, Map<String, String>>> remapped = new
HashMap<>();
+ remapped.put(actualTableKey, flatCommitted);
+ return remapped;
+ }
+
/**
 * TVF path does not persist to EditLog; state is recovered via txn replay.
 * This override is defensive — the persistOffsetProviderIfNeed() call path
 * only runs in the non-TVF commitOffset flow and won't reach here.
 *
 * @return always null, signalling there is nothing to write to the EditLog
 */
@Override
public String getPersistInfo() {
    return null;
}
+
+ @Override
+ public void applyEndOffsetToTask(Offset runningOffset, Offset endOffset) {
+ if (!(runningOffset instanceof JdbcOffset) || !(endOffset instanceof
JdbcOffset)) {
+ return;
+ }
+ JdbcOffset running = (JdbcOffset) runningOffset;
+ JdbcOffset end = (JdbcOffset) endOffset;
+ if (running.snapshotSplit()) {
+ for (int i = 0; i < running.getSplits().size() && i <
end.getSplits().size(); i++) {
+ SnapshotSplit rSplit = (SnapshotSplit)
running.getSplits().get(i);
+ SnapshotSplit eSplit = (SnapshotSplit) end.getSplits().get(i);
+ rSplit.setHighWatermark(eSplit.getHighWatermark());
+ }
+ } else {
+ BinlogSplit rSplit = (BinlogSplit) running.getSplits().get(0);
+ BinlogSplit eSplit = (BinlogSplit) end.getSplits().get(0);
+ // deserializeOffset stores binlog position in startingOffset
+ rSplit.setEndingOffset(eSplit.getStartingOffset());
+ }
+ }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 23996fb8f38..886e03f21c3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -131,7 +131,8 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
}
@Override
- public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand
originCommand, Offset runningOffset) {
+ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand
originCommand,
+ Offset runningOffset, long taskId) {
S3Offset offset = (S3Offset) runningOffset;
Map<String, String> props = new HashMap<>();
// rewrite plan
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 584ee0848db..fc63710f93d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -246,6 +246,22 @@ public class AlterJobCommand extends AlterCommand
implements ForwardWithSync, Ne
inputTvf.getProperties().getMap().get("uri")),
"The uri property cannot be modified in ALTER JOB");
break;
+ case "cdc_stream":
+ // type, jdbc_url, database, schema, and table identify the
source and cannot be changed.
+ // user, password, driver_url, driver_class, etc. are
modifiable (credential rotation).
+ for (String unmodifiable : new String[] {
+ DataSourceConfigKeys.TYPE,
+ DataSourceConfigKeys.JDBC_URL,
+ DataSourceConfigKeys.DATABASE,
+ DataSourceConfigKeys.SCHEMA,
+ DataSourceConfigKeys.TABLE}) {
+ Preconditions.checkArgument(
+ Objects.equals(
+
originTvf.getProperties().getMap().get(unmodifiable),
+
inputTvf.getProperties().getMap().get(unmodifiable)),
+ "The '%s' property cannot be modified in ALTER JOB
for cdc_stream", unmodifiable);
+ }
+ break;
default:
throw new IllegalArgumentException("Unsupported tvf type:" +
inputTvf.getFunctionName());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 38e4059711c..9ecdb14e90b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -574,6 +574,10 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
return scanBackendIds.size();
}
/**
 * Returns the IDs of backends this scan node's ranges were assigned to.
 * NOTE(review): this is the internal set, not a copy — callers should treat it as
 * read-only; confirm no caller mutates it.
 */
public Set<Long> getScanBackendIds() {
    return scanBackendIds;
}
+
public int getScanRangeNum() {
return Integer.MAX_VALUE;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 397ddfb982b..dac24f2f798 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3516,6 +3516,18 @@ public class Coordinator implements CoordInterface {
return backendAddresses;
}
+ /**
+ * Returns the IDs of backends that have scan ranges assigned, collected
from each ScanNode's
+ * scanBackendIds (populated during plan phase).
+ */
+ public List<Long> getScanBackendIds() {
+ Set<Long> result = Sets.newHashSet();
+ for (ScanNode scanNode : scanNodes) {
+ result.addAll(scanNode.getScanBackendIds());
+ }
+ return Lists.newArrayList(result);
+ }
+
public List<PlanFragment> getFragments() {
return fragments;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
index 621d2fbe6b6..ce91f8a7fe7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -29,7 +29,10 @@ import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileType;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -41,6 +44,14 @@ import java.util.UUID;
public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunction {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String URI =
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";
+ private static final String ENABLE_CDC_CLIENT_KEY = "enable_cdc_client";
+ private static final String HTTP_ENABLE_RANGE_REQUEST_KEY =
"http.enable.range.request";
+ private static final String HTTP_ENABLE_CHUNK_RESPONSE_KEY =
"http.enable.chunk.response";
+ private static final String HTTP_METHOD_KEY = "http.method";
+ private static final String HTTP_PAYLOAD_KEY = "http.payload";
+ public static final String JOB_ID_KEY = "job.id";
+ public static final String TASK_ID_KEY = "task.id";
+ public static final String META_KEY = "meta";
public CdcStreamTableValuedFunction(Map<String, String> properties) throws
AnalysisException {
validate(properties);
@@ -51,24 +62,34 @@ public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunctio
Map<String, String> copyProps = new HashMap<>(properties);
copyProps.put("format", "json");
super.parseCommonProperties(copyProps);
- this.processedParams.put("enable_cdc_client", "true");
- this.processedParams.put("uri", URI);
- this.processedParams.put("http.enable.range.request", "false");
- this.processedParams.put("http.enable.chunk.response", "true");
- this.processedParams.put("http.method", "POST");
+ this.processedParams.put(ENABLE_CDC_CLIENT_KEY, "true");
+ this.processedParams.put(URI_KEY, URI);
+ this.processedParams.put(HTTP_ENABLE_RANGE_REQUEST_KEY, "false");
+ this.processedParams.put(HTTP_ENABLE_CHUNK_RESPONSE_KEY, "true");
+ this.processedParams.put(HTTP_METHOD_KEY, "POST");
String payload = generateParams(properties);
- this.processedParams.put("http.payload", payload);
+ this.processedParams.put(HTTP_PAYLOAD_KEY, payload);
this.backendConnectProperties.putAll(processedParams);
generateFileStatus();
}
private String generateParams(Map<String, String> properties) throws
AnalysisException {
FetchRecordRequest recordRequest = new FetchRecordRequest();
- recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+ String defaultJobId = UUID.randomUUID().toString().replace("-", "");
+ recordRequest.setJobId(properties.getOrDefault(JOB_ID_KEY,
defaultJobId));
recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
recordRequest.setConfig(properties);
try {
+ // for tvf with job
+ if (properties.containsKey(TASK_ID_KEY)) {
+ recordRequest.setTaskId(properties.remove(TASK_ID_KEY));
+ String meta = properties.remove(META_KEY);
+ Preconditions.checkArgument(StringUtils.isNotEmpty(meta),
"meta is required when task.id is provided");
+ Map<String, Object> metaMap = objectMapper.readValue(meta, new
TypeReference<Map<String, Object>>() {});
+ recordRequest.setMeta(metaMap);
+ }
+
return objectMapper.writeValueAsString(recordRequest);
} catch (IOException e) {
LOG.warn("Failed to serialize fetch record request", e);
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 0b677208404..3d6f1af85d1 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -135,7 +135,7 @@ public class ClientController {
}
// Explicit "taskId" binding: without it, @PathVariable resolves the name from the
// method parameter, which fails when classes are compiled without -parameters.
@RequestMapping(path = "/api/getTaskOffset/{taskId}", method = RequestMethod.POST)
public Object getTaskIdOffset(@PathVariable("taskId") String taskId) {
    return RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId));
}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 850c3038b91..46fabe2d418 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -74,8 +74,9 @@ public class PipelineCoordinator {
private static final String SPLIT_ID = "splitId";
// jobId
private final Map<String, DorisBatchStreamLoad> batchStreamLoadMap = new
ConcurrentHashMap<>();
- // taskId, offset
- private final Map<String, Map<String, String>> taskOffsetCache = new
ConcurrentHashMap<>();
+ // taskId -> list of split offsets (accumulates all splits processed in
one task)
+ private final Map<String, List<Map<String, String>>> taskOffsetCache =
+ new ConcurrentHashMap<>();
// taskId -> writeFailReason
private final Map<String, String> taskErrorMaps = new
ConcurrentHashMap<>();
private final ThreadPoolExecutor executor;
@@ -107,14 +108,16 @@ public class PipelineCoordinator {
SourceReader sourceReader;
SplitReadResult readResult;
try {
+ LOG.info(
+ "Fetch record request with meta {}, jobId={}, taskId={}",
+ fetchReq.getMeta(),
+ fetchReq.getJobId(),
+ fetchReq.getTaskId());
+ // TVF doesn't have meta value; meta need to be extracted from the
offset.
if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) {
- LOG.info(
- "Generate initial meta for fetch record request,
jobId={}, taskId={}",
- fetchReq.getJobId(),
- fetchReq.getTaskId());
- // means the request did not originate from the job, only tvf
Map<String, Object> meta = generateMeta(fetchReq.getConfig());
fetchReq.setMeta(meta);
+ LOG.info("Generated meta for job {}: {}", fetchReq.getJobId(),
meta);
}
sourceReader = Env.getCurrentEnv().getReader(fetchReq);
@@ -144,19 +147,33 @@ public class PipelineCoordinator {
OutputStream rawOutputStream)
throws Exception {
SourceSplit split = readResult.getSplit();
- Map<String, String> lastMeta = null;
+ boolean isSnapshotSplit = sourceReader.isSnapshotSplit(split);
int rowCount = 0;
+ int heartbeatCount = 0;
BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+ boolean hasReceivedData = false;
+ boolean lastMessageIsHeartbeat = false;
+ long startTime = System.currentTimeMillis();
try {
- // Poll records using the existing mechanism
boolean shouldStop = false;
- long startTime = System.currentTimeMillis();
+ LOG.info(
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ fetchRecord.getJobId(),
+ fetchRecord.getTaskId(),
+ isSnapshotSplit);
while (!shouldStop) {
Iterator<SourceRecord> recordIterator =
sourceReader.pollRecords();
if (!recordIterator.hasNext()) {
Thread.sleep(100);
long elapsedTime = System.currentTimeMillis() - startTime;
- if (elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS) {
+ boolean timeoutReached = elapsedTime >
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+ if (shouldStop(
+ isSnapshotSplit,
+ hasReceivedData,
+ lastMessageIsHeartbeat,
+ elapsedTime,
+ Constants.POLL_SPLIT_RECORDS_TIMEOUTS,
+ timeoutReached)) {
break;
}
continue;
@@ -164,8 +181,19 @@ public class PipelineCoordinator {
while (recordIterator.hasNext()) {
SourceRecord element = recordIterator.next();
if (isHeartbeatEvent(element)) {
- shouldStop = true;
- break;
+ heartbeatCount++;
+ if (!isSnapshotSplit) {
+ lastMessageIsHeartbeat = true;
+ }
+ long elapsedTime = System.currentTimeMillis() -
startTime;
+ boolean timeoutReached =
+ elapsedTime >
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+ if (!isSnapshotSplit && timeoutReached) {
+ shouldStop = true;
+ break;
+ }
+ // Heartbeat before timeout: skip and keep reading.
+ continue;
}
DeserializeResult result =
sourceReader.deserialize(fetchRecord.getConfig(),
element);
@@ -175,9 +203,18 @@ public class PipelineCoordinator {
bos.write(LINE_DELIMITER);
}
rowCount += result.getRecords().size();
+ hasReceivedData = true;
+ lastMessageIsHeartbeat = false;
}
}
}
+ LOG.info(
+ "Fetched {} records and {} heartbeats in {} ms for
jobId={} taskId={}",
+ rowCount,
+ heartbeatCount,
+ System.currentTimeMillis() - startTime,
+ fetchRecord.getJobId(),
+ fetchRecord.getTaskId());
// force flush buffer
bos.flush();
} finally {
@@ -186,32 +223,10 @@ public class PipelineCoordinator {
sourceReader.finishSplitRecords();
}
- LOG.info(
- "Fetch records completed, jobId={}, taskId={}, splitId={},
rowCount={}",
- fetchRecord.getJobId(),
- fetchRecord.getTaskId(),
- split.splitId(),
- rowCount);
-
- if (readResult.getSplitState() != null) {
- if (sourceReader.isSnapshotSplit(split)) {
- lastMeta =
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- lastMeta.put(SPLIT_ID, split.splitId());
- } else if (sourceReader.isBinlogSplit(split)) {
- lastMeta =
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- } else {
- throw new RuntimeException(
- "unknown split type: " +
split.getClass().getSimpleName());
- }
- } else {
- throw new RuntimeException("split state is null");
- }
-
+ List<Map<String, String>> offsetMeta = extractOffsetMeta(sourceReader,
readResult);
if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) {
- taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta);
+ taskOffsetCache.put(fetchRecord.getTaskId(), offsetMeta);
}
-
// Convention: standalone TVF uses a UUID jobId; job-driven TVF will
use a numeric Long
// jobId (set via rewriteTvfParams). When the job-driven path is
implemented,
// rewriteTvfParams must inject the job's Long jobId into the TVF
properties
@@ -751,8 +766,8 @@ public class PipelineCoordinator {
return commitOffsets;
}
- public Map<String, String> getOffsetWithTaskId(String taskId) {
- Map<String, String> taskOffset = taskOffsetCache.remove(taskId);
- return taskOffset == null ? new HashMap<>() : taskOffset;
+ public List<Map<String, String>> getOffsetWithTaskId(String taskId) {
+ List<Map<String, String>> taskOffset = taskOffsetCache.remove(taskId);
+ return taskOffset == null ? new ArrayList<>() : taskOffset;
}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index a4f3a9e2547..9d77f3912eb 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -62,6 +62,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.types.logical.RowType;
import org.apache.kafka.connect.source.SourceRecord;
@@ -848,6 +849,22 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)));
}
+ // todo: Currently, only one split key is supported; future will
require multiple split
+ // keys.
+ if (cdcConfig.containsKey(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY)) {
+ String database = cdcConfig.get(DataSourceConfigKeys.DATABASE);
+ String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+ Preconditions.checkArgument(
+ database != null && !database.isEmpty() && table != null
&& !table.isEmpty(),
+ "When '%s' is set, both '%s' and '%s' must be configured
(include_tables is not supported).",
+ DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY,
+ DataSourceConfigKeys.DATABASE,
+ DataSourceConfigKeys.TABLE);
+ ObjectPath objectPath = new ObjectPath(database, table);
+ configFactory.chunkKeyColumn(
+ objectPath,
cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
+ }
+
return configFactory.createConfig(0);
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 492e0650d4b..b6d28510613 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -221,6 +221,10 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)));
}
+ if (cdcConfig.containsKey(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY)) {
+
configFactory.chunkKeyColumn(cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
+ }
+
Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
dbzProps.put("interval.handling.mode", "string");
configFactory.debeziumProperties(dbzProps);
diff --git a/fs_brokers/cdc_client/src/main/resources/application.properties
b/fs_brokers/cdc_client/src/main/resources/application.properties
index a7ea3996a6f..d22c98d434b 100644
--- a/fs_brokers/cdc_client/src/main/resources/application.properties
+++ b/fs_brokers/cdc_client/src/main/resources/application.properties
@@ -19,4 +19,5 @@ spring.web.resources.add-mappings=false
server.port=9096
backend.http.port=8040
# see doris-meta/image/VERSION
-cluster.token=cluster-token
\ No newline at end of file
+cluster.token=cluster-token
+spring.mvc.async.request-timeout=300000
\ No newline at end of file
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.out
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.out
new file mode 100644
index 00000000000..c9f1ceb1307
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !snapshot_data --
+A1 1
+B1 2
+
+-- !final_data --
+A1 1
+B1 2
+C1 3
+D1 4
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.out
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.out
new file mode 100644
index 00000000000..c9f1ceb1307
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !snapshot_data --
+A1 1
+B1 2
+
+-- !final_data --
+A1 1
+B1 2
+C1 3
+D1 4
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.out
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.out
new file mode 100644
index 00000000000..3f6c816fa20
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !offset_latest_data --
+C1 3
+D1 4
+
+-- !final_data --
+C1 3
+D1 4
+E1 5
+F1 6
+
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
new file mode 100644
index 00000000000..33451a3763a
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !snapshot_data --
+A1 1
+B1 2
+
+-- !final_data --
+A1 1
+B1 2
+C1 30
+D1 4
+E1 5
+F1 6
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy
new file mode 100644
index 00000000000..fdc50c76118
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql.groovy
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for MySQL.
+ *
+ * Scenario:
+ * 1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ * 2. Binlog phase: INSERT (C1, D1) are applied.
+ */
+suite("test_streaming_job_cdc_stream_mysql",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_job_cdc_stream_mysql_name"
+ def currentDb = (sql "select database()")[0][0]
+ def dorisTable = "test_streaming_job_cdc_stream_mysql_tbl"
+ def mysqlDb = "test_cdc_db"
+ def mysqlTable = "test_streaming_job_cdc_stream_mysql_src"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+ `name` varchar(200) NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`name`)
+ DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // prepare source table with pre-existing snapshot data
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}"""
+ sql """CREATE TABLE ${mysqlDb}.${mysqlTable} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES
('A1', 1)"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES
('B1', 2)"""
+ }
+
+ // create streaming job via cdc_stream TVF (offset=initial → snapshot
then binlog)
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+ SELECT name, age FROM cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "table" = "${mysqlTable}",
+ "offset" = "initial",
+ "snapshot_split_key" = "name"
+ )
+ """
+
+ // wait for at least one snapshot task to succeed
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+ log.info("SucceedTaskCount: " + cnt)
+ cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 1
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // verify snapshot data
+ qt_snapshot_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY
name """
+
+ // insert incremental rows in MySQL
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES
('C1', 3)"""
+ sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES
('D1', 4)"""
+ }
+
+ // wait for binlog tasks to pick up the new rows
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
+ log.info("incremental rows: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY
name """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy
new file mode 100644
index 00000000000..3fa853e2118
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for PostgreSQL.
+ *
+ * Scenario:
+ * 1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ * 2. Binlog phase: INSERT (C1, D1) are applied.
+ */
+suite("test_streaming_job_cdc_stream_postgres",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_job_cdc_stream_postgres_name"
+ def currentDb = (sql "select database()")[0][0]
+ def dorisTable = "test_streaming_job_cdc_stream_postgres_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def pgTable = "test_streaming_job_cdc_stream_postgres_src"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+ `name` varchar(200) NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`name`)
+ DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // prepare source table with pre-existing snapshot data
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+ "name" varchar(200) PRIMARY KEY,
+ "age" int2
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('A1', 1)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('B1', 2)"""
+ }
+
+ // create streaming job via cdc_stream TVF (offset=initial → snapshot
then binlog)
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+ SELECT name, age FROM cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${pgTable}",
+ "offset" = "initial",
+ "snapshot_split_size" = "1"
+ )
+ """
+
+        // wait for the snapshot tasks to succeed (snapshot_split_size=1 over 2 rows yields at least 2 tasks)
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def cnt = sql """select SucceedTaskCount from
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+ log.info("SucceedTaskCount: " + cnt)
+ cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // verify snapshot data
+ qt_snapshot_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY
name """
+
+ // insert incremental rows in PostgreSQL
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('C1', 3)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('D1', 4)"""
+ }
+
+ // wait for binlog tasks to pick up the new rows
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
+ log.info("incremental rows: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY
name """
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
new file mode 100644
index 00000000000..01d3defafaf
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test cdc_stream TVF streaming job with offset=latest and credential update
via ALTER JOB.
+ *
+ * Scenario A — offset=latest:
+ * 1. Pre-existing rows (A1, B1) are present in the PG source table before
the job is created.
+ * 2. Job is created with offset=latest: snapshot is skipped, only changes
after job creation are synced.
+ * 3. INSERT (C1, D1) after job creation → should appear in Doris.
+ * 4. Verify A1, B1 are NOT in Doris (skipped by offset=latest).
+ *
+ * Scenario B — ALTER with wrong credentials:
+ * 1. Pause the job.
+ * 2. ALTER JOB with wrong password (same type/database/table, so the
unmodifiable-property check passes).
+ * 3. Resume the job → fetchMeta fails → job auto-pauses with an error
message.
+ * 4. Verify the error message reflects the credential failure.
+ *
+ * Scenario C — ALTER back to correct credentials and recover:
+ * 1. Alter back to correct credentials while still PAUSED.
+ * 2. Resume the job → job returns to RUNNING.
+ * 3. INSERT (E1, F1) after recovery → should appear in Doris (sync continues
from the saved offset).
+ * 4. Verify final data: only C1, D1, E1, F1 are present (A1, B1 still
absent).
+ */
+suite("test_streaming_job_cdc_stream_postgres_latest_alter_cred",
+ "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+
+ def jobName = "test_streaming_job_cdc_stream_pg_latest_alter_cred"
+ def currentDb = (sql "select database()")[0][0]
+ def dorisTable = "test_streaming_job_cdc_stream_pg_latest_alter_cred_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def pgTable = "test_streaming_job_cdc_stream_pg_latest_alter_cred_src"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+ `name` varchar(200) NOT NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`name`)
+ DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+ String jdbcUrl =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}"
+
+ // ── Scenario A setup: pre-existing rows exist before job creation
─────────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+ "name" varchar(200) PRIMARY KEY,
+ "age" int2
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('A1', 1)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('B1', 2)"""
+ }
+
+ // ── Scenario A: create job with offset=latest (skip snapshot)
─────────────────
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+ SELECT name, age FROM cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" = "${jdbcUrl}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${pgTable}",
+ "offset" = "latest"
+ )
+ """
+
+        // wait for the job status to reach RUNNING
+ try {
+ Awaitility.await().atMost(60, SECONDS).pollInterval(1,
SECONDS).until({
+ def status = sql """select status from jobs("type"="insert")
where Name='${jobName}'"""
+ status.size() == 1 && status.get(0).get(0) == "RUNNING"
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+
+ // insert rows AFTER job creation — only these should appear in Doris
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('C1', 3)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('D1', 4)"""
+ }
+
+ // wait for C1 and D1 to arrive via binlog
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
+ log.info("binlog rows (offset=latest): " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // offset=latest: A1 and B1 must NOT be in Doris (they existed before
job creation)
+ def preExisting = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('A1', 'B1')"""
+ assert (preExisting.get(0).get(0) as int) == 0 :
+ "offset=latest: pre-existing rows A1/B1 must not be synced,
but found ${preExisting.get(0).get(0)}"
+
+ qt_offset_latest_data """ SELECT * FROM ${currentDb}.${dorisTable}
ORDER BY name """
+
+ // ── Scenario B: ALTER to wrong credentials → job should auto-pause
────────────
+ sql """PAUSE JOB where jobname = '${jobName}'"""
+ Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
+ def s = sql """select status from jobs("type"="insert") where
Name='${jobName}'"""
+ s.size() == 1 && s.get(0).get(0) == "PAUSED"
+ })
+
+ // alter with wrong password; type/jdbc_url/database/schema/table
remain unchanged
+ sql """
+ ALTER JOB ${jobName}
+ INSERT INTO ${currentDb}.${dorisTable} (name, age)
+ SELECT name, age FROM cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" = "${jdbcUrl}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "wrong_password_for_test",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${pgTable}",
+ "offset" = "latest"
+ )
+ """
+
+ sql """RESUME JOB where jobname = '${jobName}'"""
+
+ // job should re-pause due to credential failure
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def s = sql """select status from jobs("type"="insert") where
Name='${jobName}'"""
+ log.info("status after wrong cred resume: " + s)
+ s.size() == 1 && s.get(0).get(0) == "PAUSED"
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+
+ def errorMsgAfterWrongCred = sql """select ErrorMsg from
jobs("type"="insert") where Name='${jobName}'"""
+ log.info("error msg after wrong cred: " + errorMsgAfterWrongCred)
+ assert errorMsgAfterWrongCred.get(0).get(0) != null &&
!errorMsgAfterWrongCred.get(0).get(0).isEmpty() :
+ "ErrorMsg should be non-empty when wrong credentials are used"
+
+ // ── Scenario C: ALTER back to correct credentials and recover
─────────────────
+ sql """
+ ALTER JOB ${jobName}
+ INSERT INTO ${currentDb}.${dorisTable} (name, age)
+ SELECT name, age FROM cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" = "${jdbcUrl}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${pgTable}",
+ "offset" = "latest"
+ )
+ """
+
+ sql """RESUME JOB where jobname = '${jobName}'"""
+
+ // verify job transitions back to RUNNING
+ try {
+ Awaitility.await().atMost(60, SECONDS).pollInterval(1,
SECONDS).until({
+ def s = sql """select status from jobs("type"="insert") where
Name='${jobName}'"""
+ log.info("status after correct cred resume: " + s)
+ s.size() == 1 && s.get(0).get(0) == "RUNNING"
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+
+ // insert rows after recovery; they should be consumed
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('E1', 5)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('F1', 6)"""
+ }
+
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('E1', 'F1')"""
+ log.info("rows after credential recovery: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // final state: only post-creation rows (C1, D1, E1, F1) — A1/B1
absent throughout
+ qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY
name """
+
+ def finalJobInfo = sql """select status, FailedTaskCount, ErrorMsg
+ from jobs("type"="insert") where
Name='${jobName}'"""
+ log.info("final job info: " + finalJobInfo)
+ assert finalJobInfo.get(0).get(0) == "RUNNING"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
new file mode 100644
index 00000000000..d1f9263d7ac
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test pause and resume of a streaming INSERT job using cdc_stream TVF for
PostgreSQL.
+ *
+ * Scenario:
+ * 1. Snapshot phase: pre-existing rows (A1, B1) are synced via full
snapshot.
+ * 2. Binlog phase begins: INSERT (C1, D1) are applied; job enters steady
binlog state.
+ * 3. Job is paused; verify status stays PAUSED and currentOffset /
endOffset are non-empty.
+ * 4. While paused: INSERT (E1, F1) and UPDATE C1.age → 30 into the PG
source table.
+ * 5. Job is resumed; verify status transitions to RUNNING.
+ * 6. Verify all rows inserted before and during pause eventually appear in
Doris (no data loss).
+ * 7. Verify FailedTaskCount == 0 and no error message after resume.
+ */
+suite("test_streaming_job_cdc_stream_postgres_pause_resume",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_job_cdc_stream_pg_pause_resume"
+ def currentDb = (sql "select database()")[0][0]
+ def dorisTable = "test_streaming_job_cdc_stream_pg_pause_resume_tbl"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def pgTable = "test_streaming_job_cdc_stream_pg_pause_resume_src"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+ // Use UNIQUE key so that UPDATE rows overwrite in Doris
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+ `name` varchar(200) NOT NULL,
+ `age` int NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`name`)
+ DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ── Phase 1: prepare source table with pre-existing snapshot rows
──────────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+ "name" varchar(200) PRIMARY KEY,
+ "age" int2
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('A1', 1)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('B1', 2)"""
+ }
+
+ // ── Phase 2: create streaming job (offset=initial → snapshot then
binlog) ─────
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+ SELECT name, age FROM cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${pgTable}",
+ "offset" = "initial"
+ )
+ """
+
+ // wait for snapshot tasks to complete (A1 and B1 visible)
+ try {
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('A1', 'B1')"""
+ log.info("snapshot rows: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ qt_snapshot_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY
name """
+
+ // ── Phase 3: insert incremental rows; wait for binlog phase to catch
up ────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('C1', 3)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('D1', 4)"""
+ }
+
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
+ log.info("binlog rows before pause: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // ── Phase 4: verify offset display in binlog state
────────────────────────────
+ def jobInfoBeforePause = sql """select currentOffset, endOffset,
FailedTaskCount, ErrorMsg
+ from jobs("type"="insert") where
Name='${jobName}'"""
+ log.info("job info before pause: " + jobInfoBeforePause)
+
+ def currentOffsetBeforePause = jobInfoBeforePause.get(0).get(0) as
String
+ def failedCountBeforePause = jobInfoBeforePause.get(0).get(2)
+ // currentOffset should be non-empty and contain the binlog split id
+ assert currentOffsetBeforePause != null &&
!currentOffsetBeforePause.isEmpty() :
+ "currentOffset should be non-empty in binlog state, got:
${currentOffsetBeforePause}"
+ assert currentOffsetBeforePause.contains("binlog-split") :
+ "currentOffset should contain 'binlog-split' in binlog state,
got: ${currentOffsetBeforePause}"
+ assert (failedCountBeforePause as int) == 0 : "FailedTaskCount should
be 0 before pause"
+
+ // ── Phase 5: pause the job
─────────────────────────────────────────────────────
+ sql """PAUSE JOB where jobname = '${jobName}'"""
+
+ // verify job transitions to PAUSED
+ try {
+ Awaitility.await().atMost(30, SECONDS).pollInterval(1,
SECONDS).until({
+ def status = sql """select status from jobs("type"="insert")
where Name='${jobName}'"""
+ log.info("job status after pause: " + status)
+ status.size() == 1 && status.get(0).get(0) == "PAUSED"
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+
+ // verify PAUSED state is stable (not auto-resumed) for several seconds
+ sleep(5000)
+ def pausedStatus = sql """select status from jobs("type"="insert")
where Name='${jobName}'"""
+ assert pausedStatus.get(0).get(0) == "PAUSED" : "Job should remain
PAUSED, got: ${pausedStatus.get(0).get(0)}"
+
+ // verify offset is still reported correctly while paused
+ def jobInfoWhilePaused = sql """select currentOffset, endOffset from
jobs("type"="insert") where Name='${jobName}'"""
+ log.info("job info while paused: " + jobInfoWhilePaused)
+ def currentOffsetWhilePaused = jobInfoWhilePaused.get(0).get(0) as
String
+ assert currentOffsetWhilePaused != null &&
!currentOffsetWhilePaused.isEmpty() :
+ "currentOffset should still be non-empty while paused"
+
+ // ── Phase 6: DML while job is paused
─────────────────────────────────────────
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ // new rows added while paused
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('E1', 5)"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age)
VALUES ('F1', 6)"""
+ // update an existing row while paused; after resume Doris
(UNIQUE) should reflect new value
+ sql """UPDATE ${pgDB}.${pgSchema}.${pgTable} SET age = 30 WHERE
name = 'C1'"""
+ }
+
+ // ── Phase 7: resume the job
────────────────────────────────────────────────────
+ sql """RESUME JOB where jobname = '${jobName}'"""
+
+ // verify job transitions back to RUNNING
+ try {
+ Awaitility.await().atMost(60, SECONDS).pollInterval(1,
SECONDS).until({
+ def status = sql """select status from jobs("type"="insert")
where Name='${jobName}'"""
+ log.info("job status after resume: " + status)
+ status.size() == 1 && status.get(0).get(0) == "RUNNING"
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ throw ex
+ }
+
+ // ── Phase 8: wait for rows inserted while paused to appear
────────────────────
+ try {
+ Awaitility.await().atMost(120, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT count(1) FROM
${currentDb}.${dorisTable} WHERE name IN ('E1', 'F1')"""
+ log.info("rows inserted while paused: " + rows)
+ (rows.get(0).get(0) as int) == 2
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // wait for UPDATE on C1 to be applied (age=30 in Doris)
+ try {
+ Awaitility.await().atMost(60, SECONDS).pollInterval(2,
SECONDS).until({
+ def rows = sql """SELECT age FROM ${currentDb}.${dorisTable}
WHERE name = 'C1'"""
+ log.info("C1 age after resume: " + rows)
+ rows.size() == 1 && (rows.get(0).get(0) as int) == 30
+ })
+ } catch (Exception ex) {
+ log.info("job: " + (sql """select * from jobs("type"="insert")
where Name='${jobName}'"""))
+ log.info("tasks: " + (sql """select * from tasks("type"="insert")
where JobName='${jobName}'"""))
+ throw ex
+ }
+
+ // ── Phase 9: assert final correctness
─────────────────────────────────────────
+ qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY
name """
+
+ def jobInfoAfterResume = sql """select status, currentOffset,
FailedTaskCount, ErrorMsg
+ from jobs("type"="insert") where
Name='${jobName}'"""
+ log.info("job info after resume: " + jobInfoAfterResume)
+ assert jobInfoAfterResume.get(0).get(0) == "RUNNING" : "Job should be
RUNNING after resume"
+ def currentOffsetAfterResume = jobInfoAfterResume.get(0).get(1) as
String
+ assert currentOffsetAfterResume != null &&
!currentOffsetAfterResume.isEmpty() :
+ "currentOffset should be non-empty after resume"
+ assert (jobInfoAfterResume.get(0).get(2) as int) == 0 :
"FailedTaskCount should be 0 after resume"
+ def errorMsgAfterResume = jobInfoAfterResume.get(0).get(3) as String
+ assert errorMsgAfterResume == null || errorMsgAfterResume.isEmpty() :
+ "ErrorMsg should be empty after successful resume, got:
${errorMsgAfterResume}"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${dorisTable} force"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]