This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eeef49eafdc [enhance](job) add zero-row hint for Kafka read_committed 
load (#63664)
eeef49eafdc is described below

commit eeef49eafdc121df570f4ab7f1e1c380a35d9e53
Author: hui lai <[email protected]>
AuthorDate: Mon Jun 1 11:42:12 2026 +0800

    [enhance](job) add zero-row hint for Kafka read_committed load (#63664)
    
    When Kafka routine load is configured with
    `isolation.level=read_committed`, the consumer may consume 0 rows while
    the task partition lag is still positive. This can happen when upstream
    producers use Kafka transactions and some records are not committed yet,
    so they are invisible to the read_committed consumer.
    
    This PR adds an `OtherMsg` hint when all of the following conditions hold:
    - the routine load task consumes 0 rows
    - the task's partition lag is still positive
    - the Kafka property `isolation.level=read_committed` is configured
    
    The message helps users distinguish this case from ordinary no-data
    consumption. An FE debug point and a regression test are added; when
    enabled, the debug point forces the hint regardless of the conditions
    above, so the test can verify the message deterministically.
---
 .../routineload/kafka/KafkaRoutineLoadJob.java     |  30 +++++
 .../load/routineload/KafkaRoutineLoadJobTest.java  |  25 ++++
 .../test_routine_load_error_info.groovy            | 141 ++++++++++++++-------
 3 files changed, 148 insertions(+), 48 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
index 9464be78d05..e1290b03996 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaRoutineLoadJob.java
@@ -33,6 +33,7 @@ import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
@@ -95,6 +96,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     public static final String KAFKA_FILE_CATALOG = "kafka";
     public static final String PROP_GROUP_ID = "group.id";
+    private static final String KAFKA_ISOLATION_LEVEL = "isolation.level";
+    private static final String KAFKA_READ_COMMITTED = "read_committed";
+    private static final String HAS_POSITIVE_LAG_DEBUG_POINT = 
"KafkaRoutineLoadJob.hasPositiveLagForTask";
+    private static final String READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE = 
"Kafka routine load consumed 0 rows "
+            + "while lag is still positive under 
isolation.level=read_committed. If the upstream producer uses "
+            + "Kafka transactions, some records may be in uncommitted 
transactions and are not visible yet.";
 
     @SerializedName("bl")
     private String brokerList;
@@ -351,6 +358,29 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws 
UserException {
         updateProgressAndOffsetsCache(attachment);
         super.updateProgress(attachment);
+        updateReadCommittedLagHint(attachment);
+    }
+
+    private void updateReadCommittedLagHint(RLTaskTxnCommitAttachment 
attachment) {
+        if (DebugPointUtil.isEnable(HAS_POSITIVE_LAG_DEBUG_POINT)
+                || (attachment.getTotalRows() == 0 && isReadCommitted() && 
hasPositiveLagForTask(attachment))) {
+            setOtherMsg(READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE);
+        }
+    }
+
+    private boolean isReadCommitted() {
+        return 
KAFKA_READ_COMMITTED.equalsIgnoreCase(customProperties.get(KAFKA_ISOLATION_LEVEL));
+    }
+
+    private boolean hasPositiveLagForTask(RLTaskTxnCommitAttachment 
attachment) {
+        Map<Integer, Long> partitionIdToOffset = ((KafkaProgress) 
attachment.getProgress()).getOffsetByPartition();
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            Long latestOffset = 
cachedPartitionWithLatestOffsets.get(entry.getKey());
+            if (latestOffset != null && latestOffset > entry.getValue() + 1) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 6acd00a3f72..09d876ab2e0 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -36,6 +36,7 @@ import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
 import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
+import org.apache.doris.load.routineload.kafka.KafkaProgress;
 import org.apache.doris.load.routineload.kafka.KafkaRoutineLoadJob;
 import org.apache.doris.load.routineload.kafka.KafkaTaskInfo;
 import org.apache.doris.mysql.privilege.MockedAuth;
@@ -176,6 +177,30 @@ public class KafkaRoutineLoadJobTest {
         }
     }
 
+    @Test
+    public void 
testUpdateProgressWarnsWhenReadCommittedTaskHasZeroRowsAndLag() throws 
UserException {
+        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, 
"kafka_routine_load_job", 1L,
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+        Map<String, String> customProperties = Maps.newHashMap();
+        customProperties.put("isolation.level", "read_committed");
+        Deencapsulation.setField(routineLoadJob, "customProperties", 
customProperties);
+
+        Map<Integer, Long> cachedPartitionWithLatestOffsets = 
Maps.newHashMap();
+        cachedPartitionWithLatestOffsets.put(1, 20L);
+        Deencapsulation.setField(routineLoadJob, 
"cachedPartitionWithLatestOffsets",
+                cachedPartitionWithLatestOffsets);
+
+        Map<Integer, Long> taskProgress = Maps.newHashMap();
+        taskProgress.put(1, 10L);
+        RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
+        Deencapsulation.setField(attachment, "progress", new 
KafkaProgress(taskProgress));
+
+        Deencapsulation.invoke(routineLoadJob, "updateProgress", attachment);
+
+        String otherMsg = Deencapsulation.getField(routineLoadJob, "otherMsg");
+        Assert.assertTrue(otherMsg.contains("some records may be in 
uncommitted transactions"));
+    }
+
     @Test
     public void testProcessTimeOutTasks() throws Exception {
         RoutineLoadManager routineLoadManager = 
Mockito.mock(RoutineLoadManager.class);
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index 2715b8a45ca..260824a05eb 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.apache.doris.regression.util.RoutineLoadTestUtils
 import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.producer.ProducerConfig
 
 suite("test_routine_load_error_info","nonConcurrent") {
@@ -30,52 +30,19 @@ suite("test_routine_load_error_info","nonConcurrent") {
                   "test_error_info",
                 ]
 
-    String enabled = context.config.otherConfigs.get("enableKafkaTest")
-    String kafka_port = context.config.otherConfigs.get("kafka_port")
-    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
-    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+    def kafkaTestEnabled = RoutineLoadTestUtils.isKafkaTestEnabled(context)
+    def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
 
     // send data to kafka 
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
-        def props = new Properties()
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer")
-        // add timeout config
-        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")  
-        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
-
-        // check conenction
-        def verifyKafkaConnection = { prod ->
-            try {
-                logger.info("=====try to connect Kafka========")
-                def partitions = 
prod.partitionsFor("__connection_verification_topic")
-                return partitions != null
-            } catch (Exception e) {
-                throw new Exception("Kafka connect fail: 
${e.message}".toString())
-            }
-        }
-        // Create kafka producer
-        def producer = new KafkaProducer<>(props)
+    if (kafkaTestEnabled) {
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
         try {
-            logger.info("Kafka connecting: ${kafka_broker}")
-            if (!verifyKafkaConnection(producer)) {
-                throw new Exception("can't get any kafka info")
+            for (String kafkaCsvTopic in kafkaCsvTpoics) {
+                def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, 
[kafkaCsvTopic], txt.readLines())
             }
-        } catch (Exception e) {
-            logger.error("FATAL: " + e.getMessage())
+        } finally {
             producer.close()
-            throw e  
-        }
-        logger.info("Kafka connect success")
-        for (String kafkaCsvTopic in kafkaCsvTpoics) {
-            def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
-            def lines = txt.readLines()
-            lines.each { line ->
-                logger.info("=====${line}========")
-                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
-                producer.send(record)
-            }
         }
     }
 
@@ -165,15 +132,47 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 )
                 FROM KAFKA
                 (
-                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_broker_list" = "${kafka_broker}",
                     "kafka_topic" = "${kafkaTopic}",
                     "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                 );
         """
     }
 
+    def createReadCommittedJob = {jobName, tableName, kafkaTopic ->
+        sql """
+        CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaTopic}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
+                    "property.isolation.level" = "read_committed"
+                );
+        """
+    }
+
+    def createKafkaTopic = {kafkaTopic ->
+        def adminProps = new Properties()
+        adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"${kafka_broker}".toString())
+        def adminClient = AdminClient.create(adminProps)
+        try {
+            adminClient.createTopics([new NewTopic(kafkaTopic, 1, (short) 
1)]).all().get()
+        } finally {
+            adminClient.close()
+        }
+    }
+
     // case 1: task failed
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+    if (kafkaTestEnabled) {
         // create table
         def jobName = "test_error_info"
         def tableName = "test_routine_error_info"
@@ -209,7 +208,7 @@ suite("test_routine_load_error_info","nonConcurrent") {
     }
 
     // case 2: reschedule job
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+    if (kafkaTestEnabled) {
         def jobName = "test_error_info"
         def tableName = "test_routine_error_info"
         try {
@@ -242,7 +241,7 @@ suite("test_routine_load_error_info","nonConcurrent") {
     }
 
     // case 3: memory limit
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+    if (kafkaTestEnabled) {
         def jobName = "test_memory_limit_error_info"
         def tableName = "test_routine_memory_limit_error_info"
         
@@ -276,4 +275,50 @@ suite("test_routine_load_error_info","nonConcurrent") {
             sql "DROP TABLE IF EXISTS ${tableName}"
         }
     }
-}
\ No newline at end of file
+
+    // case 4: read_committed lag hint
+    if (kafkaTestEnabled) {
+        def jobName = "test_read_committed_lag_error_info"
+        def tableName = "test_routine_read_committed_lag_error_info"
+        def kafkaTopic = 
"test_read_committed_lag_error_info_${System.currentTimeMillis()}"
+        def debugPoint = "KafkaRoutineLoadJob.hasPositiveLagForTask"
+
+        try {
+            createKafkaTopic(kafkaTopic)
+            def producer = 
RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+            try {
+                def txt = new 
File("""${context.file.parent}/data/${kafkaCsvTpoics[0]}.csv""").text
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, 
[kafkaTopic], txt.readLines())
+            } finally {
+                producer.close()
+            }
+            createTable(tableName)
+            sql "sync"
+            GetDebugPoint().enableDebugPointForAllFEs(debugPoint)
+            createReadCommittedJob(jobName, tableName, kafkaTopic)
+            sql "sync"
+
+            // check error info
+            def count = 0
+            while (true) {
+                def res = sql "show routine load for ${jobName}"
+                log.info("show routine load: ${res[0].toString()}".toString())
+                log.info("other msg: ${res[0][19].toString()}".toString())
+                if (res[0][19].toString() != "") {
+                    assertTrue(res[0][19].toString().contains("some records 
may be in uncommitted transactions"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                }
+                sleep(1000)
+            }
+        } finally {
+            GetDebugPoint().disableDebugPointForAllFEs(debugPoint)
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to