This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e025050cdb0 [fix](streaming-job) streaming CDC reliability fixes 
(scheduling latch, fetch-meta reason, PG db name, flaky case) (#64310)
e025050cdb0 is described below

commit e025050cdb01305c6080fb48c1bd1562565de996
Author: wudi <[email protected]>
AuthorDate: Wed Jun 17 10:46:57 2026 +0800

    [fix](streaming-job) streaming CDC reliability fixes (scheduling latch, 
fetch-meta reason, PG db name, flaky case) (#64310)
    
    ## Proposed changes
    
    Several independent reliability fixes for streaming CDC jobs.
    
    1. **Drop the 0-row latch in
    `JdbcSourceOffsetProvider.onTaskCommitted`.** Setting `hasMoreData =
    false` on a 0-row/0-byte commit was wrong: a 0-row task does not mean
    the source is caught up (e.g. a large upstream transaction is still
    buffered on the cdc_client side). The latch made the scheduler stop
    dispatching tasks and could freeze streaming. The interface default is a
    no-op, so the override is simply removed; `hasMoreData` is still
    advanced by `fetchRemoteMeta()` and `hasMoreDataToConsume()`.
    
    2. **Set the fetch-meta failure reason after pausing.** Previously, `fetchMeta()`
    set the failure reason and then paused. A concurrent task-success
    callback (which clears the reason under the job write lock) could wipe
    the freshly-set reason, leaving the job `PAUSED` with an empty reason
    and auto-resume disabled. Pausing first orders the reason update after
    any in-flight success callback, so the reason survives.
    
    3. **Reject an over-long PostgreSQL database name at CREATE JOB.**
    PostgreSQL truncates a database name longer than its identifier limit,
    but the replication-slot lookup compares the full configured name, so an
    existing slot looks missing and the job then fails creating a duplicate
    slot. The connection itself still succeeds (PG truncates at connect
    time), so no connectivity check catches it. A new config
    `streaming_pg_max_identifier_length` (default 63, adjustable for a
    larger NAMEDATALEN build) gates a fail-fast check.
    
    4. **Stabilize `test_streaming_postgres_job_special_offset`.** PAUSE
    cancels the FE task but the in-flight cdc_client reader keeps polling
    (up to `max_interval`) and could stream-load rows inserted right after
    the pause, defeating the ALTER-offset reposition. The test now
    deterministically drains the lingering reader before inserting the
    before-mark rows.
---
 .../main/java/org/apache/doris/common/Config.java  |  4 ++
 .../streaming/PostgresResourceValidator.java       | 17 ++++++++
 .../insert/streaming/StreamingInsertJob.java       |  6 +--
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |  7 ----
 .../streaming/PostgresResourceValidatorTest.java   | 45 ++++++++++++++++++++++
 ...st_streaming_postgres_job_special_offset.groovy | 31 +++++++++++++++
 6 files changed, 100 insertions(+), 10 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 92ba0b7d5ca..33432425a53 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1199,6 +1199,10 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int streaming_cdc_heavy_rpc_timeout_sec = 600;
 
+    // Max byte length of a PG database name for a CDC job; raise only for a larger NAMEDATALEN build.
+    @ConfField(mutable = true, masterOnly = true)
+    public static int streaming_pg_max_identifier_length = 63;
+
     @ConfField(mutable = true, masterOnly = true)
     public static int streaming_cdc_fetch_splits_batch_size = 100;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java
index d35cda9fe41..ae6277b3028 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidator.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.job.extensions.insert.streaming;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.common.DataSourceType;
@@ -25,6 +26,7 @@ import org.apache.doris.job.util.StreamingJobUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -44,6 +46,8 @@ public class PostgresResourceValidator {
 
     public static void validate(Map<String, String> sourceProperties, String 
jobId, List<String> tableNames)
             throws JobException {
+        // PG truncates an over-long db name, so the slot lookup never matches it; reject up front.
+        checkDatabaseNameLength(sourceProperties.get(DataSourceConfigKeys.DATABASE));
         String slotName = resolveSlotName(sourceProperties, jobId);
         String publicationName = resolvePublicationName(sourceProperties, 
jobId);
         // Pattern-match ownership: name equals the default = Doris-owned 
(auto); otherwise user.
@@ -123,6 +127,19 @@ public class PostgresResourceValidator {
         return StringUtils.isNotBlank(name) ? name : 
DataSourceConfigKeys.defaultPublicationName(jobId);
     }
 
+    private static void checkDatabaseNameLength(String database) throws 
JobException {
+        if (StringUtils.isBlank(database)) {
+            return;
+        }
+        // PG measures the identifier limit in bytes (NAMEDATALEN-1), so compare encoded bytes.
+        int bytes = database.getBytes(StandardCharsets.UTF_8).length;
+        if (bytes > Config.streaming_pg_max_identifier_length) {
+            throw new JobException("database name '" + database + "' is " + 
bytes + " bytes, exceeding "
+                    + Config.streaming_pg_max_identifier_length + "; 
PostgreSQL truncates it and the"
+                    + " replication-slot lookup would fail.");
+        }
+    }
+
     private static boolean publicationExists(Connection conn, String 
publicationName) throws Exception {
         try (PreparedStatement ps = conn.prepareStatement("SELECT 1 FROM 
pg_publication WHERE pubname = ?")) {
             ps.setString(1, publicationName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 09090ba3ad1..be14f79fa34 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -709,12 +709,12 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                     || 
!InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) {
                 // When a job is manually paused, it does not need to be set 
again,
                 // otherwise, it may be woken up by auto resume.
+                // Pause before setting the reason: updateJobStatus's writeLock orders this after any
+                // task-success callback that clears failureReason, so a success can't wipe the reason.
+                this.updateJobStatus(JobStatus.PAUSED);
                 this.setFailureReason(
                         new 
FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
                                 "Failed to fetch meta, " + ex.getMessage()));
-                // If fetching meta fails, the job is paused
-                // and auto resume will automatically wake it up.
-                this.updateJobStatus(JobStatus.PAUSED);
 
                 if (MetricRepo.isInit) {
                     
MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 9b7d364895b..3cfe6734564 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -924,13 +924,6 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         return -1;
     }
 
-    @Override
-    public void onTaskCommitted(long scannedRows, long loadBytes) {
-        if (scannedRows == 0 && loadBytes == 0) {
-            hasMoreData = false;
-        }
-    }
-
     @Override
     public boolean hasReachedEnd() {
         if (!isSnapshotOnlyMode()) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidatorTest.java
new file mode 100644
index 00000000000..27e3b04b2cd
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/PostgresResourceValidatorTest.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.exception.JobException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PostgresResourceValidatorTest {
+
+    // A 22-char CJK database name is length()==22 but 66 bytes in UTF-8; PG truncates it to 63 bytes.
+    // The byte-based check must reject it before connecting (validate fails on the very first line).
+    @Test
+    public void testRejectMultibyteOverLongDatabaseName() {
+        String dbName = StringUtils.repeat("库", 22);
+        Assert.assertEquals(22, dbName.length());
+        Map<String, String> props = new HashMap<>();
+        props.put(DataSourceConfigKeys.DATABASE, dbName);
+        JobException e = Assert.assertThrows(JobException.class,
+                () -> PostgresResourceValidator.validate(props, "1", 
Collections.emptyList()));
+        Assert.assertTrue(e.getMessage(), e.getMessage().contains("bytes"));
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
index c4214145151..d9cf5886bcc 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
@@ -143,6 +143,37 @@ suite("test_streaming_postgres_job_special_offset", 
"p0,external,pg,external_doc
             def jobStatus = sql """select status from jobs("type"="insert") 
where Name='${jobName}'"""
             return jobStatus[0][0] == "PAUSED"
         })
+
+        // PAUSE cancels the FE task but does NOT stop the in-flight cdc_client reader started before
+        // the pause: it keeps polling up to max_interval (default 10s) and would stream-load whatever
+        // rows we insert next, defeating the ALTER-offset reposition (this is the historical source of
+        // flakiness). No new reader is dispatched while PAUSED, so drain the lingering one
+        // deterministically: insert a disposable probe row, then wait until the row count stays stable
+        // for a window longer than max_interval. An active reader would have loaded the probe within
+        // max_interval, so a stable window proves no reader is consuming anymore. Finally delete the
+        // probe from both sides so it never pollutes the before/after-mark assertions or the .out.
+        def probeId = 10
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES 
(${probeId}, 'drain_probe')"""
+        }
+        int stablePolls = 0
+        long lastCnt = -1L
+        Awaitility.await().atMost(120, SECONDS).pollInterval(3, 
SECONDS).until({
+            long c = sql("""SELECT count(*) FROM 
${currentDb}.${table1}""")[0][0] as long
+            if (c == lastCnt) {
+                stablePolls++
+            } else {
+                stablePolls = 0
+                lastCnt = c
+            }
+            // 5 * 3s = 15s stable > max_interval (10s) => the lingering reader has fully drained.
+            return stablePolls >= 5
+        })
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id = 
${probeId}"""
+        }
+        sql """DELETE FROM ${currentDb}.${table1} WHERE id = ${probeId}"""
+
         def alterLsn = ""
         connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
             // insert data BEFORE the LSN mark


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to