This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0f1f0cd4cad branch-2.1: [bugfix](load) Fixed import failure caused by 
the $. symbol (#53337)
0f1f0cd4cad is described below

commit 0f1f0cd4cadb475e5ac95b46fd681213ac95de3c
Author: lw112 <[email protected]>
AuthorDate: Thu Jul 17 14:03:58 2025 +0800

    branch-2.1: [bugfix](load) Fixed import failure caused by the $. symbol 
(#53337)
    
    Problem Summary:
    
    Of course, master is normal.
    
    1. Routine load task
    ```
    CREATE ROUTINE LOAD kafka_load_task ON test_123
    WITH APPEND
    COLUMNS(ot,time=from_unixtime(`ot`),id,name,content),
    PRECEDING FILTER ((`ot` > 0) AND (`id` != ''))
    PROPERTIES
    (
    "max_error_number" = "0",
    "max_filter_ratio" = "1.0",
    "max_batch_interval" = "10",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "format" = "json",
    "jsonpaths" = "[
            \"$.time\",
            \"$.id\",
            \"$.name\",
            \"$.\"
    ]",
    "strip_outer_array" = "false",
    "num_as_string" = "false",
    "fuzzy_parse" = "false",
    "strict_mode" = "false",
    "timezone" = "Asia/Shanghai",
    "exec_mem_limit" = "2147483648"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "127.0.0.1:9092",
    "kafka_topic" = "test-topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "test-groupid-1"
    );
    ```
    `I want to use the $. symbol to put the entire JSON string from Kafka
    into the context field. In the version before the fix, an error would
    occur: exception=[E-1721] Size of filter doesn't match size of column:
    size=0, filter.size=1. The JSON data in Kafka has other fields besides
    time, id, and name, but my requirement is only to get time, id, and
    name.`
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/vec/exec/format/json/new_json_reader.cpp    |   2 +
 .../test_routine_load_jsonpath_dollar_job.json     |   3 +
 .../test_routine_load_jsonpath_dollar.groovy       | 158 +++++++++++++++++++++
 3 files changed, 163 insertions(+)

diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 5953ec1319c..2ef30667f2a 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1722,6 +1722,8 @@ Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
                 nullable_column = assert_cast<ColumnNullable*>(column_ptr);
                 target_column_ptr = &nullable_column->get_nested_column();
                 nullable_column->get_null_map_data().push_back(0);
+            } else {
+                target_column_ptr = column_ptr;
             }
             auto* column_string = 
assert_cast<ColumnString*>(target_column_ptr);
             
column_string->insert_data(_simdjson_ondemand_padding_buffer.data(),
diff --git 
a/regression-test/suites/load_p0/routine_load/data/test_routine_load_jsonpath_dollar_job.json
 
b/regression-test/suites/load_p0/routine_load/data/test_routine_load_jsonpath_dollar_job.json
new file mode 100644
index 00000000000..86a8a9b2bcf
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/data/test_routine_load_jsonpath_dollar_job.json
@@ -0,0 +1,3 @@
+{"time": 1752600673, "id": 1, "name": "test1", "extra": "field1"}
+{"time": 1752600674, "id": 2, "name": "test2", "extra": "field2"}
+{"time": 1752600675, "id": 3, "name": "test3", "extra": "field3"}
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
new file mode 100644
index 00000000000..2dd0f792111
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_jsonpath_dollar", "p0") {
+    def tableName = "test_routine_load_jsonpath_dollar"
+    def jobName = "test_routine_load_jsonpath_dollar_job"
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // Send test data to Kafka
+        def props = new Properties()
+        props.put("bootstrap.servers", 
"${externalEnvIp}:${kafka_port}".toString())
+        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
+        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+
+        def kafkaJson = new 
File("""${context.file.parent}/data/${jobName}.json""").text
+        def lines = kafkaJson.readLines()
+        lines.each { line ->
+            logger.info("=====${line}========")
+            def record = new ProducerRecord<>(jobName, null, line)
+            producer.send(record)
+        }
+        producer.close()
+        try {
+            sql """
+                DROP TABLE IF EXISTS ${tableName}
+            """
+
+            sql """
+                CREATE TABLE ${tableName} (
+                    time BIGINT,
+                    id INT,
+                    name VARCHAR(50),
+                    content TEXT
+                )
+                UNIQUE KEY(time, id)
+                DISTRIBUTED BY HASH(time, id) BUCKETS 1
+                PROPERTIES (
+                    "replication_num" = "1"
+                );
+            """
+
+            // Create routine load job with $. in jsonpaths
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                COLUMNS(time, id, name, content)
+                PROPERTIES
+                (
+                    "format" = "json",
+                    "jsonpaths" = '[\"\$.time\", \"\$.id\", \"\$.name\", 
\"\$.\"]',
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200",
+                    "max_error_number" = "0",
+                    "strip_outer_array" = "false",
+                    "strict_mode" = "false"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${jobName}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "sync"
+
+            // Wait for routine load to be in RUNNING state
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                log.info("routine load state: ${state}")
+                if (state == "RUNNING") {
+                    break
+                }
+                if (count >= 60) {
+                    log.error("routine load failed to start after 60 seconds")
+                    assertEquals("RUNNING", state)
+                    break
+                }
+                count++
+            }
+
+            // Wait for data to be loaded
+            count = 0
+            while (true) {
+                def res = sql "select count(*) from ${tableName}"
+                def state = sql "show routine load for ${jobName}"
+                log.info("routine load state: ${state[0][8].toString()}")
+                log.info("routine load statistic: ${state[0][14].toString()}")
+                if (res[0][0] > 0) {
+                    break
+                }
+                if (count >= 60) {
+                    log.error("routine load can not load data for long time")
+                    break
+                }
+                sleep(5000)
+                count++
+            }
+
+            sql "sync"
+            def result = sql "select * from ${tableName} order by time, id"
+            log.info("Loaded data: ${result}")
+
+            def rowCount = sql "select count(*) from ${tableName}"
+            assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+            def contentCheck = sql "select content from ${tableName} where id 
= 1"
+            assertTrue(contentCheck.size() > 0, "No data found for id = 1")
+            def jsonContent = contentCheck[0][0].toString()
+            assertTrue(jsonContent.contains("test1"), "Content should contain 
the full JSON with 'test1'")
+            assertTrue(jsonContent.contains("field1"), "Content should contain 
the full JSON with 'field1'")
+            assertTrue(jsonContent.contains("time"), "Content should contain 
the full JSON with 'time' field")
+
+            def specificData = sql "select time, id, name from ${tableName} 
where id = 1"
+            assertEquals(1642505600, specificData[0][0])
+            assertEquals(1, specificData[0][1])
+            assertEquals("test1", specificData[0][2])
+
+        } finally {
+            try {
+                sql "stop routine load for ${jobName}"
+            } catch (Exception e) {
+                log.info("Stop routine load failed: ${e.getMessage()}")
+            }
+
+            try {
+                sql "DROP TABLE IF EXISTS ${tableName}"
+            } catch (Exception e) {
+                log.info("Drop table failed: ${e.getMessage()}")
+            }
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to