This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 35433413a56 [fix](streaming-job) misc fixes for 
typo/log/validation/visibility (#63480)
35433413a56 is described below

commit 35433413a560885fdccdaa9e90249e845b2ae1f5
Author: wudi <[email protected]>
AuthorDate: Tue May 26 16:11:13 2026 +0800

    [fix](streaming-job) misc fixes for typo/log/validation/visibility (#63480)
    
    ## Summary
    
    Small fixes accumulated during a code review of the streaming-job
    module. None of these change runtime semantics for currently-supported
    scenarios; they tighten validation, fix a typo, and harden a concurrent
    field.
    
    - `StreamingInsertJob.cleanup()`: log was filling `dbId=%s` with
    `getJobId()`. Fix to `getDbId()`.
    - `StreamingInsertJob.getShowSQL()`: typo `TO DATABSE` → `TO DATABASE`
    in the synthesized SQL shown by SHOW JOBS.
    - `JdbcSourceOffsetProvider.currentOffset`: mark `volatile`. The write
    in `updateOffset()` happens outside `splitsLock` while
    `hasMoreDataToConsume()` / `getShowCurrentOffset()` read it without
    holding the lock.
    - `DataSourceConfigValidator.isValidValue`: `snapshot_split_size` /
    `snapshot_parallelism` previously only checked null/empty. Reject
    non-numeric and non-positive values up front so users get a clear
    CREATE-time error instead of a context-less `NumberFormatException` at
    runtime.
    - `CdcStreamTableValuedFunction.validate`: previously only checked
    `jdbc_url` / `type` / `table` / `offset`. Now also rejects missing
    `database` (MySQL), missing `schema` (PostgreSQL), and unsupported
    `type` values, so the error surfaces at TVF parse time rather than later
    inside `getRemoteDbName`.
---
 .../streaming/DataSourceConfigValidator.java       | 45 +++++++++---
 .../insert/streaming/StreamingInsertJob.java       |  4 +-
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  | 11 +--
 .../offset/jdbc/JdbcTvfSourceOffsetProvider.java   | 13 ++--
 .../CdcStreamTableValuedFunction.java              | 68 ++++++++++++++++--
 .../streaming/DataSourceConfigValidatorTest.java   | 52 ++++++++++++++
 .../cdc/tvf/test_cdc_stream_tvf_mysql.groovy       | 83 +++++++++++++++++++++-
 .../cdc/tvf/test_cdc_stream_tvf_postgres.groovy    | 32 +++++++++
 8 files changed, 282 insertions(+), 26 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index f37cbbff5dd..1b633605d71 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -109,8 +109,12 @@ public class DataSourceConfigValidator {
             }
         }
 
-        // Cross-field: verify-ca must be paired with a CA cert; otherwise the 
reader will
-        // silently fall back to the JVM default truststore and likely fail to 
connect.
+        validateSslVerifyCaPair(input);
+    }
+
+    // Cross-field: verify-ca must be paired with a CA cert; otherwise the 
reader will
+    // silently fall back to the JVM default truststore and likely fail to 
connect.
+    public static void validateSslVerifyCaPair(Map<String, String> input) 
throws IllegalArgumentException {
         if 
(DataSourceConfigKeys.SSL_MODE_VERIFY_CA.equals(input.get(DataSourceConfigKeys.SSL_MODE))
                 && (input.get(DataSourceConfigKeys.SSL_ROOTCERT) == null
                         || 
input.get(DataSourceConfigKeys.SSL_ROOTCERT).trim().isEmpty())) {
@@ -153,19 +157,44 @@ public class DataSourceConfigValidator {
             return isValidOffset(value, dataSourceType);
         }
 
-        // slot_name / publication_name are interpolated into PG DDL without 
quoting,
-        // so enforce unquoted-identifier grammar to prevent injection and 
runtime errors.
         if (key.equals(DataSourceConfigKeys.SLOT_NAME)
                 || key.equals(DataSourceConfigKeys.PUBLICATION_NAME)) {
-            return value.length() <= PG_MAX_IDENTIFIER_LENGTH
-                    && PG_IDENTIFIER_PATTERN.matcher(value).matches();
+            return isValidPgIdentifier(value);
         }
-        if (key.equals(DataSourceConfigKeys.SSL_MODE) && 
!ALLOW_SSL_MODES.contains(value)) {
-            return false;
+        if (key.equals(DataSourceConfigKeys.SSL_MODE)) {
+            return isValidSslMode(value);
+        }
+        if (key.equals(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)
+                || key.equals(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) {
+            return isPositiveInt(value);
         }
         return true;
     }
 
+    public static boolean isPositiveInt(String value) {
+        if (value == null) {
+            return false;
+        }
+        try {
+            return Integer.parseInt(value) > 0;
+        } catch (NumberFormatException e) {
+            return false;
+        }
+    }
+
+    // slot_name / publication_name are interpolated into PG DDL without 
quoting,
+    // so enforce unquoted-identifier grammar to prevent injection and runtime 
errors.
+    public static boolean isValidPgIdentifier(String value) {
+        return value != null
+                && !value.isEmpty()
+                && value.length() <= PG_MAX_IDENTIFIER_LENGTH
+                && PG_IDENTIFIER_PATTERN.matcher(value).matches();
+    }
+
+    public static boolean isValidSslMode(String value) {
+        return ALLOW_SSL_MODES.contains(value);
+    }
+
     /**
      * Check if the offset value is valid for the given data source type.
      * Supported: initial, snapshot, latest, JSON binlog/lsn position.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 060901125c6..55c761a5d42 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1108,7 +1108,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                         .append("'='").append(entry.getValue()).append("',");
             }
             sb.deleteCharAt(sb.length() - 1);
-            sb.append(" ) TO DATABSE ").append(targetDb);
+            sb.append(" ) TO DATABASE ").append(targetDb);
             if (!targetProperties.isEmpty()) {
                 sb.append(" (");
                 for (Map.Entry<String, String> entry : 
targetProperties.entrySet()) {
@@ -1554,7 +1554,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
                     log.warn("failed to delete streaming job, response: {}", 
resp);
                     throw new JobException("deleteJobKey failed for jobId=%s, 
dbId=%s, status=%s",
-                            getJobId(), getJobId(), resp.getStatus());
+                            getJobId(), getDbId(), resp.getStatus());
                 }
             } catch (RpcException e) {
                 log.warn("failed to delete streaming job {}", resp, e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 9cd5ad3fefa..9b7d364895b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -87,7 +87,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     List<SnapshotSplit> remainingSplits = new ArrayList<>();
     List<SnapshotSplit> finishedSplits = new ArrayList<>();
 
-    JdbcOffset currentOffset;
+    volatile JdbcOffset currentOffset;
     Map<String, String> endBinlogOffset;
 
     @SerializedName("chw")
@@ -213,10 +213,10 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
 
     @Override
     public void updateOffset(Offset offset) {
-        this.currentOffset = (JdbcOffset) offset;
-        if (currentOffset.snapshotSplit()) {
+        JdbcOffset newOffset = (JdbcOffset) offset;
+        if (newOffset.snapshotSplit()) {
             synchronized (splitsLock) {
-                List<? extends AbstractSourceSplit> splits = 
currentOffset.getSplits();
+                List<? extends AbstractSourceSplit> splits = 
newOffset.getSplits();
                 for (AbstractSourceSplit split : splits) {
                     SnapshotSplit snapshotSplit = (SnapshotSplit) split;
                     String splitId = split.getSplitId();
@@ -247,10 +247,11 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
                 }
             }
         } else {
-            BinlogSplit binlogSplit = (BinlogSplit) 
currentOffset.getSplits().get(0);
+            BinlogSplit binlogSplit = (BinlogSplit) 
newOffset.getSplits().get(0);
             binlogOffsetPersist = new 
HashMap<>(binlogSplit.getStartingOffset());
             binlogOffsetPersist.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
         }
+        this.currentOffset = newOffset;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
index 9501f62df4f..6cbd2e63728 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
@@ -260,15 +260,15 @@ public class JdbcTvfSourceOffsetProvider extends 
JdbcSourceOffsetProvider {
      * adds it to finishedSplits. During txn replay remainingSplits is empty 
so removeIf returns
      * false naturally — chunkHighWatermarkMap is still updated for 
replayIfNeed to use later.
      *
-     * <p>Binlog: currentOffset is set above. Also mirror startingOffset into 
binlogOffsetPersist
-     * so it survives FE checkpoint via image (currentOffset has no 
@SerializedName).
+     * <p>Binlog: mirror startingOffset into binlogOffsetPersist so it 
survives FE checkpoint via
+     * image (currentOffset has no @SerializedName).
      */
     @Override
     public void updateOffset(Offset offset) {
-        this.currentOffset = (JdbcOffset) offset;
-        if (currentOffset.snapshotSplit()) {
+        JdbcOffset newOffset = (JdbcOffset) offset;
+        if (newOffset.snapshotSplit()) {
             synchronized (splitsLock) {
-                for (AbstractSourceSplit split : currentOffset.getSplits()) {
+                for (AbstractSourceSplit split : newOffset.getSplits()) {
                     SnapshotSplit ss = (SnapshotSplit) split;
                     boolean removed = remainingSplits.removeIf(v -> {
                         if (v.getSplitId().equals(ss.getSplitId())) {
@@ -289,12 +289,13 @@ public class JdbcTvfSourceOffsetProvider extends 
JdbcSourceOffsetProvider {
             }
         } else {
             // Mirror binlog offset into bop so it survives FE checkpoint
-            BinlogSplit bs = (BinlogSplit) currentOffset.getSplits().get(0);
+            BinlogSplit bs = (BinlogSplit) newOffset.getSplits().get(0);
             if (MapUtils.isNotEmpty(bs.getStartingOffset())) {
                 binlogOffsetPersist = new HashMap<>(bs.getStartingOffset());
                 binlogOffsetPersist.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
             }
         }
+        this.currentOffset = newOffset;
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
index 24031de5e9b..885a3ec5a33 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -25,6 +25,7 @@ import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.FetchRecordRequest;
 import org.apache.doris.job.common.DataSourceType;
+import 
org.apache.doris.job.extensions.insert.streaming.DataSourceConfigValidator;
 import org.apache.doris.job.util.StreamingJobUtils;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TFileType;
@@ -106,18 +107,77 @@ public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunctio
     }
 
     private void validate(Map<String, String> properties) throws 
AnalysisException {
-        if (!properties.containsKey(DataSourceConfigKeys.JDBC_URL)) {
+        if 
(StringUtils.isEmpty(properties.get(DataSourceConfigKeys.JDBC_URL))) {
             throw new AnalysisException("jdbc_url is required");
         }
-        if (!properties.containsKey(DataSourceConfigKeys.TYPE)) {
+        if (StringUtils.isEmpty(properties.get(DataSourceConfigKeys.TYPE))) {
             throw new AnalysisException("type is required");
         }
-        if (!properties.containsKey(DataSourceConfigKeys.TABLE)) {
+        if (StringUtils.isEmpty(properties.get(DataSourceConfigKeys.TABLE))) {
             throw new AnalysisException("table is required");
         }
-        if (!properties.containsKey(DataSourceConfigKeys.OFFSET)) {
+        if (StringUtils.isEmpty(properties.get(DataSourceConfigKeys.OFFSET))) {
             throw new AnalysisException("offset is required");
         }
+        DataSourceType sourceType;
+        try {
+            sourceType = 
DataSourceType.valueOf(properties.get(DataSourceConfigKeys.TYPE).toUpperCase());
+        } catch (IllegalArgumentException e) {
+            throw new AnalysisException("Unsupported type: " + 
properties.get(DataSourceConfigKeys.TYPE));
+        }
+        switch (sourceType) {
+            case MYSQL:
+                if 
(StringUtils.isEmpty(properties.get(DataSourceConfigKeys.DATABASE))) {
+                    throw new AnalysisException("database is required for 
MySQL");
+                }
+                break;
+            case POSTGRES:
+                if 
(StringUtils.isEmpty(properties.get(DataSourceConfigKeys.SCHEMA))) {
+                    throw new AnalysisException("schema is required for 
PostgreSQL");
+                }
+                validatePgIdentifierIfPresent(properties, 
DataSourceConfigKeys.SLOT_NAME);
+                validatePgIdentifierIfPresent(properties, 
DataSourceConfigKeys.PUBLICATION_NAME);
+                break;
+            default:
+                throw new AnalysisException("Unsupported type: " + sourceType);
+        }
+        String offset = properties.get(DataSourceConfigKeys.OFFSET);
+        if (!DataSourceConfigValidator.isValidOffset(offset, 
sourceType.name())) {
+            throw new AnalysisException("Invalid value for key 'offset': " + 
offset);
+        }
+        String sslMode = properties.get(DataSourceConfigKeys.SSL_MODE);
+        if (sslMode != null && 
!DataSourceConfigValidator.isValidSslMode(sslMode)) {
+            throw new AnalysisException("Invalid value for key 'ssl_mode': " + 
sslMode);
+        }
+        try {
+            DataSourceConfigValidator.validateSslVerifyCaPair(properties);
+        } catch (IllegalArgumentException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+        validatePositiveIntIfPresent(properties, 
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
+        validatePositiveIntIfPresent(properties, 
DataSourceConfigKeys.SNAPSHOT_PARALLELISM);
+    }
+
+    private static void validatePositiveIntIfPresent(Map<String, String> 
properties, String key)
+            throws AnalysisException {
+        String value = properties.get(key);
+        if (value == null) {
+            return;
+        }
+        if (!DataSourceConfigValidator.isPositiveInt(value)) {
+            throw new AnalysisException("Invalid value for key '" + key + "': 
" + value);
+        }
+    }
+
+    private static void validatePgIdentifierIfPresent(Map<String, String> 
properties, String key)
+            throws AnalysisException {
+        String value = properties.get(key);
+        if (value == null) {
+            return;
+        }
+        if (!DataSourceConfigValidator.isValidPgIdentifier(value)) {
+            throw new AnalysisException("Invalid value for key '" + key + "': 
" + value);
+        }
     }
 
     private void generateFileStatus() {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
index ce570f440a8..769517d9f65 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
@@ -224,4 +224,56 @@ public class DataSourceConfigValidatorTest {
             DataSourceConfigValidator.validateSource(props, 
DataSourceType.POSTGRES.name());
         }
     }
+
+    @Test
+    public void testSnapshotSplitSizeAcceptsPositiveInteger() {
+        Map<String, String> props = new HashMap<>();
+        props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+        props.put(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE, "8192");
+        DataSourceConfigValidator.validateSource(props, 
DataSourceType.MYSQL.name());
+    }
+
+    @Test
+    public void testSnapshotSplitSizeRejectsNonNumeric() {
+        Map<String, String> props = new HashMap<>();
+        props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+        props.put(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE, "abc");
+        assertReject(props);
+    }
+
+    @Test
+    public void testSnapshotSplitSizeRejectsZeroAndNegative() {
+        for (String v : new String[]{"0", "-1"}) {
+            Map<String, String> props = new HashMap<>();
+            props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+            props.put(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE, v);
+            assertReject(props);
+        }
+    }
+
+    @Test
+    public void testSnapshotParallelismAcceptsPositiveInteger() {
+        Map<String, String> props = new HashMap<>();
+        props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+        props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "4");
+        DataSourceConfigValidator.validateSource(props, 
DataSourceType.MYSQL.name());
+    }
+
+    @Test
+    public void testSnapshotParallelismRejectsNonNumeric() {
+        Map<String, String> props = new HashMap<>();
+        props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+        props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "abc");
+        assertReject(props);
+    }
+
+    @Test
+    public void testSnapshotParallelismRejectsZeroAndNegative() {
+        for (String v : new String[]{"0", "-2"}) {
+            Map<String, String> props = new HashMap<>();
+            props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+            props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, v);
+            assertReject(props);
+        }
+    }
 }
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
index 0536a5b3bf5..ba6f0eacac3 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
@@ -57,6 +57,87 @@ suite("test_cdc_stream_tvf_mysql", 
"p0,external,mysql,external_docker,external_d
             exception "offset is required"
         }
 
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://localhost:3306",
+                "table" = "t1",
+                "offset" = "latest")"""
+            exception "database is required for MySQL"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "unknown_db",
+                "jdbc_url" = "jdbc:foo://localhost:3306",
+                "table" = "t1",
+                "offset" = "latest")"""
+            exception "Unsupported type"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "",
+                "jdbc_url" = "jdbc:mysql://localhost:3306",
+                "table" = "t1",
+                "offset" = "latest")"""
+            exception "type is required"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://localhost:3306",
+                "database" = "db1",
+                "table" = "t1",
+                "offset" = "latest",
+                "snapshot_split_size" = "abc")"""
+            exception "Invalid value for key 'snapshot_split_size'"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://localhost:3306",
+                "database" = "db1",
+                "table" = "t1",
+                "offset" = "latest",
+                "snapshot_parallelism" = "0")"""
+            exception "Invalid value for key 'snapshot_parallelism'"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://localhost:3306",
+                "database" = "db1",
+                "table" = "t1",
+                "offset" = "abc")"""
+            exception "Invalid value for key 'offset'"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://localhost:3306",
+                "database" = "db1",
+                "table" = "t1",
+                "offset" = "latest",
+                "ssl_mode" = "bogus")"""
+            exception "Invalid value for key 'ssl_mode'"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://localhost:3306",
+                "database" = "db1",
+                "table" = "t1",
+                "offset" = "latest",
+                "ssl_mode" = "verify-ca")"""
+            exception "ssl_mode 'verify-ca' requires ssl_rootcert to be set"
+        }
+
         // --- Data setup ---
 
         connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
@@ -161,7 +242,7 @@ suite("test_cdc_stream_tvf_mysql", 
"p0,external,mysql,external_docker,external_d
                 "database" = "${mysqlDb}",
                 "table" = "${table1}",
                 "offset" = 'notjson')"""
-            exception "Unsupported offset: notjson"
+            exception "Invalid value for key 'offset'"
         }
 
         // --- Non-existent table ---
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
index ced310a539e..18d331cc34c 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
@@ -31,6 +31,38 @@ suite("test_cdc_stream_tvf_postgres", 
"p0,external,pg,external_docker,external_d
         String bucket = getS3BucketName()
         String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
 
+        // --- Validation error tests (no JDBC connection needed) ---
+        test {
+            sql """select * from cdc_stream(
+                "type" = "postgres",
+                "jdbc_url" = "jdbc:postgresql://localhost:5432/db",
+                "table" = "t1",
+                "offset" = "latest")"""
+            exception "schema is required for PostgreSQL"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "postgres",
+                "jdbc_url" = "jdbc:postgresql://localhost:5432/db",
+                "schema" = "public",
+                "table" = "t1",
+                "offset" = "latest",
+                "slot_name" = "bad-name;drop")"""
+            exception "Invalid value for key 'slot_name'"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "postgres",
+                "jdbc_url" = "jdbc:postgresql://localhost:5432/db",
+                "schema" = "public",
+                "table" = "t1",
+                "offset" = "latest",
+                "publication_name" = "Bad Name")"""
+            exception "Invalid value for key 'publication_name'"
+        }
+
         // create test
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
             // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to