This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5b6761acb8 [enhencement](streamload) add on_close callback for httpserver (#20826) 5b6761acb8 is described below commit 5b6761acb86852a93351b7b971eb2049fb567aaf Author: zhengyu <freeman.zhang1...@gmail.com> AuthorDate: Thu Jun 15 13:44:02 2023 +0800 [enhencement](streamload) add on_close callback for httpserver (#20826) Sometimes connection cannot be released properly during on_free. We need on_close callback as the last resort. Signed-off-by: freemandealer <freeman.zhang1...@gmail.com> --- be/src/http/action/stream_load.cpp | 4 +- be/src/http/ev_http_server.cpp | 52 +++++++++++++++++++++- be/src/http/http_request.cpp | 9 +++- be/src/io/fs/stream_load_pipe.cpp | 25 ++++++++++- be/src/io/fs/stream_load_pipe.h | 11 ++++- be/src/runtime/exec_env.cpp | 2 +- be/src/runtime/exec_env.h | 6 +++ be/src/runtime/exec_env_init.cpp | 38 ++++++++++++++++ be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/stream_load/stream_load_context.cpp | 3 ++ be/src/runtime/stream_load/stream_load_context.h | 5 +++ be/src/util/byte_buffer.h | 15 ++++++- 12 files changed, 161 insertions(+), 11 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index fdf6f6eaca..33a526cd81 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -243,7 +243,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false; LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db - << ", tbl=" << ctx->table; + << ", tbl=" << ctx->table << "two_phase_commit: " << ctx->two_phase_commit; auto st = _on_header(req, ctx); if (!st.ok()) { @@ -407,7 +407,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, if (ctx->use_streaming) { auto pipe = std::make_shared<io::StreamLoadPipe>( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, - ctx->body_bytes /* total_length */); + ctx->body_bytes /* total_length */, false, ctx->id); request.fileType = TFileType::FILE_STREAM; ctx->body_sink = pipe; ctx->pipe = pipe; diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index b0743baee3..bdc6d420a7 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -22,6 +22,7 @@ #include <butil/fd_utility.h> // IWYU pragma: no_include <bthread/errno.h> #include <errno.h> // IWYU pragma: keep +#include <event.h> #include <event2/event.h> #include <event2/http.h> #include <event2/http_struct.h> @@ -35,12 +36,14 @@ #include <memory> #include <sstream> +#include "bvar/bvar.h" #include "common/logging.h" #include "http/http_channel.h" #include "http/http_handler.h" #include "http/http_headers.h" #include "http/http_request.h" #include "http/http_status.h" +#include "mutex" #include "service/backend_options.h" #include "util/threadpool.h" @@ -48,15 +51,50 @@ struct event_base; struct evhttp; namespace doris { +static std::map<evhttp_connection*, HttpRequest*> g_conn_req_map; +static std::mutex g_conn_req_map_lock; static void on_chunked(struct evhttp_request* ev_req, void* param) { HttpRequest* request = (HttpRequest*)ev_req->on_free_cb_arg; request->handler()->on_chunk_data(request); } +static void on_close(evhttp_connection* con, void* arg) { + HttpRequest* request = (HttpRequest*)arg; + { + std::lock_guard<std::mutex> l(g_conn_req_map_lock); + auto itr = g_conn_req_map.find(con); + if (itr != g_conn_req_map.end()) { + if (itr->second) { + if (itr->second != request) { + LOG(WARNING) << "close connection. connection=" << con + << " current HttpRequest=" << request + << " but orginal HttpRequest=" << itr->second; + } + delete itr->second; + } + g_conn_req_map.erase(con); + } + } +} + static void on_free(struct evhttp_request* ev_req, void* arg) { HttpRequest* request = (HttpRequest*)arg; - delete request; + { + std::lock_guard<std::mutex> l(g_conn_req_map_lock); + auto itr = g_conn_req_map.find(ev_req->evcon); + if (itr != g_conn_req_map.end()) { + if (itr->second) { + if (itr->second != request) { + LOG(WARNING) << "free request. connection=" << ev_req->evcon + << " current HttpRequest=" << request + << " but orginal HttpRequest=" << itr->second; + } + delete itr->second; + } + g_conn_req_map.erase(ev_req->evcon); + } + } } static void on_request(struct evhttp_request* ev_req, void* arg) { @@ -125,6 +163,7 @@ void EvHttpServer::start() { std::shared_ptr<evhttp> http(evhttp_new(base.get()), [](evhttp* http) { evhttp_free(http); }); CHECK(http != nullptr) << "Couldn't create an evhttp."; + evhttp_set_timeout(http.get(), 60 /* timeout in seconds */); auto res = evhttp_accept_socket(http.get(), _server_fd); CHECK(res >= 0) << "evhttp accept socket failed, res=" << res; @@ -261,7 +300,18 @@ int EvHttpServer::on_header(struct evhttp_request* ev_req) { evhttp_request_set_chunked_cb(ev_req, on_chunked); } + { + std::lock_guard<std::mutex> l(g_conn_req_map_lock); + g_conn_req_map.erase(ev_req->evcon); + g_conn_req_map[ev_req->evcon] = request.get(); + } + + struct evhttp_connection* httpcon = evhttp_request_get_connection(ev_req); + evhttp_connection_set_closecb(httpcon, on_close, request.get()); evhttp_request_set_on_free_cb(ev_req, on_free, request.release()); + struct bufferevent* bufev = evhttp_connection_get_bufferevent(httpcon); + if (bufev) bufferevent_enable(bufev, EV_READ); + return 0; } diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 14bde591b4..97274d0e01 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -27,15 +27,22 @@ #include <unordered_map> #include <utility> +#include "bvar/bvar.h" #include "http/http_handler.h" +#include "util/url_coding.h" namespace doris { static std::string s_empty = ""; -HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {} +bvar::Adder<int64_t> g_http_request_cnt("http_request", "request_cnt"); + +HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) { + g_http_request_cnt << 1; +} HttpRequest::~HttpRequest() { + g_http_request_cnt << -1; if (_handler_ctx != nullptr) { DCHECK(_handler != nullptr); _handler->free_handler_ctx(_handler_ctx); diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index 25099c8796..2940a45e4d 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -33,20 +33,31 @@ namespace doris { namespace io { class IOContext; +std::map<UniqueId, io::StreamLoadPipe*> g_streamloadpipes; +std::mutex g_streamloadpipes_lock; + StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size, - int64_t total_length, bool use_proto) + int64_t total_length, bool use_proto, UniqueId id) : _buffered_bytes(0), _proto_buffered_bytes(0), _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size), _total_length(total_length), - _use_proto(use_proto) {} + _use_proto(use_proto), + _id(id) { + std::lock_guard<std::mutex> l(g_streamloadpipes_lock); + g_streamloadpipes[_id] = this; +} StreamLoadPipe::~StreamLoadPipe() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); while (!_buf_queue.empty()) { _buf_queue.pop_front(); } + { + std::lock_guard<std::mutex> l(g_streamloadpipes_lock); + g_streamloadpipes.erase(_id); + } } Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read, @@ -113,6 +124,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t } Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { + _last_active = GetCurrentTimeMicros(); ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1)); buf->put_bytes(data, size); buf->flip(); @@ -120,6 +132,7 @@ Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t pr } Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) { + _last_active = GetCurrentTimeMicros(); PDataRow* row_ptr = row.get(); { std::unique_lock<std::mutex> l(_lock); @@ -130,6 +143,7 @@ Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) { } Status StreamLoadPipe::append(const char* data, size_t size) { + _last_active = GetCurrentTimeMicros(); size_t pos = 0; if (_write_buf != nullptr) { if (size < _write_buf->remaining()) { @@ -153,6 +167,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) { } Status StreamLoadPipe::append(const ByteBufferPtr& buf) { + _last_active = GetCurrentTimeMicros(); if (_write_buf != nullptr) { _write_buf->flip(); RETURN_IF_ERROR(_append(_write_buf)); @@ -226,6 +241,7 @@ Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size) // called when producer finished Status StreamLoadPipe::finish() { + LOG(INFO) << "finish pipe=" << this; if (_write_buf != nullptr) { _write_buf->flip(); _append(_write_buf); @@ -241,11 +257,16 @@ Status StreamLoadPipe::finish() { // called when producer/consumer failed void StreamLoadPipe::cancel(const std::string& reason) { + LOG(INFO) << "cancel pipe=" << this; { std::lock_guard<std::mutex> l(_lock); _cancelled = true; _cancelled_reason = reason; } + { + std::lock_guard<std::mutex> l(g_streamloadpipes_lock); + g_streamloadpipes.erase(_id); + } _get_cond.notify_all(); _put_cond.notify_all(); } diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h index 848175ce9a..e41f5162ac 100644 --- a/be/src/io/fs/stream_load_pipe.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -34,10 +34,13 @@ #include "runtime/message_body_sink.h" #include "util/byte_buffer.h" #include "util/slice.h" +#include "util/time.h" +#include "util/uid_util.h" namespace doris { namespace io { class IOContext; +class StreamLoadPipe; static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; @@ -45,7 +48,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { public: StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes, size_t min_chunk_size = 64 * 1024, int64_t total_length = -1, - bool use_proto = false); + bool use_proto = false, UniqueId id = UniqueId(0, 0)); ~StreamLoadPipe() override; Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0); @@ -78,6 +81,10 @@ public: size_t get_queue_size() { return _buf_queue.size(); } + bool is_cancelled() { return _cancelled; } + bool is_finished() { return _finished; } + uint64_t last_active() { return _last_active; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -109,6 +116,8 @@ private: std::condition_variable _get_cond; ByteBufferPtr _write_buf; + UniqueId _id; + uint64_t _last_active = 0; // no use, only for compatibility with the `Path` interface Path _path = ""; diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 8f63d6fe6a..9215479f62 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -21,7 +21,7 @@ namespace doris { -ExecEnv::ExecEnv() : _is_init(false) {} +ExecEnv::ExecEnv() : _is_init(false), _check_streamloadpipe_latch(1) {} ExecEnv::~ExecEnv() {} diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 46e1157820..00a58b956b 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -28,6 +28,8 @@ #include "common/status.h" #include "olap/options.h" +#include "util/countdown_latch.h" +#include "util/thread.h" #include "util/threadpool.h" namespace doris { @@ -191,6 +193,8 @@ private: void _register_metrics(); void _deregister_metrics(); + void _check_streamloadpipe(); + bool _is_init; std::vector<StorePath> _store_paths; // path => store index @@ -256,6 +260,8 @@ private: BlockSpillManager* _block_spill_mgr = nullptr; // To save meta info of external file, such as parquet footer. FileMetaCache* _file_meta_cache = nullptr; + CountDownLatch _check_streamloadpipe_latch; + scoped_refptr<Thread> _check_streamloadpipe_thread; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index d524fa13e2..40cccf0b77 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -27,6 +27,7 @@ #include <limits> #include <map> #include <memory> +#include <mutex> #include <ostream> #include <string> #include <unordered_map> @@ -37,6 +38,7 @@ #include "common/logging.h" #include "common/status.h" #include "io/fs/file_meta_cache.h" +#include "io/fs/stream_load_pipe.h" #include "olap/olap_define.h" #include "olap/options.h" #include "olap/page_cache.h" @@ -77,6 +79,8 @@ #include "util/parse_util.h" #include "util/pretty_printer.h" #include "util/threadpool.h" +#include "util/time.h" +#include "util/uid_util.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" @@ -85,7 +89,15 @@ #include "runtime/memory/tcmalloc_hook.h" #endif +namespace doris::io { +extern std::map<UniqueId, StreamLoadPipe*> g_streamloadpipes; +extern std::mutex g_streamloadpipes_lock; +} // namespace doris::io + namespace doris { + +using namespace io; + class PBackendService_Stub; class PFunctionService_Stub; @@ -95,6 +107,9 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT); +bvar::Adder<uint64_t> g_byte_buffer_allocate_kb; +bvar::Adder<uint64_t> g_byte_buffer_cnt; + Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) { return env->_init(store_paths); } @@ -180,6 +195,16 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _init_mem_env(); + RETURN_IF_ERROR(Thread::create( + "ExecEnv", "check_streamloadpipe", + [this]() { + uint32_t interval = 300; + while (!_check_streamloadpipe_latch.wait_for(std::chrono::seconds(interval))) { + _check_streamloadpipe(); + } + }, + &_check_streamloadpipe_thread)); + RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); _heartbeat_flags = new HeartbeatFlags(); _register_metrics(); @@ -207,6 +232,19 @@ Status ExecEnv::init_pipeline_task_scheduler() { return Status::OK(); } +void ExecEnv::_check_streamloadpipe() { + uint64_t now = GetCurrentTimeMicros(); + std::lock_guard<std::mutex> l(g_streamloadpipes_lock); + for (auto& pipe : g_streamloadpipes) { + if (pipe.second == nullptr || pipe.second->is_cancelled()) { + continue; + } + uint64_t diff_s = abs((int64_t)now - (int64_t)pipe.second->last_active()) / 1000000; + LOG(INFO) << "active StreamLoadPipe=" << pipe.second + << " diff_time_from_last_append=" << diff_s; + } +} + Status ExecEnv::_init_mem_env() { bool is_percent = false; std::stringstream ss; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f61eae31b9..c98bebafa1 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -571,7 +571,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { stream_load_ctx->need_rollback = true; auto pipe = std::make_shared<io::StreamLoadPipe>( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, - -1 /* total_length */, true /* use_proto */); + -1 /* total_length */, true /* use_proto */, stream_load_ctx->id); stream_load_ctx->body_sink = pipe; stream_load_ctx->pipe = pipe; stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio; diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index f381ba097d..b0002c98dc 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -28,11 +28,14 @@ #include <new> #include <sstream> +#include "bvar/bvar.h" #include "common/logging.h" namespace doris { using namespace ErrorCode; +bvar::Adder<int64> g_streamloadctx_obj_cnt("streamloadctx", "obj_cnt"); + std::string StreamLoadContext::to_json() const { rapidjson::StringBuffer s; rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s); diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 0e004b12f5..c35d949780 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -31,6 +31,7 @@ #include <utility> #include <vector> +#include "bvar/bvar.h" #include "common/status.h" #include "common/utils.h" #include "runtime/exec_env.h" @@ -86,15 +87,19 @@ public: class MessageBodySink; +extern bvar::Adder<int64> g_streamloadctx_obj_cnt; + class StreamLoadContext { ENABLE_FACTORY_CREATOR(StreamLoadContext); public: StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) { start_millis = UnixMillis(); + g_streamloadctx_obj_cnt << 1; } ~StreamLoadContext() { + g_streamloadctx_obj_cnt << -1; if (need_rollback) { _exec_env->stream_load_executor()->rollback_txn(this); need_rollback = false; diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index aab8fd42db..9c296030c5 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -22,10 +22,14 @@ #include <cstddef> #include <memory> +#include "bvar/bvar.h" #include "common/logging.h" namespace doris { +extern bvar::Adder<uint64_t> g_byte_buffer_allocate_kb; +extern bvar::Adder<uint64_t> g_byte_buffer_cnt; + struct ByteBuffer; using ByteBufferPtr = std::shared_ptr<ByteBuffer>; @@ -35,7 +39,11 @@ struct ByteBuffer { return ptr; } - ~ByteBuffer() { delete[] ptr; } + ~ByteBuffer() { + delete[] ptr; + g_byte_buffer_allocate_kb << -(capacity / 1024); + g_byte_buffer_cnt << -1; + } void put_bytes(const char* data, size_t size) { memcpy(ptr + pos, data, size); @@ -63,7 +71,10 @@ struct ByteBuffer { private: ByteBuffer(size_t capacity_) - : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {} + : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) { + g_byte_buffer_allocate_kb << capacity / 1024; + g_byte_buffer_cnt << 1; + } }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org