This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c9381b0285c [fix](load) Fix import failure when the stream load parameter specifies Transfer-Encoding:chunked (#48196) (#48503)
c9381b0285c is described below

commit c9381b0285ccd59a2eb039399fcf3e99ddcb7b22
Author: Sun Chenyang <suncheny...@selectdb.com>
AuthorDate: Tue Mar 4 10:12:54 2025 +0800

    [fix](load) Fix import failure when the stream load parameter specifies Transfer-Encoding:chunked (#48196) (#48503)

    pick from master #48196
---
 be/src/http/action/stream_load.cpp              |   1 +
 be/src/io/fs/stream_load_pipe.h                 |  10 ++
 be/src/vec/exec/format/json/new_json_reader.cpp |  51 +++++++++-
 be/src/vec/exec/format/json/new_json_reader.h   |   4 +
 .../test_load_with_transfer_encoding.out        | Bin 0 -> 131 bytes
 .../test_load_with_transfer_encoding.groovy     | 109 +++++++++++++++++++++
 6 files changed, 173 insertions(+), 2 deletions(-)
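For context, the failure this commit fixes can be exercised with a stream load
request that forces chunked transfer. The command below is an illustrative
sketch only: the host/port, credentials, and demo_db/demo_table names are
placeholders, not values taken from this commit.

    # Force chunked transfer on a JSON stream load; before this fix the BE
    # parsed only the first message read from the pipe, so the load failed.
    curl --location-trusted -u root: \
        -H "Expect: 100-continue" \
        -H "format: json" \
        -H "strip_outer_array: true" \
        -H "Transfer-Encoding: chunked" \
        -T data.json \
        http://127.0.0.1:8030/api/demo_db/demo_table/_stream_load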
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index e472c006260..c61909ab712 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -423,6 +423,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (ctx->is_chunked_transfer) {
         pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
+        pipe->set_is_chunked_transfer(true);
     } else {
         pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 978badf9add..a860b5f7d8a 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -87,6 +87,12 @@ public:
 
     size_t current_capacity();
 
+    bool is_chunked_transfer() const { return _is_chunked_transfer; }
+
+    void set_is_chunked_transfer(bool is_chunked_transfer) {
+        _is_chunked_transfer = is_chunked_transfer;
+    }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -121,6 +127,10 @@ private:
 
     // no use, only for compatibility with the `Path` interface
     Path _path = "";
+
+    // When importing JSON data and using chunked transfer encoding,
+    // the data needs to be completely read before it can be parsed.
+    bool _is_chunked_transfer = false;
 };
 } // namespace io
 } // namespace doris
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 0850f5c0286..6a75a135793 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -994,8 +994,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
         break;
     }
     case TFileType::FILE_STREAM: {
-        RETURN_IF_ERROR((dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()))
-                                ->read_one_message(file_buf, read_size));
+        RETURN_IF_ERROR(_read_one_message_from_pipe(file_buf, read_size));
         break;
     }
     default: {
@@ -1004,6 +1003,54 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
     }
     return Status::OK();
 }
+
+Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf,
+                                                  size_t* read_size) {
+    auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get());
+
+    // first read: read from the pipe once.
+    RETURN_IF_ERROR(stream_load_pipe->read_one_message(file_buf, read_size));
+
+    // When the file is not chunked, the entire file has already been read.
+    if (!stream_load_pipe->is_chunked_transfer()) {
+        return Status::OK();
+    }
+
+    std::vector<uint8_t> buf;
+    uint64_t cur_size = 0;
+
+    // second read: continuously read data from the pipe until all data is read.
+    std::unique_ptr<uint8_t[]> read_buf;
+    size_t read_buf_size = 0;
+    while (true) {
+        RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size));
+        if (read_buf_size == 0) {
+            break;
+        } else {
+            buf.insert(buf.end(), read_buf.get(), read_buf.get() + read_buf_size);
+            cur_size += read_buf_size;
+            read_buf_size = 0;
+            read_buf.reset();
+        }
+    }
+
+    // No data is available during the second read.
+    if (cur_size == 0) {
+        return Status::OK();
+    }
+
+    std::unique_ptr<uint8_t[]> total_buf = std::make_unique<uint8_t[]>(cur_size + *read_size);
+
+    // copy the data during the first read
+    memcpy(total_buf.get(), file_buf->get(), *read_size);
+
+    // copy the data during the second read
+    memcpy(total_buf.get() + *read_size, buf.data(), cur_size);
+    *file_buf = std::move(total_buf);
+    *read_size += cur_size;
+    return Status::OK();
+}
+
 // ---------SIMDJSON----------
 // simdjson, replace none simdjson function if it is ready
 Status NewJsonReader::_simdjson_init_reader() {
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index fe550fa44b9..2cafc57384d 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -143,6 +143,10 @@ private:
 
     Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);
 
+    // StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
+    // Need to read all the data before performing JSON parsing.
+    Status _read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);
+
     // simdjson, replace none simdjson function if it is ready
     Status _simdjson_init_reader();
     Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
diff --git a/regression-test/data/load_p0/stream_load/test_load_with_transfer_encoding.out b/regression-test/data/load_p0/stream_load/test_load_with_transfer_encoding.out
new file mode 100644
index 00000000000..5d61d789119
Binary files /dev/null and b/regression-test/data/load_p0/stream_load/test_load_with_transfer_encoding.out differ
diff --git a/regression-test/suites/load_p0/stream_load/test_load_with_transfer_encoding.groovy b/regression-test/suites/load_p0/stream_load/test_load_with_transfer_encoding.groovy
new file mode 100644
index 00000000000..d5a675ca655
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_load_with_transfer_encoding.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.http.HttpStatus
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+suite("test_load_with_transfer_encoding", "p0") {
+    def table_name = "test_load_with_transfer_encoding"
+
+    sql "DROP TABLE IF EXISTS ${table_name}"
+    sql """
+        CREATE TABLE ${table_name} (
+            `place_id` varchar(14) NOT NULL,
+            `card_id` varchar(12) NOT NULL,
+            `shift_id` varchar(4) NULL,
+            `id` bigint NOT NULL AUTO_INCREMENT (1),
+            `created` datetime(6) NOT NULL,
+            `creater` bigint NULL,
+            `deleted` int NOT NULL,
+            `updated` datetime(6) NULL,
+            `card_type_id` varchar(8) NOT NULL,
+            `card_type_name` varchar(20) NULL,
+            `cash_balance` int NOT NULL,
+            `cashier_id` varchar(8) NULL,
+            `client_id` varchar(4) NULL,
+            `cost` int NOT NULL,
+            `creater_name` varchar(50) NULL,
+            `details` varchar(200) NULL,
+            `id_name` varchar(50) NOT NULL,
+            `id_number` varchar(18) NOT NULL,
+            `last_client_id` varchar(4) NULL,
+            `login_id` varchar(16) NULL,
+            `operation_type` varchar(50) NOT NULL,
+            `present` int NOT NULL,
+            `present_balance` int NOT NULL,
+            `remark` varchar(200) NULL,
+            `source_type` varchar(50) NOT NULL,
+            `online_account` int NOT NULL
+        ) ENGINE = OLAP DUPLICATE KEY (`place_id`, `card_id`, `shift_id`)
+        DISTRIBUTED BY HASH (`operation_type`) BUCKETS 10
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+
+    String db = context.config.getDbNameByFile(context.file)
+
+    def load_data = { inputFile, int count ->
+        String url = """${getS3Url()}/regression/load/data/${inputFile}.json"""
+        String fileName
+
+        HttpClients.createDefault().withCloseable { client ->
+            def file = new File("${context.config.cacheDataPath}/${inputFile}.json")
+            if (file.exists()) {
+                log.info("Found ${url} in ${file.getAbsolutePath()}");
+                fileName = file.getAbsolutePath()
+                return;
+            }
+
+            log.info("Start to download data from ${url} to ${context.config.cacheDataPath}/");
+            CloseableHttpResponse resp = client.execute(RequestBuilder.get(url).build())
+            int code = resp.getStatusLine().getStatusCode()
+
+            if (code != HttpStatus.SC_OK) {
+                String streamBody = EntityUtils.toString(resp.getEntity())
+                log.info("Failed to download data ${url}, code: ${code}, body:\n${streamBody}")
+                throw new IllegalStateException("Get http stream failed, status code is ${code}, body:\n${streamBody}")
+            }
+
+            InputStream httpFileStream = resp.getEntity().getContent()
+            java.nio.file.Files.copy(httpFileStream, file.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING)
+            httpFileStream.close()
+            fileName = file.getAbsolutePath()
+            log.info("File downloaded to: ${fileName}")
+        }
+
+        def command = """curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H read_json_by_line:false -H Expect:100-continue -H max_filter_ratio:1 -H strict_mode:false -H strip_outer_array:true -H columns:id,created,creater,deleted,updated,card_id,card_type_id,card_type_name,cash_balance,cashier_id,client_id,cost,creater_name,details,id_name,id_number,last_client_id,login_id,operation_type,place_id,present,present_balance,remark,shift_id,source_type [...]
+        log.info("stream load: ${command}")
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.text
+        def json = parseJson(out)
+        log.info("stream load result is: ${out}".toString())
+        assertEquals("success", json.Status.toLowerCase())
+        assertEquals(count, json.NumberLoadedRows)
+        qt_sql """ select count() from ${table_name} """
+    }
+
+    load_data.call("test_load_with_transfer_encoding", 15272)
+    load_data.call("test_transfer_encoding_small", 10)
+
+}
+
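The test above drives the chunked-transfer path through curl. Clients can also
trigger this path without setting the header themselves: when curl streams a
body whose length is unknown up front, it falls back to chunked transfer on its
own. A sketch, again with placeholder host, credentials, and table names:

    # Uploading from stdin: the content length is unknown in advance, so curl
    # switches to Transfer-Encoding: chunked automatically.
    cat data.json | curl --location-trusted -u root: \
        -H "format: json" \
        -T - \
        http://127.0.0.1:8030/api/demo_db/demo_table/_stream_load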
+ log.info("stream load: ${command}") + def process = command.execute() + def code = process.waitFor() + def out = process.text + def json = parseJson(out) + log.info("stream load result is:: ${out}".toString()) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(count, json.NumberLoadedRows) + qt_sql """ select count() from ${table_name} """ + } + + load_data.call("test_load_with_transfer_encoding", 15272) + load_data.call("test_transfer_encoding_small", 10) + +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org