This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new defdae1e7d [improvement](stream-load) adjust read unit of http to optimize stream load (#9154)
defdae1e7d is described below

commit defdae1e7d5b78b15f37ace0295c4c43b5f1a103
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Fri May 20 09:52:36 2022 +0800

    [improvement](stream-load) adjust read unit of http to optimize stream load (#9154)
---
 be/src/http/action/stream_load.cpp            |   4 +-
 be/src/runtime/fragment_mgr.cpp               |   2 +-
 be/src/runtime/stream_load/stream_load_pipe.h |   6 +-
 thirdparty/patches/libevent.patch             | 126 ++++++++++++++++++--------
 4 files changed, 94 insertions(+), 44 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 9efab0f975..b9087990ca 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -350,7 +350,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
 
     int64_t start_read_data_time = MonotonicNanos();
     while (evbuffer_get_length(evbuf) > 0) {
-        auto bb = ByteBuffer::allocate(4096);
+        auto bb = ByteBuffer::allocate(128 * 1024);
         auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
         bb->pos = remove_bytes;
         bb->flip();
@@ -394,7 +394,7 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req, StreamLoadContext*
     request.__set_header_type(ctx->header_type);
     request.__set_loadId(ctx->id.to_thrift());
     if (ctx->use_streaming) {
-        auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* 
max_buffered_bytes */,
+        auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* 
max_buffered_bytes */,
                                                      64 * 1024 /* 
min_chunk_size */,
                                                      ctx->body_bytes /* 
total_length */);
         RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d12bc85f26..c553744b5d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -511,7 +511,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params) {
         stream_load_cxt->need_commit_self = true;
         stream_load_cxt->need_rollback = true;
         // total_length == -1 means read one message from pipe in once time, 
don't care the length.
-        auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* 
max_buffered_bytes */,
+        auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* 
max_buffered_bytes */,
                                                      64 * 1024 /* 
min_chunk_size */,
                                                      -1 /* total_length */, 
true /* use_proto */);
         stream_load_cxt->body_sink = pipe;
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h 
b/be/src/runtime/stream_load/stream_load_pipe.h
index 7872cab1e2..d793d1cbd6 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -29,12 +29,14 @@
 
 namespace doris {
 
+const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
 // StreamLoadPipe use to transfer data from producer to consumer
 // Data in pip is stored in chunks.
 class StreamLoadPipe : public MessageBodySink, public FileReader {
 public:
-    StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t 
min_chunk_size = 64 * 1024,
-                   int64_t total_length = -1, bool use_proto = false)
+    StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
+                   size_t min_chunk_size = 64 * 1024, int64_t total_length = 
-1,
+                   bool use_proto = false)
             : _buffered_bytes(0),
               _proto_buffered_bytes(0),
               _max_buffered_bytes(max_buffered_bytes),
diff --git a/thirdparty/patches/libevent.patch 
b/thirdparty/patches/libevent.patch
index 83e426d62e..a545897cf1 100644
--- a/thirdparty/patches/libevent.patch
+++ b/thirdparty/patches/libevent.patch
@@ -1,6 +1,75 @@
-diff -uprN a/http.c b/http.c
---- a/http.c   2020-07-05 20:02:46.000000000 +0800
-+++ b/http.c   2021-09-28 13:56:14.045159153 +0800
+diff --git a/CMakeLists.txt b/CMakeLists.txt
+index 676727f1..833fbf70 100644
+--- a/CMakeLists.txt
++++ b/CMakeLists.txt
+@@ -200,7 +200,7 @@ endif()
+ if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG}))
+     set(GNUC 1)
+ endif()
+-if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG}))
++if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" 
STREQUAL "MSVC"))
+     set(MSVC 1)
+ endif()
+ 
+diff --git a/buffer.c b/buffer.c
+index 3524b350..e5d97458 100644
+--- a/buffer.c
++++ b/buffer.c
+@@ -2204,9 +2204,9 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen)
+ #define IOV_LEN_TYPE unsigned long
+ #endif
+ #endif
+-#define NUM_READ_IOVEC 4
++#define NUM_READ_IOVEC 8
+ 
+-#define EVBUFFER_MAX_READ     4096
++#define EVBUFFER_MAX_READ     (128 * 1024)
+ 
+ /** Helper function to figure out which space to use for reading data into
+     an evbuffer.  Internal use only.
+diff --git a/bufferevent_async.c b/bufferevent_async.c
+index 40c7c5e8..c1624878 100644
+--- a/bufferevent_async.c
++++ b/bufferevent_async.c
+@@ -275,7 +275,7 @@ bev_async_consider_reading(struct bufferevent_async *beva)
+               }
+               at_most = read_high - cur_size;
+       } else {
+-              at_most = 16384; /* FIXME totally magic. */
++              at_most = 128 * 1024; /* FIXME totally magic. */
+       }
+ 
+       /* XXXX This over-commits. */
+diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c
+index 25874968..9bc2b577 100644
+--- a/bufferevent_ratelim.c
++++ b/bufferevent_ratelim.c
+@@ -179,7 +179,7 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
+ }
+ 
+ /* Default values for max_single_read & max_single_write variables. */
+-#define MAX_SINGLE_READ_DEFAULT 16384
++#define MAX_SINGLE_READ_DEFAULT (128 * 1024)
+ #define MAX_SINGLE_WRITE_DEFAULT 16384
+ 
+ #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
+diff --git a/http-internal.h b/http-internal.h
+index feaf436d..9f9b5ab5 100644
+--- a/http-internal.h
++++ b/http-internal.h
+@@ -167,6 +167,8 @@ struct evhttp {
+       void *gencbarg;
+       struct bufferevent* (*bevcb)(struct event_base *, void *);
+       void *bevcbarg;
++      int (*newreqcb)(struct evhttp_request *req, void *);
++      void *newreqcbarg;
+ 
+       struct event_base *base;
+ };
+diff --git a/http.c b/http.c
+index 04f089bc..53951cba 100644
+--- a/http.c
++++ b/http.c
 @@ -3975,6 +3975,14 @@ evhttp_set_bevcb(struct evhttp *http,
        http->bevcbarg = cbarg;
  }
@@ -16,7 +85,7 @@ diff -uprN a/http.c b/http.c
  /*
   * Request related functions
   */
-@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_reques
+@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_request *req)
                req->flags |= EVHTTP_REQ_NEEDS_FREE;
                return;
        }
@@ -25,7 +94,7 @@ diff -uprN a/http.c b/http.c
  
        if (req->remote_host != NULL)
                mm_free(req->remote_host);
-@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct
+@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct evhttp_request 
*req,
        req->on_complete_cb_arg = cb_arg;
  }
  
@@ -41,7 +110,7 @@ diff -uprN a/http.c b/http.c
  /*
   * Allows for inspection of the request URI
   */
-@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connec
+@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connection(struct 
evhttp_connection *evcon)
         */
        req->userdone = 1;
  
@@ -59,25 +128,15 @@ diff -uprN a/http.c b/http.c
  
        evhttp_start_read_(evcon);
  
-diff -uprN a/http-internal.h b/http-internal.h
---- a/http-internal.h  2020-07-05 20:02:46.000000000 +0800
-+++ b/http-internal.h  2021-09-28 13:56:13.925151028 +0800
-@@ -167,6 +167,8 @@ struct evhttp {
-       void *gencbarg;
-       struct bufferevent* (*bevcb)(struct event_base *, void *);
-       void *bevcbarg;
-+      int (*newreqcb)(struct evhttp_request *req, void *);
-+      void *newreqcbarg;
- 
-       struct event_base *base;
- };
-diff -uprN a/include/event2/http.h b/include/event2/http.h
---- a/include/event2/http.h    2020-07-05 20:02:46.000000000 +0800
-+++ b/include/event2/http.h    2021-09-28 13:56:13.928151231 +0800
-@@ -299,6 +299,20 @@ void evhttp_set_bevcb(struct evhttp *htt
+diff --git a/include/event2/http.h b/include/event2/http.h
+index 2a41303e..e80bab9a 100644
+--- a/include/event2/http.h
++++ b/include/event2/http.h
+@@ -298,6 +298,20 @@ EVENT2_EXPORT_SYMBOL
+ void evhttp_set_bevcb(struct evhttp *http,
      struct bufferevent *(*cb)(struct event_base *, void *), void *arg);
  
- /**
++/**
 +   Set a callback which allows the user to note or throttle incoming requests.
 +   The requests are not populated with HTTP level information. They
 +   are just associated to a connection.
@@ -91,10 +150,9 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h
 +void evhttp_set_newreqcb(struct evhttp *http,
 +    int (*cb)(struct evhttp_request*, void *), void *arg);
 +
-+/**
+ /**
     Adds a virtual host to the http server.
  
-    A virtual host is a newly initialized evhttp object that has request
 @@ -624,6 +638,20 @@ EVENT2_EXPORT_SYMBOL
  void evhttp_request_set_on_complete_cb(struct evhttp_request *req,
      void (*cb)(struct evhttp_request *, void *), void *cb_arg);
@@ -116,9 +174,10 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h
  /** Frees the request object and removes associated events. */
  EVENT2_EXPORT_SYMBOL
  void evhttp_request_free(struct evhttp_request *req);
-diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h
---- a/include/event2/http_struct.h     2020-07-05 20:02:46.000000000 +0800
-+++ b/include/event2/http_struct.h     2021-09-28 13:56:13.928151231 +0800
+diff --git a/include/event2/http_struct.h b/include/event2/http_struct.h
+index 4bf5b1ff..0762cabd 100644
+--- a/include/event2/http_struct.h
++++ b/include/event2/http_struct.h
 @@ -142,6 +142,12 @@ struct {
         */
        void (*on_complete_cb)(struct evhttp_request *, void *);
@@ -132,14 +191,3 @@ diff -uprN a/include/event2/http_struct.h 
b/include/event2/http_struct.h
  };
  
  #ifdef __cplusplus
-diff -uprN a/CMakeLists.txt b/CMakeLists.txt
---- a/CMakeLists.txt   2020-07-05 20:02:46.000000000 +0800
-+++ b/CMakeLists.txt   2022-01-10 13:29:32.912883436 +0800
-@@ -200,6 +200,6 @@ endif()
- if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG}))
-     set(GNUC 1)
- endif()
--if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG}))
-+if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" 
STREQUAL "MSVC"))
-     set(MSVC 1)
- endif()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to