This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new defdae1e7d [improvement](stream-load) adjust read unit of http to optimize stream load (#9154) defdae1e7d is described below commit defdae1e7d5b78b15f37ace0295c4c43b5f1a103 Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Fri May 20 09:52:36 2022 +0800 [improvement](stream-load) adjust read unit of http to optimize stream load (#9154) --- be/src/http/action/stream_load.cpp | 4 +- be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/stream_load/stream_load_pipe.h | 6 +- thirdparty/patches/libevent.patch | 126 ++++++++++++++++++-------- 4 files changed, 94 insertions(+), 44 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 9efab0f975..b9087990ca 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -350,7 +350,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { int64_t start_read_data_time = MonotonicNanos(); while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(4096); + auto bb = ByteBuffer::allocate(128 * 1024); auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); bb->pos = remove_bytes; bb->flip(); @@ -394,7 +394,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_header_type(ctx->header_type); request.__set_loadId(ctx->id.to_thrift()); if (ctx->use_streaming) { - auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */, + auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, ctx->body_bytes /* total_length */); RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe)); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d12bc85f26..c553744b5d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -511,7 +511,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { stream_load_cxt->need_commit_self = true; stream_load_cxt->need_rollback = true; // total_length == -1 means read one message from pipe in once time, don't care the length. - auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */, + auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, -1 /* total_length */, true /* use_proto */); stream_load_cxt->body_sink = pipe; diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 7872cab1e2..d793d1cbd6 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -29,12 +29,14 @@ namespace doris { +const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; // StreamLoadPipe use to transfer data from producer to consumer // Data in pip is stored in chunks. class StreamLoadPipe : public MessageBodySink, public FileReader { public: - StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024, - int64_t total_length = -1, bool use_proto = false) + StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes, + size_t min_chunk_size = 64 * 1024, int64_t total_length = -1, + bool use_proto = false) : _buffered_bytes(0), _proto_buffered_bytes(0), _max_buffered_bytes(max_buffered_bytes), diff --git a/thirdparty/patches/libevent.patch b/thirdparty/patches/libevent.patch index 83e426d62e..a545897cf1 100644 --- a/thirdparty/patches/libevent.patch +++ b/thirdparty/patches/libevent.patch @@ -1,6 +1,75 @@ -diff -uprN a/http.c b/http.c ---- a/http.c 2020-07-05 20:02:46.000000000 +0800 -+++ b/http.c 2021-09-28 13:56:14.045159153 +0800 +diff --git a/CMakeLists.txt b/CMakeLists.txt +index 676727f1..833fbf70 100644 +--- a/CMakeLists.txt ++++ b/CMakeLists.txt +@@ -200,7 +200,7 @@ endif() + if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG})) + set(GNUC 1) + endif() +-if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG})) ++if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC")) + set(MSVC 1) + endif() + +diff --git a/buffer.c b/buffer.c +index 3524b350..e5d97458 100644 +--- a/buffer.c ++++ b/buffer.c +@@ -2204,9 +2204,9 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen) + #define IOV_LEN_TYPE unsigned long + #endif + #endif +-#define NUM_READ_IOVEC 4 ++#define NUM_READ_IOVEC 8 + +-#define EVBUFFER_MAX_READ 4096 ++#define EVBUFFER_MAX_READ (128 * 1024) + + /** Helper function to figure out which space to use for reading data into + an evbuffer. Internal use only. +diff --git a/bufferevent_async.c b/bufferevent_async.c +index 40c7c5e8..c1624878 100644 +--- a/bufferevent_async.c ++++ b/bufferevent_async.c +@@ -275,7 +275,7 @@ bev_async_consider_reading(struct bufferevent_async *beva) + } + at_most = read_high - cur_size; + } else { +- at_most = 16384; /* FIXME totally magic. */ ++ at_most = 128 * 1024; /* FIXME totally magic. */ + } + + /* XXXX This over-commits. */ +diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c +index 25874968..9bc2b577 100644 +--- a/bufferevent_ratelim.c ++++ b/bufferevent_ratelim.c +@@ -179,7 +179,7 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) + } + + /* Default values for max_single_read & max_single_write variables. */ +-#define MAX_SINGLE_READ_DEFAULT 16384 ++#define MAX_SINGLE_READ_DEFAULT (128 * 1024) + #define MAX_SINGLE_WRITE_DEFAULT 16384 + + #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0) +diff --git a/http-internal.h b/http-internal.h +index feaf436d..9f9b5ab5 100644 +--- a/http-internal.h ++++ b/http-internal.h +@@ -167,6 +167,8 @@ struct evhttp { + void *gencbarg; + struct bufferevent* (*bevcb)(struct event_base *, void *); + void *bevcbarg; ++ int (*newreqcb)(struct evhttp_request *req, void *); ++ void *newreqcbarg; + + struct event_base *base; + }; +diff --git a/http.c b/http.c +index 04f089bc..53951cba 100644 +--- a/http.c ++++ b/http.c @@ -3975,6 +3975,14 @@ evhttp_set_bevcb(struct evhttp *http, http->bevcbarg = cbarg; } @@ -16,7 +85,7 @@ diff -uprN a/http.c b/http.c /* * Request related functions */ -@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_reques +@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_request *req) req->flags |= EVHTTP_REQ_NEEDS_FREE; return; } @@ -25,7 +94,7 @@ diff -uprN a/http.c b/http.c if (req->remote_host != NULL) mm_free(req->remote_host); -@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct +@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct evhttp_request *req, req->on_complete_cb_arg = cb_arg; } @@ -41,7 +110,7 @@ diff -uprN a/http.c b/http.c /* * Allows for inspection of the request URI */ -@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connec +@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connection(struct evhttp_connection *evcon) */ req->userdone = 1; @@ -59,25 +128,15 @@ diff -uprN a/http.c b/http.c evhttp_start_read_(evcon); -diff -uprN a/http-internal.h b/http-internal.h ---- a/http-internal.h 2020-07-05 20:02:46.000000000 +0800 -+++ b/http-internal.h 2021-09-28 13:56:13.925151028 +0800 -@@ -167,6 +167,8 @@ struct evhttp { - void *gencbarg; - struct bufferevent* (*bevcb)(struct event_base *, void *); - void *bevcbarg; -+ int (*newreqcb)(struct evhttp_request *req, void *); -+ void *newreqcbarg; - - struct event_base *base; - }; -diff -uprN a/include/event2/http.h b/include/event2/http.h ---- a/include/event2/http.h 2020-07-05 20:02:46.000000000 +0800 -+++ b/include/event2/http.h 2021-09-28 13:56:13.928151231 +0800 -@@ -299,6 +299,20 @@ void evhttp_set_bevcb(struct evhttp *htt +diff --git a/include/event2/http.h b/include/event2/http.h +index 2a41303e..e80bab9a 100644 +--- a/include/event2/http.h ++++ b/include/event2/http.h +@@ -298,6 +298,20 @@ EVENT2_EXPORT_SYMBOL + void evhttp_set_bevcb(struct evhttp *http, struct bufferevent *(*cb)(struct event_base *, void *), void *arg); - /** ++/** + Set a callback which allows the user to note or throttle incoming requests. + The requests are not populated with HTTP level information. They + are just associated to a connection. @@ -91,10 +150,9 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h +void evhttp_set_newreqcb(struct evhttp *http, + int (*cb)(struct evhttp_request*, void *), void *arg); + -+/** + /** Adds a virtual host to the http server. - A virtual host is a newly initialized evhttp object that has request @@ -624,6 +638,20 @@ EVENT2_EXPORT_SYMBOL void evhttp_request_set_on_complete_cb(struct evhttp_request *req, void (*cb)(struct evhttp_request *, void *), void *cb_arg); @@ -116,9 +174,10 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h /** Frees the request object and removes associated events. */ EVENT2_EXPORT_SYMBOL void evhttp_request_free(struct evhttp_request *req); -diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h ---- a/include/event2/http_struct.h 2020-07-05 20:02:46.000000000 +0800 -+++ b/include/event2/http_struct.h 2021-09-28 13:56:13.928151231 +0800 +diff --git a/include/event2/http_struct.h b/include/event2/http_struct.h +index 4bf5b1ff..0762cabd 100644 +--- a/include/event2/http_struct.h ++++ b/include/event2/http_struct.h @@ -142,6 +142,12 @@ struct { */ void (*on_complete_cb)(struct evhttp_request *, void *); @@ -132,14 +191,3 @@ diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h }; #ifdef __cplusplus -diff -uprN a/CMakeLists.txt b/CMakeLists.txt ---- a/CMakeLists.txt 2020-07-05 20:02:46.000000000 +0800 -+++ b/CMakeLists.txt 2022-01-10 13:29:32.912883436 +0800 -@@ -200,6 +200,6 @@ endif() - if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG})) - set(GNUC 1) - endif() --if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG})) -+if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC")) - set(MSVC 1) - endif() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org