This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fae4724d31 [enhancement](exception) catch exception for streamload and validate column (#40092)
6fae4724d31 is described below

commit 6fae4724d319fdf5f5dde1a3576ad287f23ebe90
Author: yiguolei <676222...@qq.com>
AuthorDate: Thu Aug 29 20:20:32 2024 +0800

    [enhancement](exception) catch exception for streamload and validate column (#40092)
---
 be/src/http/action/http_stream.cpp               | 58 ++++++++++++------------
 be/src/http/action/stream_load.cpp               | 35 +++++++-------
 be/src/io/file_factory.cpp                       |  1 +
 be/src/io/fs/stream_load_pipe.cpp                |  4 +-
 be/src/runtime/stream_load/stream_load_context.h |  8 ++--
 be/src/util/byte_buffer.h                        |  9 +---
 be/src/vec/sink/vtablet_block_convertor.cpp      | 11 ++---
 be/src/vec/sink/vtablet_block_convertor.h        | 13 +++++-
 be/test/util/byte_buffer2_test.cpp               |  3 +-
 9 files changed, 75 insertions(+), 67 deletions(-)
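
For readers skimming the diff below: the core of this change is replacing exception-throwing buffer allocation with a Status-returning factory, so that an allocation failure inside a libevent callback is reported through the load context instead of unwinding through C code. A minimal standalone sketch of that pattern (SketchStatus and SketchBuffer are illustrative stand-ins, not the real Doris types):

    #include <cstddef>
    #include <memory>
    #include <new>
    #include <string>
    #include <utility>

    // Illustrative stand-ins for doris::Status and doris::ByteBuffer.
    struct SketchStatus {
        bool is_ok = true;
        std::string msg;
        static SketchStatus OK() { return {}; }
        static SketchStatus MemAllocFailed(std::string m) { return {false, std::move(m)}; }
        bool ok() const { return is_ok; }
    };

    struct SketchBuffer {
        std::unique_ptr<char[]> data;
        size_t capacity = 0;

        // New-style factory: an allocation failure is converted into a
        // status return instead of escaping as std::bad_alloc.
        static SketchStatus allocate(size_t size, std::shared_ptr<SketchBuffer>* out) {
            try {
                auto buf = std::make_shared<SketchBuffer>();
                buf->data = std::make_unique<char[]>(size);
                buf->capacity = size;
                *out = std::move(buf);
            } catch (const std::bad_alloc&) {
                return SketchStatus::MemAllocFailed("failed to allocate " +
                                                    std::to_string(size) + " bytes");
            }
            return SketchStatus::OK();
        }
    };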

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 7dbae6df731..c6176c52815 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -237,37 +237,39 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
     SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
 
     int64_t start_read_data_time = MonotonicNanos();
+    Status st = ctx->allocate_schema_buffer();
+    if (!st.ok()) {
+        ctx->status = st;
+        return;
+    }
     while (evbuffer_get_length(evbuf) > 0) {
-        try {
-            auto bb = ByteBuffer::allocate(128 * 1024);
-            auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
-            bb->pos = remove_bytes;
-            bb->flip();
-            auto st = ctx->body_sink->append(bb);
-            // schema_buffer stores 1M of data for parsing column information
-            // need to determine whether to cache for the first time
-            if (ctx->is_read_schema) {
-                if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
-                    ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
-                } else {
-                    LOG(INFO) << "use a portion of data to request fe to obtain column information";
-                    ctx->is_read_schema = false;
-                    ctx->status = process_put(req, ctx);
-                }
-            }
-            if (!st.ok() && !ctx->status.ok()) {
-                LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
-                ctx->status = st;
-                return;
-            }
-            ctx->receive_bytes += remove_bytes;
-        } catch (const doris::Exception& e) {
-            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
-                ctx->status = Status::MemoryLimitExceeded(
-                        fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string()));
+        ByteBufferPtr bb;
+        st = ByteBuffer::allocate(128 * 1024, &bb);
+        if (!st.ok()) {
+            ctx->status = st;
+            return;
+        }
+        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+        bb->pos = remove_bytes;
+        bb->flip();
+        st = ctx->body_sink->append(bb);
+        // schema_buffer stores 1M of data for parsing column information
+        // need to determine whether to cache for the first time
+        if (ctx->is_read_schema) {
+            if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
+                ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
+            } else {
+                LOG(INFO) << "use a portion of data to request fe to obtain column information";
+                ctx->is_read_schema = false;
+                ctx->status = process_put(req, ctx);
             }
-            ctx->status = Status::Error<false>(e.code(), e.to_string());
         }
+        if (!st.ok()) {
+            LOG(WARNING) << "append body content failed. errmsg=" << st << ", 
" << ctx->brief();
+            ctx->status = st;
+            return;
+        }
+        ctx->receive_bytes += remove_bytes;
     }
     // after all the data has been read and it has not reached 1M, it will execute here
     if (ctx->is_read_schema) {
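
The rewrite above turns the per-iteration try/catch into straight-line status checks: allocate, append, and record the first failure on the load context before returning. A condensed sketch of that control flow, with simplified stand-ins for the libevent buffer and load context (not the real signatures):

    #include <algorithm>
    #include <cstddef>

    struct Status { bool is_ok = true; bool ok() const { return is_ok; } };

    struct Context {              // stand-in for StreamLoadContext
        Status status;
        size_t receive_bytes = 0;
    };

    Status allocate_chunk(size_t) { return {}; }  // stand-in for ByteBuffer::allocate
    Status append_chunk() { return {}; }          // stand-in for body_sink->append

    void on_chunk_data(Context* ctx, size_t pending) {  // pending: evbuffer_get_length
        while (pending > 0) {
            Status st = allocate_chunk(128 * 1024);
            if (!st.ok()) {            // allocation failed: record the error
                ctx->status = st;      // on the context and stop consuming;
                return;                // nothing unwinds through libevent
            }
            size_t removed = std::min<size_t>(pending, 128 * 1024);
            pending -= removed;        // stand-in for evbuffer_remove
            st = append_chunk();
            if (!st.ok()) {
                ctx->status = st;
                return;
            }
            ctx->receive_bytes += removed;
        }
    }
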
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index d0c5dff2075..1a9420dea63 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -344,25 +344,22 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
 
     int64_t start_read_data_time = MonotonicNanos();
     while (evbuffer_get_length(evbuf) > 0) {
-        try {
-            auto bb = ByteBuffer::allocate(128 * 1024);
-            auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
-            bb->pos = remove_bytes;
-            bb->flip();
-            auto st = ctx->body_sink->append(bb);
-            if (!st.ok()) {
-                LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
-                ctx->status = st;
-                return;
-            }
-            ctx->receive_bytes += remove_bytes;
-        } catch (const doris::Exception& e) {
-            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
-                ctx->status = Status::MemoryLimitExceeded(
-                        fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string()));
-            }
-            ctx->status = Status::Error<false>(e.code(), e.to_string());
-        }
+        ByteBufferPtr bb;
+        Status st = ByteBuffer::allocate(128 * 1024, &bb);
+        if (!st.ok()) {
+            ctx->status = st;
+            return;
+        }
+        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+        bb->pos = remove_bytes;
+        bb->flip();
+        st = ctx->body_sink->append(bb);
+        if (!st.ok()) {
+            LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
+            ctx->status = st;
+            return;
+        }
+        ctx->receive_bytes += remove_bytes;
     }
     int64_t read_data_time = MonotonicNanos() - start_read_data_time;
     int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index f4ce573c535..86907886f17 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -206,6 +206,7 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
         return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
     }
     if (need_schema) {
+        RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer());
         // Here, a portion of the data is processed to parse column information
         auto pipe = std::make_shared<io::StreamLoadPipe>(
                 io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index ce91a2e8391..0dc27e009d0 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -113,7 +113,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
 Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
     SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
     ByteBufferPtr buf;
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf, 128 * 1024));
+    RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf));
     buf->put_bytes(data, size);
     buf->flip();
     return _append(buf, proto_byte_size);
@@ -148,7 +148,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
     size_t chunk_size = std::max(_min_chunk_size, size - pos);
     chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
     SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf, chunk_size));
+    RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
     _write_buf->put_bytes(data + pos, size - pos);
     return Status::OK();
 }
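
RETURN_IF_ERROR, used above in place of RETURN_IF_ERROR_OR_CATCH_EXCEPTION, is the usual early-return idiom; the catch now lives inside ByteBuffer::allocate itself. A hypothetical definition of such a macro (the real Doris macro lives elsewhere and differs in detail):

    #include <cstddef>

    struct Status { bool is_ok = true; bool ok() const { return is_ok; } };

    // Hypothetical early-return macro in the spirit of RETURN_IF_ERROR:
    // evaluate the expression once, propagate a bad status to the caller.
    #define SKETCH_RETURN_IF_ERROR(expr)   \
        do {                               \
            Status _st = (expr);           \
            if (!_st.ok()) {               \
                return _st;                \
            }                              \
        } while (false)

    Status allocate_write_buf(size_t) { return {}; }  // stand-in for ByteBuffer::allocate

    Status append(const char* data, size_t size) {
        SKETCH_RETURN_IF_ERROR(allocate_write_buf(size));  // early-return on failure
        (void)data;  // real code would copy into the buffer here
        return Status{};
    }
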
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 95e56e0b3fa..9d1601372f8 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -121,15 +121,17 @@ public:
 
     bool is_mow_table() const;
 
-    ByteBufferPtr schema_buffer() {
+    Status allocate_schema_buffer() {
         if (_schema_buffer == nullptr) {
             SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                     ExecEnv::GetInstance()->stream_load_pipe_tracker());
-            _schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);
+            return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer);
         }
-        return _schema_buffer;
+        return Status::OK();
     }
 
+    ByteBufferPtr schema_buffer() { return _schema_buffer; }
+
 public:
     static const int default_txn_id = -1;
     // load type, eg: ROUTINE LOAD/MANUAL LOAD
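
The accessor split above separates the fallible step (allocate_schema_buffer) from the cheap getter (schema_buffer), so callers surface an allocation failure instead of triggering one inside a getter. A minimal sketch of the resulting call pattern, with stand-in types only:

    #include <memory>
    #include <new>
    #include <vector>

    struct Status { bool is_ok = true; bool ok() const { return is_ok; } };

    // Stand-in for StreamLoadContext: the fallible allocation and the
    // cheap accessor are two separate members after this change.
    struct SketchContext {
        std::shared_ptr<std::vector<char>> schema_buf;

        Status allocate_schema_buffer() {            // fallible, call first
            if (schema_buf == nullptr) {
                try {
                    schema_buf = std::make_shared<std::vector<char>>(1024 * 1024);
                } catch (const std::bad_alloc&) {
                    return Status{false};
                }
            }
            return Status{};
        }

        std::shared_ptr<std::vector<char>> schema_buffer() { return schema_buf; }  // infallible
    };

    Status read_schema(SketchContext* ctx) {
        Status st = ctx->allocate_schema_buffer();   // surface the failure here
        if (!st.ok()) {
            return st;
        }
        auto buf = ctx->schema_buffer();             // non-null after success
        (void)buf;
        return Status{};
    }
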
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index 1499f51c053..aafd4506087 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -34,13 +34,8 @@ struct ByteBuffer;
 using ByteBufferPtr = std::shared_ptr<ByteBuffer>;
 
 struct ByteBuffer : private Allocator<false> {
-    static ByteBufferPtr allocate(size_t size) {
-        ByteBufferPtr ptr(new ByteBuffer(size));
-        return ptr;
-    }
-
-    static Status create_and_allocate(ByteBufferPtr& ptr, size_t size) {
-        ptr = ByteBufferPtr(new ByteBuffer(size));
+    static Status allocate(const size_t size, ByteBufferPtr* ptr) {
+        RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new ByteBuffer(size)); });
         return Status::OK();
     }
 
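Caller-side, the new signature reads as below. A hedged fragment mirroring the call sites updated in this commit (the header paths and the RETURN_IF_ERROR macro are assumed from context, not verified):

    #include "common/status.h"      // assumed location of Status / RETURN_IF_ERROR
    #include "util/byte_buffer.h"   // ByteBuffer / ByteBufferPtr, as patched above

    // Allocate through the fallible factory, then fill and flip the buffer
    // for reading, as the stream-load call sites above now do.
    doris::Status fill_buffer(const char* data, size_t size, doris::ByteBufferPtr* out) {
        RETURN_IF_ERROR(doris::ByteBuffer::allocate(128 * 1024, out));
        (*out)->put_bytes(data, size);  // copy into the write position
        (*out)->flip();                 // switch the buffer to read mode
        return doris::Status::OK();
    }
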
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp
index feb6633511e..617668c035a 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip
     return DecimalType(value);
 }
 
-Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type,
-                                                 bool is_nullable, vectorized::ColumnPtr column,
-                                                 size_t slot_index, bool* stop_processing,
-                                                 fmt::memory_buffer& error_prefix,
-                                                 const uint32_t row_count,
-                                                 vectorized::IColumn::Permutation* rows) {
+Status OlapTableBlockConvertor::_internal_validate_column(
+        RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
+        vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
+        fmt::memory_buffer& error_prefix, const uint32_t row_count,
+        vectorized::IColumn::Permutation* rows) {
     DCHECK((rows == nullptr) || (rows->size() == row_count));
     fmt::memory_buffer error_msg;
     auto set_invalid_and_append_error_msg = [&](int row) {
diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h
index 0db340ce6c2..7f866c38032 100644
--- a/be/src/vec/sink/vtablet_block_convertor.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -69,7 +69,18 @@ private:
     Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
                             vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
                             fmt::memory_buffer& error_prefix, const uint32_t row_count,
-                            vectorized::IColumn::Permutation* rows = nullptr);
+                            vectorized::IColumn::Permutation* rows = nullptr) {
+        RETURN_IF_CATCH_EXCEPTION({
+            return _internal_validate_column(state, type, is_nullable, column, slot_index,
+                                             stop_processing, error_prefix, row_count, rows);
+        });
+    }
+
+    Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type,
+                                     bool is_nullable, vectorized::ColumnPtr column,
+                                     size_t slot_index, bool* stop_processing,
+                                     fmt::memory_buffer& error_prefix, const uint32_t row_count,
+                                     vectorized::IColumn::Permutation* rows = nullptr);
 
     // make input data valid for OLAP table
     // return number of invalid/filtered rows.
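
The header change above adds a thin public wrapper that delegates to _internal_validate_column inside RETURN_IF_CATCH_EXCEPTION, so a throw during validation (for example, from an allocation) becomes a Status rather than escaping into the execution engine. A standalone sketch of that idiom, with illustrative names only:

    #include <exception>
    #include <string>
    #include <utility>

    struct Status {
        bool is_ok = true;
        std::string msg;
        static Status OK() { return {}; }
        static Status InternalError(std::string m) { return {false, std::move(m)}; }
        bool ok() const { return is_ok; }
    };

    class SketchValidator {
    public:
        // Public entry point: no exception escapes to the caller.
        Status validate_column() {
            try {
                return _internal_validate_column();
            } catch (const std::exception& e) {
                return Status::InternalError(e.what());
            }
        }

    private:
        // The real work; may throw (e.g. bad_alloc under memory pressure).
        Status _internal_validate_column() { return Status::OK(); }
    };
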
diff --git a/be/test/util/byte_buffer2_test.cpp b/be/test/util/byte_buffer2_test.cpp
index 04b62cd5fe8..73c38c9e404 100644
--- a/be/test/util/byte_buffer2_test.cpp
+++ b/be/test/util/byte_buffer2_test.cpp
@@ -32,7 +32,8 @@ public:
 };
 
 TEST_F(ByteBufferTest, normal) {
-    auto buf = ByteBuffer::allocate(4);
+    ByteBufferPtr buf;
+    Status st = ByteBuffer::allocate(4, &buf);
     EXPECT_EQ(0, buf->pos);
     EXPECT_EQ(4, buf->limit);
     EXPECT_EQ(4, buf->capacity);
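
The updated test leaves the returned Status unchecked; a hedged variant that also asserts the allocation succeeded (assuming GoogleTest, as in the existing test file):

    TEST_F(ByteBufferTest, allocate_reports_status) {
        ByteBufferPtr buf;
        Status st = ByteBuffer::allocate(4, &buf);
        ASSERT_TRUE(st.ok());   // fail fast before dereferencing buf
        EXPECT_EQ(0, buf->pos);
        EXPECT_EQ(4, buf->limit);
        EXPECT_EQ(4, buf->capacity);
    }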

