This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6fae4724d31 [enhancement](exception) catch exception for streamload and validate column (#40092) 6fae4724d31 is described below commit 6fae4724d319fdf5f5dde1a3576ad287f23ebe90 Author: yiguolei <676222...@qq.com> AuthorDate: Thu Aug 29 20:20:32 2024 +0800 [enhancement](exception) catch exception for streamload and validate column (#40092) --- be/src/http/action/http_stream.cpp | 58 ++++++++++++------------ be/src/http/action/stream_load.cpp | 35 +++++++------- be/src/io/file_factory.cpp | 1 + be/src/io/fs/stream_load_pipe.cpp | 4 +- be/src/runtime/stream_load/stream_load_context.h | 8 ++-- be/src/util/byte_buffer.h | 9 +--- be/src/vec/sink/vtablet_block_convertor.cpp | 11 ++--- be/src/vec/sink/vtablet_block_convertor.h | 13 +++++- be/test/util/byte_buffer2_test.cpp | 3 +- 9 files changed, 75 insertions(+), 67 deletions(-) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 7dbae6df731..c6176c52815 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -237,37 +237,39 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); int64_t start_read_data_time = MonotonicNanos(); + Status st = ctx->allocate_schema_buffer(); + if (!st.ok()) { + ctx->status = st; + return; + } while (evbuffer_get_length(evbuf) > 0) { - try { - auto bb = ByteBuffer::allocate(128 * 1024); - auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); - bb->pos = remove_bytes; - bb->flip(); - auto st = ctx->body_sink->append(bb); - // schema_buffer stores 1M of data for parsing column information - // need to determine whether to cache for the first time - if (ctx->is_read_schema) { - if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { - ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); - } else { - LOG(INFO) << "use a portion of data to request fe to obtain column information"; - ctx->is_read_schema = false; - ctx->status = process_put(req, ctx); - } - } - if (!st.ok() && !ctx->status.ok()) { - LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); - ctx->status = st; - return; - } - ctx->receive_bytes += remove_bytes; - } catch (const doris::Exception& e) { - if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { - ctx->status = Status::MemoryLimitExceeded( - fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string())); + ByteBufferPtr bb; + st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } + auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + st = ctx->body_sink->append(bb); + // schema_buffer stores 1M of data for parsing column information + // need to determine whether to cache for the first time + if (ctx->is_read_schema) { + if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { + ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); + } else { + LOG(INFO) << "use a portion of data to request fe to obtain column information"; + ctx->is_read_schema = false; + ctx->status = process_put(req, ctx); } - ctx->status = Status::Error<false>(e.code(), e.to_string()); } + if (!st.ok()) { + LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); + ctx->status = st; + return; + } + ctx->receive_bytes += remove_bytes; } // after all the data has been read and it has not reached 1M, it will execute here if (ctx->is_read_schema) { diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index d0c5dff2075..1a9420dea63 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -344,25 +344,22 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { int64_t start_read_data_time = MonotonicNanos(); while (evbuffer_get_length(evbuf) > 0) { - try { - auto bb = ByteBuffer::allocate(128 * 1024); - auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); - bb->pos = remove_bytes; - bb->flip(); - auto st = ctx->body_sink->append(bb); - if (!st.ok()) { - LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); - ctx->status = st; - return; - } - ctx->receive_bytes += remove_bytes; - } catch (const doris::Exception& e) { - if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { - ctx->status = Status::MemoryLimitExceeded( - fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string())); - } - ctx->status = Status::Error<false>(e.code(), e.to_string()); - } + ByteBufferPtr bb; + Status st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } + auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + st = ctx->body_sink->append(bb); + if (!st.ok()) { + LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); + ctx->status = st; + return; + } + ctx->receive_bytes += remove_bytes; } int64_t read_data_time = MonotonicNanos() - start_read_data_time; int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index f4ce573c535..86907886f17 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -206,6 +206,7 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } if (need_schema) { + RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer()); // Here, a portion of the data is processed to parse column information auto pipe = std::make_shared<io::StreamLoadPipe>( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index ce91a2e8391..0dc27e009d0 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -113,7 +113,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); ByteBufferPtr buf; - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf, 128 * 1024)); + RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf)); buf->put_bytes(data, size); buf->flip(); return _append(buf, proto_byte_size); @@ -148,7 +148,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) { size_t chunk_size = std::max(_min_chunk_size, size - pos); chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf, chunk_size)); + RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf)); _write_buf->put_bytes(data + pos, size - pos); return Status::OK(); } diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 95e56e0b3fa..9d1601372f8 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -121,15 +121,17 @@ public: bool is_mow_table() const; - ByteBufferPtr schema_buffer() { + Status allocate_schema_buffer() { if (_schema_buffer == nullptr) { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( ExecEnv::GetInstance()->stream_load_pipe_tracker()); - _schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); + return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer); } - return _schema_buffer; + return Status::OK(); } + ByteBufferPtr schema_buffer() { return _schema_buffer; } + public: static const int default_txn_id = -1; // load type, eg: ROUTINE LOAD/MANUAL LOAD diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index 1499f51c053..aafd4506087 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -34,13 +34,8 @@ struct ByteBuffer; using ByteBufferPtr = std::shared_ptr<ByteBuffer>; struct ByteBuffer : private Allocator<false> { - static ByteBufferPtr allocate(size_t size) { - ByteBufferPtr ptr(new ByteBuffer(size)); - return ptr; - } - - static Status create_and_allocate(ByteBufferPtr& ptr, size_t size) { - ptr = ByteBufferPtr(new ByteBuffer(size)); + static Status allocate(const size_t size, ByteBufferPtr* ptr) { + RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new ByteBuffer(size)); }); return Status::OK(); } diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index feb6633511e..617668c035a 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip return DecimalType(value); } -Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type, - bool is_nullable, vectorized::ColumnPtr column, - size_t slot_index, bool* stop_processing, - fmt::memory_buffer& error_prefix, - const uint32_t row_count, - vectorized::IColumn::Permutation* rows) { +Status OlapTableBlockConvertor::_internal_validate_column( + RuntimeState* state, const TypeDescriptor& type, bool is_nullable, + vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, + fmt::memory_buffer& error_prefix, const uint32_t row_count, + vectorized::IColumn::Permutation* rows) { DCHECK((rows == nullptr) || (rows->size() == row_count)); fmt::memory_buffer error_msg; auto set_invalid_and_append_error_msg = [&](int row) { diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 0db340ce6c2..7f866c38032 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -69,7 +69,18 @@ private: Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, fmt::memory_buffer& error_prefix, const uint32_t row_count, - vectorized::IColumn::Permutation* rows = nullptr); + vectorized::IColumn::Permutation* rows = nullptr) { + RETURN_IF_CATCH_EXCEPTION({ + return _internal_validate_column(state, type, is_nullable, column, slot_index, + stop_processing, error_prefix, row_count, rows); + }); + } + + Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type, + bool is_nullable, vectorized::ColumnPtr column, + size_t slot_index, bool* stop_processing, + fmt::memory_buffer& error_prefix, const uint32_t row_count, + vectorized::IColumn::Permutation* rows = nullptr); // make input data valid for OLAP table // return number of invalid/filtered rows. diff --git a/be/test/util/byte_buffer2_test.cpp b/be/test/util/byte_buffer2_test.cpp index 04b62cd5fe8..73c38c9e404 100644 --- a/be/test/util/byte_buffer2_test.cpp +++ b/be/test/util/byte_buffer2_test.cpp @@ -32,7 +32,8 @@ public: }; TEST_F(ByteBufferTest, normal) { - auto buf = ByteBuffer::allocate(4); + ByteBufferPtr buf; + Status st = ByteBuffer::allocate(4, &buf); EXPECT_EQ(0, buf->pos); EXPECT_EQ(4, buf->limit); EXPECT_EQ(4, buf->capacity); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org