This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0f1f0cd4cad branch-2.1: [bugfix](load) Fixed import failure caused by the $. symbol (#53337)
0f1f0cd4cad is described below
commit 0f1f0cd4cadb475e5ac95b46fd681213ac95de3c
Author: lw112 <[email protected]>
AuthorDate: Thu Jul 17 14:03:58 2025 +0800
branch-2.1: [bugfix](load) Fixed import failure caused by the $. symbol (#53337)
Problem Summary:
The master branch is not affected; this issue only occurs on branch-2.1.
1. Routine load task:
```
CREATE ROUTINE LOAD kafka_load_task ON test_123
WITH APPEND
COLUMNS(ot,time=from_unixtime(`ot`),id,name,content),
PRECEDING FILTER ((`ot` > 0) AND (`id` != ''))
PROPERTIES
(
"max_error_number" = "0",
"max_filter_ratio" = "1.0",
"max_batch_interval" = "10",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"format" = "json",
"jsonpaths" = "[
\"$.time\",
\"$.id\",
\"$.name\",
\"$.\"
]",
"strip_outer_array" = "false",
"num_as_string" = "false",
"fuzzy_parse" = "false",
"strict_mode" = "false",
"timezone" = "Asia/Shanghai",
"exec_mem_limit" = "2147483648"
)
FROM KAFKA
(
"kafka_broker_list" = "127.0.0.1:9092",
"kafka_topic" = "test-topic",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "test-groupid-1"
);
```
I want to use the `$.` jsonpath to put the entire JSON string from Kafka
into the content field. In the version before the fix, this failed with:
exception=[E-1721] Size of filter doesn't match size of column: size=0,
filter.size=1. The JSON data in Kafka has other fields besides time, id,
and name, but only time, id, and name need their own columns; the whole
document goes into content.
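For context on the error message above, the change in new_json_reader.cpp (shown in the diff below) adds an `else` branch in NewJsonReader::_simdjson_write_columns_by_jsonpath: when a jsonpath is `$.`, the whole JSON document is appended to the destination column as a string, and judging from the patch, a non-nullable destination column previously never had its target pointer assigned, so that column received no data (size=0) while the row filter had one entry per row (filter.size=1). The following is a minimal, hypothetical sketch of the corrected selection logic; `ColumnString`, `ColumnNullable`, and `write_whole_document` are simplified stand-ins for illustration, not the actual Doris classes or functions.
```
#include <cstddef>
#include <cstdint>
#include <string>
#include <string_view>
#include <vector>

// Simplified stand-ins for the real column types (illustration only).
struct ColumnString {
    std::vector<std::string> data;
    void insert_data(const char* ptr, size_t len) { data.emplace_back(ptr, len); }
};

struct ColumnNullable {
    ColumnString nested;
    std::vector<uint8_t> null_map;
    ColumnString& get_nested_column() { return nested; }
    std::vector<uint8_t>& get_null_map_data() { return null_map; }
};

// Append the whole JSON document (the "$." case) to the destination column.
void write_whole_document(std::string_view json, bool dest_is_nullable,
                          ColumnNullable* nullable_column, ColumnString* plain_column) {
    ColumnString* target = nullptr;
    if (dest_is_nullable) {
        target = &nullable_column->get_nested_column();
        nullable_column->get_null_map_data().push_back(0); // mark the value as non-null
    } else {
        // This is the branch the patch adds: without it, a non-nullable
        // destination column never receives the row, so its size stays 0
        // while the filter built for the batch still has one entry per row.
        target = plain_column;
    }
    target->insert_data(json.data(), json.size());
}
```
With the extra branch, the content column grows in step with the other columns for every row, so the filter/column size check no longer fails.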
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
be/src/vec/exec/format/json/new_json_reader.cpp | 2 +
.../test_routine_load_jsonpath_dollar_job.json | 3 +
.../test_routine_load_jsonpath_dollar.groovy | 158 +++++++++++++++++++++
3 files changed, 163 insertions(+)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 5953ec1319c..2ef30667f2a 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1722,6 +1722,8 @@ Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
nullable_column = assert_cast<ColumnNullable*>(column_ptr);
target_column_ptr = &nullable_column->get_nested_column();
nullable_column->get_null_map_data().push_back(0);
+ } else {
+ target_column_ptr = column_ptr;
}
auto* column_string = assert_cast<ColumnString*>(target_column_ptr);
column_string->insert_data(_simdjson_ondemand_padding_buffer.data(),
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_jsonpath_dollar_job.json b/regression-test/suites/load_p0/routine_load/data/test_routine_load_jsonpath_dollar_job.json
new file mode 100644
index 00000000000..86a8a9b2bcf
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_jsonpath_dollar_job.json
@@ -0,0 +1,3 @@
+{"time": 1752600673, "id": 1, "name": "test1", "extra": "field1"}
+{"time": 1752600674, "id": 2, "name": "test2", "extra": "field2"}
+{"time": 1752600675, "id": 3, "name": "test3", "extra": "field3"}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
new file mode 100644
index 00000000000..2dd0f792111
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_jsonpath_dollar", "p0") {
+ def tableName = "test_routine_load_jsonpath_dollar"
+ def jobName = "test_routine_load_jsonpath_dollar_job"
+
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // Send test data to Kafka
+ def props = new Properties()
+ props.put("bootstrap.servers",
"${externalEnvIp}:${kafka_port}".toString())
+ props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
+ def producer = new KafkaProducer<>(props)
+
+ def kafkaJson = new File("""${context.file.parent}/data/${jobName}.json""").text
+ def lines = kafkaJson.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(jobName, null, line)
+ producer.send(record)
+ }
+ producer.close()
+ try {
+ sql """
+ DROP TABLE IF EXISTS ${tableName}
+ """
+
+ sql """
+ CREATE TABLE ${tableName} (
+ time BIGINT,
+ id INT,
+ name VARCHAR(50),
+ content TEXT
+ )
+ UNIQUE KEY(time, id)
+ DISTRIBUTED BY HASH(time, id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ // Create routine load job with $. in jsonpaths
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS(time, id, name, content)
+ PROPERTIES
+ (
+ "format" = "json",
+ "jsonpaths" = '[\"\$.time\", \"\$.id\", \"\$.name\",
\"\$.\"]',
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200",
+ "max_error_number" = "0",
+ "strip_outer_array" = "false",
+ "strict_mode" = "false"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${jobName}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ sql "sync"
+
+ // Wait for routine load to be in RUNNING state
+ def count = 0
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ log.info("routine load state: ${state}")
+ if (state == "RUNNING") {
+ break
+ }
+ if (count >= 60) {
+ log.error("routine load failed to start after 60 seconds")
+ assertEquals("RUNNING", state)
+ break
+ }
+ count++
+ }
+
+ // Wait for data to be loaded
+ count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state: ${state[0][8].toString()}")
+ log.info("routine load statistic: ${state[0][14].toString()}")
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 60) {
+ log.error("routine load can not load data for long time")
+ break
+ }
+ sleep(5000)
+ count++
+ }
+
+ sql "sync"
+ def result = sql "select * from ${tableName} order by time, id"
+ log.info("Loaded data: ${result}")
+
+ def rowCount = sql "select count(*) from ${tableName}"
+ assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+ def contentCheck = sql "select content from ${tableName} where id = 1"
+ assertTrue(contentCheck.size() > 0, "No data found for id = 1")
+ def jsonContent = contentCheck[0][0].toString()
+ assertTrue(jsonContent.contains("test1"), "Content should contain
the full JSON with 'test1'")
+ assertTrue(jsonContent.contains("field1"), "Content should contain
the full JSON with 'field1'")
+ assertTrue(jsonContent.contains("time"), "Content should contain
the full JSON with 'time' field")
+
+ def specificData = sql "select time, id, name from ${tableName} where id = 1"
+ assertEquals(1752600673, specificData[0][0]) // "time" value for id 1 in the test data
+ assertEquals(1, specificData[0][1])
+ assertEquals("test1", specificData[0][2])
+
+ } finally {
+ try {
+ sql "stop routine load for ${jobName}"
+ } catch (Exception e) {
+ log.info("Stop routine load failed: ${e.getMessage()}")
+ }
+
+ try {
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ } catch (Exception e) {
+ log.info("Drop table failed: ${e.getMessage()}")
+ }
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]