This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 35433413a56 [fix](streaming-job) misc fixes for
typo/log/validation/visibility (#63480)
35433413a56 is described below
commit 35433413a560885fdccdaa9e90249e845b2ae1f5
Author: wudi <[email protected]>
AuthorDate: Tue May 26 16:11:13 2026 +0800
[fix](streaming-job) misc fixes for typo/log/validation/visibility (#63480)
## Summary
Small fixes accumulated during a code review of the streaming-job
module. None of these change runtime semantics for currently-supported
scenarios; they tighten validation, fix a typo, and harden a field that
is accessed concurrently.
- `StreamingInsertJob.cleanup()`: the `deleteJobKey failed` exception
message was filling `dbId=%s` with `getJobId()`. Fixed to use
`getDbId()`.
- `StreamingInsertJob.getShowSQL()`: typo `TO DATABSE` → `TO DATABASE`
in the synthesized SQL shown by SHOW JOBS.
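- `JdbcSourceOffsetProvider.currentOffset`: mark `volatile`. The write
in `updateOffset()` happens outside `splitsLock` while
`hasMoreDataToConsume()` / `getShowCurrentOffset()` read it without
holding the lock. The assignment is also moved to the end of
`updateOffset()` (in both `JdbcSourceOffsetProvider` and
`JdbcTvfSourceOffsetProvider`), after the snapshot-split / binlog
bookkeeping has been updated.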
- `DataSourceConfigValidator.isValidValue`: `snapshot_split_size` /
`snapshot_parallelism` previously only checked null/empty. Reject
non-numeric and non-positive values up front so users get a clear
CREATE-time error instead of a context-less `NumberFormatException` at
runtime.
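- `CdcStreamTableValuedFunction.validate`: previously only checked that
`jdbc_url` / `type` / `table` / `offset` were present. Now also rejects
empty values for those keys, missing `database` (MySQL), missing
`schema` (PostgreSQL), and unsupported `type` values, so the error
surfaces at TVF parse time rather than later inside `getRemoteDbName`.
It additionally validates `offset`, `ssl_mode` (including the
`verify-ca` / `ssl_rootcert` pairing), `snapshot_split_size`,
`snapshot_parallelism`, and, for PostgreSQL, `slot_name` /
`publication_name`, reusing the `DataSourceConfigValidator` helpers.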
---
.../streaming/DataSourceConfigValidator.java | 45 +++++++++---
.../insert/streaming/StreamingInsertJob.java | 4 +-
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 11 +--
.../offset/jdbc/JdbcTvfSourceOffsetProvider.java | 13 ++--
.../CdcStreamTableValuedFunction.java | 68 ++++++++++++++++--
.../streaming/DataSourceConfigValidatorTest.java | 52 ++++++++++++++
.../cdc/tvf/test_cdc_stream_tvf_mysql.groovy | 83 +++++++++++++++++++++-
.../cdc/tvf/test_cdc_stream_tvf_postgres.groovy | 32 +++++++++
8 files changed, 282 insertions(+), 26 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index f37cbbff5dd..1b633605d71 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -109,8 +109,12 @@ public class DataSourceConfigValidator {
}
}
- // Cross-field: verify-ca must be paired with a CA cert; otherwise the
reader will
- // silently fall back to the JVM default truststore and likely fail to
connect.
+ validateSslVerifyCaPair(input);
+ }
+
+ // Cross-field: verify-ca must be paired with a CA cert; otherwise the
reader will
+ // silently fall back to the JVM default truststore and likely fail to
connect.
+ public static void validateSslVerifyCaPair(Map<String, String> input)
throws IllegalArgumentException {
if
(DataSourceConfigKeys.SSL_MODE_VERIFY_CA.equals(input.get(DataSourceConfigKeys.SSL_MODE))
&& (input.get(DataSourceConfigKeys.SSL_ROOTCERT) == null
||
input.get(DataSourceConfigKeys.SSL_ROOTCERT).trim().isEmpty())) {
@@ -153,19 +157,44 @@ public class DataSourceConfigValidator {
return isValidOffset(value, dataSourceType);
}
- // slot_name / publication_name are interpolated into PG DDL without
quoting,
- // so enforce unquoted-identifier grammar to prevent injection and
runtime errors.
if (key.equals(DataSourceConfigKeys.SLOT_NAME)
|| key.equals(DataSourceConfigKeys.PUBLICATION_NAME)) {
- return value.length() <= PG_MAX_IDENTIFIER_LENGTH
- && PG_IDENTIFIER_PATTERN.matcher(value).matches();
+ return isValidPgIdentifier(value);
}
- if (key.equals(DataSourceConfigKeys.SSL_MODE) &&
!ALLOW_SSL_MODES.contains(value)) {
- return false;
+ if (key.equals(DataSourceConfigKeys.SSL_MODE)) {
+ return isValidSslMode(value);
+ }
+ if (key.equals(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)
+ || key.equals(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) {
+ return isPositiveInt(value);
}
return true;
}
+ public static boolean isPositiveInt(String value) {
+ if (value == null) {
+ return false;
+ }
+ try {
+ return Integer.parseInt(value) > 0;
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ }
+
+ // slot_name / publication_name are interpolated into PG DDL without
quoting,
+ // so enforce unquoted-identifier grammar to prevent injection and runtime
errors.
+ public static boolean isValidPgIdentifier(String value) {
+ return value != null
+ && !value.isEmpty()
+ && value.length() <= PG_MAX_IDENTIFIER_LENGTH
+ && PG_IDENTIFIER_PATTERN.matcher(value).matches();
+ }
+
+ public static boolean isValidSslMode(String value) {
+ return ALLOW_SSL_MODES.contains(value);
+ }
+
/**
* Check if the offset value is valid for the given data source type.
* Supported: initial, snapshot, latest, JSON binlog/lsn position.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 060901125c6..55c761a5d42 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1108,7 +1108,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
.append("'='").append(entry.getValue()).append("',");
}
sb.deleteCharAt(sb.length() - 1);
- sb.append(" ) TO DATABSE ").append(targetDb);
+ sb.append(" ) TO DATABASE ").append(targetDb);
if (!targetProperties.isEmpty()) {
sb.append(" (");
for (Map.Entry<String, String> entry :
targetProperties.entrySet()) {
@@ -1554,7 +1554,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
log.warn("failed to delete streaming job, response: {}",
resp);
throw new JobException("deleteJobKey failed for jobId=%s,
dbId=%s, status=%s",
- getJobId(), getJobId(), resp.getStatus());
+ getJobId(), getDbId(), resp.getStatus());
}
} catch (RpcException e) {
log.warn("failed to delete streaming job {}", resp, e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 9cd5ad3fefa..9b7d364895b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -87,7 +87,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
List<SnapshotSplit> remainingSplits = new ArrayList<>();
List<SnapshotSplit> finishedSplits = new ArrayList<>();
- JdbcOffset currentOffset;
+ volatile JdbcOffset currentOffset;
Map<String, String> endBinlogOffset;
@SerializedName("chw")
@@ -213,10 +213,10 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
@Override
public void updateOffset(Offset offset) {
- this.currentOffset = (JdbcOffset) offset;
- if (currentOffset.snapshotSplit()) {
+ JdbcOffset newOffset = (JdbcOffset) offset;
+ if (newOffset.snapshotSplit()) {
synchronized (splitsLock) {
- List<? extends AbstractSourceSplit> splits =
currentOffset.getSplits();
+ List<? extends AbstractSourceSplit> splits =
newOffset.getSplits();
for (AbstractSourceSplit split : splits) {
SnapshotSplit snapshotSplit = (SnapshotSplit) split;
String splitId = split.getSplitId();
@@ -247,10 +247,11 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
}
}
} else {
- BinlogSplit binlogSplit = (BinlogSplit)
currentOffset.getSplits().get(0);
+ BinlogSplit binlogSplit = (BinlogSplit)
newOffset.getSplits().get(0);
binlogOffsetPersist = new
HashMap<>(binlogSplit.getStartingOffset());
binlogOffsetPersist.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
}
+ this.currentOffset = newOffset;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
index 9501f62df4f..6cbd2e63728 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java
@@ -260,15 +260,15 @@ public class JdbcTvfSourceOffsetProvider extends
JdbcSourceOffsetProvider {
* adds it to finishedSplits. During txn replay remainingSplits is empty
so removeIf returns
* false naturally — chunkHighWatermarkMap is still updated for
replayIfNeed to use later.
*
- * <p>Binlog: currentOffset is set above. Also mirror startingOffset into
binlogOffsetPersist
- * so it survives FE checkpoint via image (currentOffset has no
@SerializedName).
+ * <p>Binlog: mirror startingOffset into binlogOffsetPersist so it
survives FE checkpoint via
+ * image (currentOffset has no @SerializedName).
*/
@Override
public void updateOffset(Offset offset) {
- this.currentOffset = (JdbcOffset) offset;
- if (currentOffset.snapshotSplit()) {
+ JdbcOffset newOffset = (JdbcOffset) offset;
+ if (newOffset.snapshotSplit()) {
synchronized (splitsLock) {
- for (AbstractSourceSplit split : currentOffset.getSplits()) {
+ for (AbstractSourceSplit split : newOffset.getSplits()) {
SnapshotSplit ss = (SnapshotSplit) split;
boolean removed = remainingSplits.removeIf(v -> {
if (v.getSplitId().equals(ss.getSplitId())) {
@@ -289,12 +289,13 @@ public class JdbcTvfSourceOffsetProvider extends
JdbcSourceOffsetProvider {
}
} else {
// Mirror binlog offset into bop so it survives FE checkpoint
- BinlogSplit bs = (BinlogSplit) currentOffset.getSplits().get(0);
+ BinlogSplit bs = (BinlogSplit) newOffset.getSplits().get(0);
if (MapUtils.isNotEmpty(bs.getStartingOffset())) {
binlogOffsetPersist = new HashMap<>(bs.getStartingOffset());
binlogOffsetPersist.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
}
}
+ this.currentOffset = newOffset;
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
index 24031de5e9b..885a3ec5a33 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -25,6 +25,7 @@ import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.common.DataSourceType;
+import
org.apache.doris.job.extensions.insert.streaming.DataSourceConfigValidator;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileType;
@@ -106,18 +107,77 @@ public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunctio
}
private void validate(Map<String, String> properties) throws
AnalysisException {
- if (!properties.containsKey(DataSourceConfigKeys.JDBC_URL)) {
+ if
(StringUtils.isEmpty(properties.get(DataSourceConfigKeys.JDBC_URL))) {
throw new AnalysisException("jdbc_url is required");
}
- if (!properties.containsKey(DataSourceConfigKeys.TYPE)) {
+ if (StringUtils.isEmpty(properties.get(DataSourceConfigKeys.TYPE))) {
throw new AnalysisException("type is required");
}
- if (!properties.containsKey(DataSourceConfigKeys.TABLE)) {
+ if (StringUtils.isEmpty(properties.get(DataSourceConfigKeys.TABLE))) {
throw new AnalysisException("table is required");
}
- if (!properties.containsKey(DataSourceConfigKeys.OFFSET)) {
+ if (StringUtils.isEmpty(properties.get(DataSourceConfigKeys.OFFSET))) {
throw new AnalysisException("offset is required");
}
+ DataSourceType sourceType;
+ try {
+ sourceType =
DataSourceType.valueOf(properties.get(DataSourceConfigKeys.TYPE).toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new AnalysisException("Unsupported type: " +
properties.get(DataSourceConfigKeys.TYPE));
+ }
+ switch (sourceType) {
+ case MYSQL:
+ if
(StringUtils.isEmpty(properties.get(DataSourceConfigKeys.DATABASE))) {
+ throw new AnalysisException("database is required for
MySQL");
+ }
+ break;
+ case POSTGRES:
+ if
(StringUtils.isEmpty(properties.get(DataSourceConfigKeys.SCHEMA))) {
+ throw new AnalysisException("schema is required for
PostgreSQL");
+ }
+ validatePgIdentifierIfPresent(properties,
DataSourceConfigKeys.SLOT_NAME);
+ validatePgIdentifierIfPresent(properties,
DataSourceConfigKeys.PUBLICATION_NAME);
+ break;
+ default:
+ throw new AnalysisException("Unsupported type: " + sourceType);
+ }
+ String offset = properties.get(DataSourceConfigKeys.OFFSET);
+ if (!DataSourceConfigValidator.isValidOffset(offset,
sourceType.name())) {
+ throw new AnalysisException("Invalid value for key 'offset': " +
offset);
+ }
+ String sslMode = properties.get(DataSourceConfigKeys.SSL_MODE);
+ if (sslMode != null &&
!DataSourceConfigValidator.isValidSslMode(sslMode)) {
+ throw new AnalysisException("Invalid value for key 'ssl_mode': " +
sslMode);
+ }
+ try {
+ DataSourceConfigValidator.validateSslVerifyCaPair(properties);
+ } catch (IllegalArgumentException e) {
+ throw new AnalysisException(e.getMessage());
+ }
+ validatePositiveIntIfPresent(properties,
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
+ validatePositiveIntIfPresent(properties,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM);
+ }
+
+ private static void validatePositiveIntIfPresent(Map<String, String>
properties, String key)
+ throws AnalysisException {
+ String value = properties.get(key);
+ if (value == null) {
+ return;
+ }
+ if (!DataSourceConfigValidator.isPositiveInt(value)) {
+ throw new AnalysisException("Invalid value for key '" + key + "':
" + value);
+ }
+ }
+
+ private static void validatePgIdentifierIfPresent(Map<String, String>
properties, String key)
+ throws AnalysisException {
+ String value = properties.get(key);
+ if (value == null) {
+ return;
+ }
+ if (!DataSourceConfigValidator.isValidPgIdentifier(value)) {
+ throw new AnalysisException("Invalid value for key '" + key + "':
" + value);
+ }
}
private void generateFileStatus() {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
index ce570f440a8..769517d9f65 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
@@ -224,4 +224,56 @@ public class DataSourceConfigValidatorTest {
DataSourceConfigValidator.validateSource(props,
DataSourceType.POSTGRES.name());
}
}
+
+ @Test
+ public void testSnapshotSplitSizeAcceptsPositiveInteger() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+ props.put(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE, "8192");
+ DataSourceConfigValidator.validateSource(props,
DataSourceType.MYSQL.name());
+ }
+
+ @Test
+ public void testSnapshotSplitSizeRejectsNonNumeric() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+ props.put(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE, "abc");
+ assertReject(props);
+ }
+
+ @Test
+ public void testSnapshotSplitSizeRejectsZeroAndNegative() {
+ for (String v : new String[]{"0", "-1"}) {
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+ props.put(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE, v);
+ assertReject(props);
+ }
+ }
+
+ @Test
+ public void testSnapshotParallelismAcceptsPositiveInteger() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+ props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "4");
+ DataSourceConfigValidator.validateSource(props,
DataSourceType.MYSQL.name());
+ }
+
+ @Test
+ public void testSnapshotParallelismRejectsNonNumeric() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+ props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "abc");
+ assertReject(props);
+ }
+
+ @Test
+ public void testSnapshotParallelismRejectsZeroAndNegative() {
+ for (String v : new String[]{"0", "-2"}) {
+ Map<String, String> props = new HashMap<>();
+ props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:mysql://host/db");
+ props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, v);
+ assertReject(props);
+ }
+ }
}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
index 0536a5b3bf5..ba6f0eacac3 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
@@ -57,6 +57,87 @@ suite("test_cdc_stream_tvf_mysql",
"p0,external,mysql,external_docker,external_d
exception "offset is required"
}
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://localhost:3306",
+ "table" = "t1",
+ "offset" = "latest")"""
+ exception "database is required for MySQL"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "unknown_db",
+ "jdbc_url" = "jdbc:foo://localhost:3306",
+ "table" = "t1",
+ "offset" = "latest")"""
+ exception "Unsupported type"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "",
+ "jdbc_url" = "jdbc:mysql://localhost:3306",
+ "table" = "t1",
+ "offset" = "latest")"""
+ exception "type is required"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://localhost:3306",
+ "database" = "db1",
+ "table" = "t1",
+ "offset" = "latest",
+ "snapshot_split_size" = "abc")"""
+ exception "Invalid value for key 'snapshot_split_size'"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://localhost:3306",
+ "database" = "db1",
+ "table" = "t1",
+ "offset" = "latest",
+ "snapshot_parallelism" = "0")"""
+ exception "Invalid value for key 'snapshot_parallelism'"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://localhost:3306",
+ "database" = "db1",
+ "table" = "t1",
+ "offset" = "abc")"""
+ exception "Invalid value for key 'offset'"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://localhost:3306",
+ "database" = "db1",
+ "table" = "t1",
+ "offset" = "latest",
+ "ssl_mode" = "bogus")"""
+ exception "Invalid value for key 'ssl_mode'"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://localhost:3306",
+ "database" = "db1",
+ "table" = "t1",
+ "offset" = "latest",
+ "ssl_mode" = "verify-ca")"""
+ exception "ssl_mode 'verify-ca' requires ssl_rootcert to be set"
+ }
+
// --- Data setup ---
connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
@@ -161,7 +242,7 @@ suite("test_cdc_stream_tvf_mysql",
"p0,external,mysql,external_docker,external_d
"database" = "${mysqlDb}",
"table" = "${table1}",
"offset" = 'notjson')"""
- exception "Unsupported offset: notjson"
+ exception "Invalid value for key 'offset'"
}
// --- Non-existent table ---
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
index ced310a539e..18d331cc34c 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
@@ -31,6 +31,38 @@ suite("test_cdc_stream_tvf_postgres",
"p0,external,pg,external_docker,external_d
String bucket = getS3BucketName()
String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+ // --- Validation error tests (no JDBC connection needed) ---
+ test {
+ sql """select * from cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" = "jdbc:postgresql://localhost:5432/db",
+ "table" = "t1",
+ "offset" = "latest")"""
+ exception "schema is required for PostgreSQL"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" = "jdbc:postgresql://localhost:5432/db",
+ "schema" = "public",
+ "table" = "t1",
+ "offset" = "latest",
+ "slot_name" = "bad-name;drop")"""
+ exception "Invalid value for key 'slot_name'"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" = "jdbc:postgresql://localhost:5432/db",
+ "schema" = "public",
+ "table" = "t1",
+ "offset" = "latest",
+ "publication_name" = "Bad Name")"""
+ exception "Invalid value for key 'publication_name'"
+ }
+
// create test
connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
// sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]