This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c9381b0285c [fix](load) Fix import failure when the stream load parameter specifies Transfer-Encoding:chunked (#48196) (#48503)
c9381b0285c is described below

commit c9381b0285ccd59a2eb039399fcf3e99ddcb7b22
Author: Sun Chenyang <suncheny...@selectdb.com>
AuthorDate: Tue Mar 4 10:12:54 2025 +0800

    [fix](load) Fix import failure when the stream load parameter specifies Transfer-Encoding:chunked (#48196) (#48503)
    
    pick from master #48196
---
 be/src/http/action/stream_load.cpp                 |   1 +
 be/src/io/fs/stream_load_pipe.h                    |  10 ++
 be/src/vec/exec/format/json/new_json_reader.cpp    |  51 +++++++++-
 be/src/vec/exec/format/json/new_json_reader.h      |   4 +
 .../test_load_with_transfer_encoding.out           | Bin 0 -> 131 bytes
 .../test_load_with_transfer_encoding.groovy        | 109 +++++++++++++++++++++
 6 files changed, 173 insertions(+), 2 deletions(-)
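
For context, the failure mode is that a chunked HTTP body reaches the backend as a sequence of pipe messages, and the JSON reader previously parsed only the first one. A toy sketch of the difference, assuming a simplified pipe (ToyPipe and its payloads are illustrative, not the Doris API):

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    // Toy stand-in for StreamLoadPipe: a chunked transfer buffers the body
    // as several messages. Names and payloads here are illustrative only.
    struct ToyPipe {
        std::vector<std::string> messages{R"([{"k":1},)", R"({"k":2}])"};
        std::size_t next = 0;
        // One message per call; empty string once the pipe is drained.
        std::string read_one_message() {
            return next < messages.size() ? messages[next++] : std::string{};
        }
    };

    int main() {
        ToyPipe pipe;
        std::string first = pipe.read_one_message();
        std::cout << "single read (pre-fix):   " << first << '\n'; // truncated JSON
        std::string all = first, chunk;
        while (!(chunk = pipe.read_one_message()).empty()) all += chunk;
        std::cout << "drained read (post-fix): " << all << '\n';   // complete JSON
    }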

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index e472c006260..c61909ab712 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -423,6 +423,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         if (ctx->is_chunked_transfer) {
             pipe = std::make_shared<io::StreamLoadPipe>(
                     io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
+            pipe->set_is_chunked_transfer(true);
         } else {
             pipe = std::make_shared<io::StreamLoadPipe>(
                     io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 978badf9add..a860b5f7d8a 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -87,6 +87,12 @@ public:
 
     size_t current_capacity();
 
+    bool is_chunked_transfer() const { return _is_chunked_transfer; }
+
+    void set_is_chunked_transfer(bool is_chunked_transfer) {
+        _is_chunked_transfer = is_chunked_transfer;
+    }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -121,6 +127,10 @@ private:
 
     // no use, only for compatibility with the `Path` interface
     Path _path = "";
+
+    // When importing JSON data and using chunked transfer encoding,
+    // the data needs to be completely read before it can be parsed.
+    bool _is_chunked_transfer = false;
 };
 } // namespace io
 } // namespace doris
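
The new flag is a one-way handshake: the HTTP action sets it when the pipe is created (see the stream_load.cpp hunk above) and the JSON reader consults it before parsing (see the reader diff below). A minimal sketch of that contract; only the accessor pair mirrors the code above, the surrounding names are illustrative:

    #include <memory>

    // Stripped-down pipe carrying only the new flag.
    class Pipe {
    public:
        bool is_chunked_transfer() const { return _is_chunked_transfer; }
        void set_is_chunked_transfer(bool v) { _is_chunked_transfer = v; }
    private:
        bool _is_chunked_transfer = false;
    };

    // Producer (HTTP handler): tag the pipe when the request body is chunked.
    std::shared_ptr<Pipe> make_pipe(bool chunked_request) {
        auto pipe = std::make_shared<Pipe>();
        pipe->set_is_chunked_transfer(chunked_request);
        return pipe;
    }

    // Consumer (JSON reader): one read suffices unless the transfer was chunked.
    bool must_drain_fully(const Pipe& pipe) { return pipe.is_chunked_transfer(); }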
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 0850f5c0286..6a75a135793 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -994,8 +994,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
         break;
     }
     case TFileType::FILE_STREAM: {
-        RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()))
-                                ->read_one_message(file_buf, read_size));
+        RETURN_IF_ERROR(_read_one_message_from_pipe(file_buf, read_size));
         break;
     }
     default: {
@@ -1004,6 +1003,54 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
     }
     return Status::OK();
 }
+
+Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf,
+                                                  size_t* read_size) {
+    auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get());
+
+    // first read: read from the pipe once.
+    RETURN_IF_ERROR(stream_load_pipe->read_one_message(file_buf, read_size));
+
+    // When the transfer is not chunked, the entire body has already been read.
+    if (!stream_load_pipe->is_chunked_transfer()) {
+        return Status::OK();
+    }
+
+    std::vector<uint8_t> buf;
+    uint64_t cur_size = 0;
+
+    // second read: continuously read data from the pipe until all data is read.
+    std::unique_ptr<uint8_t[]> read_buf;
+    size_t read_buf_size = 0;
+    while (true) {
+        RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size));
+        if (read_buf_size == 0) {
+            break;
+        } else {
+            buf.insert(buf.end(), read_buf.get(), read_buf.get() + read_buf_size);
+            cur_size += read_buf_size;
+            read_buf_size = 0;
+            read_buf.reset();
+        }
+    }
+
+    // No data is available during the second read.
+    if (cur_size == 0) {
+        return Status::OK();
+    }
+
+    std::unique_ptr<uint8_t[]> total_buf = std::make_unique<uint8_t[]>(cur_size + *read_size);
+
+    // copy the data during the first read
+    memcpy(total_buf.get(), file_buf->get(), *read_size);
+
+    // copy the data during the second read
+    memcpy(total_buf.get() + *read_size, buf.data(), cur_size);
+    *file_buf = std::move(total_buf);
+    *read_size += cur_size;
+    return Status::OK();
+}
+
 // ---------SIMDJSON----------
 // simdjson, replace none simdjson function if it is ready
 Status NewJsonReader::_simdjson_init_reader() {
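
Stripped of Doris types, _read_one_message_from_pipe follows a read-once / drain / splice pattern. A self-contained sketch of that pattern, assuming reads are surfaced as a callable (read_whole_message and ReadFn are hypothetical names, and RETURN_IF_ERROR is reduced to plain bool returns):

    #include <cstdint>
    #include <cstring>
    #include <memory>
    #include <vector>

    // ReadFn: bool(std::unique_ptr<uint8_t[]>*, size_t*), true on success,
    // with *n == 0 once the source is drained.
    template <typename ReadFn>
    bool read_whole_message(ReadFn read_one, bool chunked,
                            std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size) {
        // First read: one message from the pipe.
        if (!read_one(file_buf, read_size)) return false;
        // Non-chunked: the whole body was delivered in one message.
        if (!chunked) return true;
        // Second phase: drain remaining messages into a growable buffer.
        std::vector<uint8_t> extra;
        std::unique_ptr<uint8_t[]> buf;
        size_t n = 0;
        while (true) {
            if (!read_one(&buf, &n)) return false;  // propagate read errors
            if (n == 0) break;                      // source drained
            extra.insert(extra.end(), buf.get(), buf.get() + n);
            n = 0;
            buf.reset();
        }
        if (extra.empty()) return true;  // nothing beyond the first message
        // Splice both phases into one contiguous allocation for the parser.
        auto total = std::make_unique<uint8_t[]>(*read_size + extra.size());
        std::memcpy(total.get(), file_buf->get(), *read_size);
        std::memcpy(total.get() + *read_size, extra.data(), extra.size());
        *file_buf = std::move(total);
        *read_size += extra.size();
        return true;
    }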
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index fe550fa44b9..2cafc57384d 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -143,6 +143,10 @@ private:
 
     Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);
 
+    // StreamLoadPipe::read_one_message only reads a portion of the data when the stream load uses a chunked transfer HTTP request.
+    // All of the data must be read before JSON parsing can start.
+    Status _read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);
+
     // simdjson, replace none simdjson function if it is ready
     Status _simdjson_init_reader();
     Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
diff --git a/regression-test/data/load_p0/stream_load/test_load_with_transfer_encoding.out b/regression-test/data/load_p0/stream_load/test_load_with_transfer_encoding.out
new file mode 100644
index 00000000000..5d61d789119
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_load_with_transfer_encoding.out differ
diff --git a/regression-test/suites/load_p0/stream_load/test_load_with_transfer_encoding.groovy b/regression-test/suites/load_p0/stream_load/test_load_with_transfer_encoding.groovy
new file mode 100644
index 00000000000..d5a675ca655
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_load_with_transfer_encoding.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.http.HttpStatus
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+suite("test_load_with_transfer_encoding", "p0") {
+    def table_name = "test_load_with_transfer_encoding"
+
+    sql "DROP TABLE IF EXISTS ${table_name}"
+    sql """
+        CREATE TABLE ${table_name} (
+        `place_id` varchar(14) NOT NULL,
+        `card_id` varchar(12) NOT NULL,
+        `shift_id` varchar(4) NULL,
+        `id` bigint NOT NULL AUTO_INCREMENT (1),
+        `created` datetime(6) NOT NULL,
+        `creater` bigint NULL,
+        `deleted` int NOT NULL,
+        `updated` datetime(6) NULL,
+        `card_type_id` varchar(8) NOT NULL,
+        `card_type_name` varchar(20) NULL,
+        `cash_balance` int NOT NULL,
+        `cashier_id` varchar(8) NULL,
+        `client_id` varchar(4) NULL,
+        `cost` int NOT NULL,
+        `creater_name` varchar(50) NULL,
+        `details` varchar(200) NULL,
+        `id_name` varchar(50) NOT NULL,
+        `id_number` varchar(18) NOT NULL,
+        `last_client_id` varchar(4) NULL,
+        `login_id` varchar(16) NULL,
+        `operation_type` varchar(50) NOT NULL,
+        `present` int NOT NULL,
+        `present_balance` int NOT NULL,
+        `remark` varchar(200) NULL,
+        `source_type` varchar(50) NOT NULL,
+        `online_account` int NOT NULL
+        ) ENGINE = OLAP DUPLICATE KEY (`place_id`, `card_id`, `shift_id`) DISTRIBUTED BY HASH (`operation_type`) BUCKETS 10 PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+
+
+    String db = context.config.getDbNameByFile(context.file)
+
+    def load_data = { inputFile, int count ->
+        String url = """${getS3Url()}/regression/load/data/${inputFile}.json"""
+        String fileName
+
+        HttpClients.createDefault().withCloseable { client ->
+            def file = new File("${context.config.cacheDataPath}/${inputFile}.json")
+            if (file.exists()) {
+                log.info("Found ${url} in ${file.getAbsolutePath()}");
+                fileName = file.getAbsolutePath()
+                return;
+            }
+
+            log.info("Start to download data from ${url} to ${context.config.cacheDataPath}/");
+            CloseableHttpResponse resp = client.execute(RequestBuilder.get(url).build())
+            int code = resp.getStatusLine().getStatusCode()
+
+            if (code != HttpStatus.SC_OK) {
+                String streamBody = EntityUtils.toString(resp.getEntity())
+                log.info("Failed to download data ${url}, code: ${code}, body:\n${streamBody}")
+                throw new IllegalStateException("Get http stream failed, status code is ${code}, body:\n${streamBody}")
+            }
+
+            InputStream httpFileStream = resp.getEntity().getContent()
+            java.nio.file.Files.copy(httpFileStream, file.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING)
+            httpFileStream.close()
+            fileName = file.getAbsolutePath()
+            log.info("File downloaded to: ${fileName}")
+        }
+
+        def command = """curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H read_json_by_line:false -H Expect:100-continue -H max_filter_ratio:1 -H strict_mode:false -H strip_outer_array:true -H columns:id,created,creater,deleted,updated,card_id,card_type_id,card_type_name,cash_balance,cashier_id,client_id,cost,creater_name,details,id_name,id_number,last_client_id,login_id,operation_type,place_id,present,present_balance,remark,shift_id,source_type [...]
+        log.info("stream load: ${command}")
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.text
+        def json = parseJson(out)
+        log.info("stream load result is: ${out}".toString())
+        assertEquals("success", json.Status.toLowerCase())
+        assertEquals(count, json.NumberLoadedRows)
+        qt_sql """ select count() from ${table_name} """
+    }
+
+    load_data.call("test_load_with_transfer_encoding", 15272)
+    load_data.call("test_transfer_encoding_small", 10)
+    
+}
+

