This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 004d7408119 branch-4.1: [fix](streamingjob) fix postgres DML silently dropped on task restart #61481 (#61564)
004d7408119 is described below

commit 004d74081199b55ade0ade9b42913d6f290741e8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Mar 21 10:46:48 2026 +0800

    branch-4.1: [fix](streamingjob) fix postgres DML silently dropped on task restart #61481 (#61564)
    
    Cherry-picked from #61481
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingMultiTblTask.java      |  2 +-
 .../source/reader/postgres/PostgresSourceReader.java | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 45d2cf2ffbb..32526f9c513 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -357,7 +357,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
                 log.warn("Failed to get task timeout reason, response: {}", 
response);
             }
         } catch (ExecutionException | InterruptedException ex) {
-            log.error("Send get task fail reason request failed: ", ex);
+            log.warn("Send get task fail reason request failed: ", ex);
         }
         return "";
     }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 737e36045d9..6a5670ad6de 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -64,6 +64,7 @@ import java.util.Properties;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.PostgresOffsetContext;
 import io.debezium.connector.postgresql.SourceInfo;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
@@ -430,6 +431,25 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
         }
     }
 
+    /**
+     * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's
+     * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x
+     * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start
+     * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn
+     * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen
+     * during the find phase and then incorrectly filters the DML as already-processed during actual
+     * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits
+     * immediately at the first received message and switch-off happens before any DML is filtered.
+     * See https://issues.apache.org/jira/browse/FLINK-39265.
+     */
+    @Override
+    public Map<String, String> extractBinlogStateOffset(Object splitState) {
+        Map<String, String> offset = super.extractBinlogStateOffset(splitState);
+        offset.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
+        offset.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+        return offset;
+    }
+
     @Override
     public void close(JobBaseConfig jobConfig) {
         super.close(jobConfig);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to