This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new aea719627d Revert "[enhencement](streamload) add on_close callback for httpserver (#20826)" (#20927) aea719627d is described below commit aea719627def18e9fd9a895a63909b2c83a6d379 Author: zhengyu <freeman.zhang1...@gmail.com> AuthorDate: Sat Jun 17 10:39:02 2023 +0800 Revert "[enhencement](streamload) add on_close callback for httpserver (#20826)" (#20927) This reverts commit 5b6761acb86852a93351b7b971eb2049fb567aaf. --- be/src/http/action/stream_load.cpp | 4 +- be/src/http/ev_http_server.cpp | 52 +--------------------- be/src/http/http_request.cpp | 9 +--- be/src/io/fs/stream_load_pipe.cpp | 25 +---------- be/src/io/fs/stream_load_pipe.h | 11 +---- be/src/runtime/exec_env.cpp | 2 +- be/src/runtime/exec_env.h | 6 --- be/src/runtime/exec_env_init.cpp | 38 ---------------- be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/stream_load/stream_load_context.cpp | 3 -- be/src/runtime/stream_load/stream_load_context.h | 5 --- be/src/util/byte_buffer.h | 15 +------ 12 files changed, 11 insertions(+), 161 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 33a526cd81..fdf6f6eaca 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -243,7 +243,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false; LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db - << ", tbl=" << ctx->table << "two_phase_commit: " << ctx->two_phase_commit; + << ", tbl=" << ctx->table; auto st = _on_header(req, ctx); if (!st.ok()) { @@ -407,7 +407,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, if (ctx->use_streaming) { auto pipe = std::make_shared<io::StreamLoadPipe>( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, - ctx->body_bytes /* total_length */, false, ctx->id); + ctx->body_bytes /* total_length */); request.fileType = TFileType::FILE_STREAM; ctx->body_sink = pipe; ctx->pipe = pipe; diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index bdc6d420a7..b0743baee3 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -22,7 +22,6 @@ #include <butil/fd_utility.h> // IWYU pragma: no_include <bthread/errno.h> #include <errno.h> // IWYU pragma: keep -#include <event.h> #include <event2/event.h> #include <event2/http.h> #include <event2/http_struct.h> @@ -36,14 +35,12 @@ #include <memory> #include <sstream> -#include "bvar/bvar.h" #include "common/logging.h" #include "http/http_channel.h" #include "http/http_handler.h" #include "http/http_headers.h" #include "http/http_request.h" #include "http/http_status.h" -#include "mutex" #include "service/backend_options.h" #include "util/threadpool.h" @@ -51,50 +48,15 @@ struct event_base; struct evhttp; namespace doris { -static std::map<evhttp_connection*, HttpRequest*> g_conn_req_map; -static std::mutex g_conn_req_map_lock; static void on_chunked(struct evhttp_request* ev_req, void* param) { HttpRequest* request = (HttpRequest*)ev_req->on_free_cb_arg; request->handler()->on_chunk_data(request); } -static void on_close(evhttp_connection* con, void* arg) { - HttpRequest* request = (HttpRequest*)arg; - { - std::lock_guard<std::mutex> l(g_conn_req_map_lock); - auto itr = g_conn_req_map.find(con); - if (itr != g_conn_req_map.end()) { - if (itr->second) { - if (itr->second != request) { - LOG(WARNING) << "close connection. connection=" << con - << " current HttpRequest=" << request - << " but orginal HttpRequest=" << itr->second; - } - delete itr->second; - } - g_conn_req_map.erase(con); - } - } -} - static void on_free(struct evhttp_request* ev_req, void* arg) { HttpRequest* request = (HttpRequest*)arg; - { - std::lock_guard<std::mutex> l(g_conn_req_map_lock); - auto itr = g_conn_req_map.find(ev_req->evcon); - if (itr != g_conn_req_map.end()) { - if (itr->second) { - if (itr->second != request) { - LOG(WARNING) << "free request. connection=" << ev_req->evcon - << " current HttpRequest=" << request - << " but orginal HttpRequest=" << itr->second; - } - delete itr->second; - } - g_conn_req_map.erase(ev_req->evcon); - } - } + delete request; } static void on_request(struct evhttp_request* ev_req, void* arg) { @@ -163,7 +125,6 @@ void EvHttpServer::start() { std::shared_ptr<evhttp> http(evhttp_new(base.get()), [](evhttp* http) { evhttp_free(http); }); CHECK(http != nullptr) << "Couldn't create an evhttp."; - evhttp_set_timeout(http.get(), 60 /* timeout in seconds */); auto res = evhttp_accept_socket(http.get(), _server_fd); CHECK(res >= 0) << "evhttp accept socket failed, res=" << res; @@ -300,18 +261,7 @@ int EvHttpServer::on_header(struct evhttp_request* ev_req) { evhttp_request_set_chunked_cb(ev_req, on_chunked); } - { - std::lock_guard<std::mutex> l(g_conn_req_map_lock); - g_conn_req_map.erase(ev_req->evcon); - g_conn_req_map[ev_req->evcon] = request.get(); - } - - struct evhttp_connection* httpcon = evhttp_request_get_connection(ev_req); - evhttp_connection_set_closecb(httpcon, on_close, request.get()); evhttp_request_set_on_free_cb(ev_req, on_free, request.release()); - struct bufferevent* bufev = evhttp_connection_get_bufferevent(httpcon); - if (bufev) bufferevent_enable(bufev, EV_READ); - return 0; } diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 97274d0e01..14bde591b4 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -27,22 +27,15 @@ #include <unordered_map> #include <utility> -#include "bvar/bvar.h" #include "http/http_handler.h" -#include "util/url_coding.h" namespace doris { static std::string s_empty = ""; -bvar::Adder<int64_t> g_http_request_cnt("http_request", "request_cnt"); - -HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) { - g_http_request_cnt << 1; -} +HttpRequest::HttpRequest(evhttp_request* evhttp_request) : _ev_req(evhttp_request) {} HttpRequest::~HttpRequest() { - g_http_request_cnt << -1; if (_handler_ctx != nullptr) { DCHECK(_handler != nullptr); _handler->free_handler_ctx(_handler_ctx); diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index 2940a45e4d..25099c8796 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -33,31 +33,20 @@ namespace doris { namespace io { class IOContext; -std::map<UniqueId, io::StreamLoadPipe*> g_streamloadpipes; -std::mutex g_streamloadpipes_lock; - StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size, - int64_t total_length, bool use_proto, UniqueId id) + int64_t total_length, bool use_proto) : _buffered_bytes(0), _proto_buffered_bytes(0), _max_buffered_bytes(max_buffered_bytes), _min_chunk_size(min_chunk_size), _total_length(total_length), - _use_proto(use_proto), - _id(id) { - std::lock_guard<std::mutex> l(g_streamloadpipes_lock); - g_streamloadpipes[_id] = this; -} + _use_proto(use_proto) {} StreamLoadPipe::~StreamLoadPipe() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); while (!_buf_queue.empty()) { _buf_queue.pop_front(); } - { - std::lock_guard<std::mutex> l(g_streamloadpipes_lock); - g_streamloadpipes.erase(_id); - } } Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read, @@ -124,7 +113,6 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t } Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { - _last_active = GetCurrentTimeMicros(); ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1)); buf->put_bytes(data, size); buf->flip(); @@ -132,7 +120,6 @@ Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t pr } Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) { - _last_active = GetCurrentTimeMicros(); PDataRow* row_ptr = row.get(); { std::unique_lock<std::mutex> l(_lock); @@ -143,7 +130,6 @@ Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) { } Status StreamLoadPipe::append(const char* data, size_t size) { - _last_active = GetCurrentTimeMicros(); size_t pos = 0; if (_write_buf != nullptr) { if (size < _write_buf->remaining()) { @@ -167,7 +153,6 @@ Status StreamLoadPipe::append(const char* data, size_t size) { } Status StreamLoadPipe::append(const ByteBufferPtr& buf) { - _last_active = GetCurrentTimeMicros(); if (_write_buf != nullptr) { _write_buf->flip(); RETURN_IF_ERROR(_append(_write_buf)); @@ -241,7 +226,6 @@ Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size) // called when producer finished Status StreamLoadPipe::finish() { - LOG(INFO) << "finish pipe=" << this; if (_write_buf != nullptr) { _write_buf->flip(); _append(_write_buf); @@ -257,16 +241,11 @@ Status StreamLoadPipe::finish() { // called when producer/consumer failed void StreamLoadPipe::cancel(const std::string& reason) { - LOG(INFO) << "cancel pipe=" << this; { std::lock_guard<std::mutex> l(_lock); _cancelled = true; _cancelled_reason = reason; } - { - std::lock_guard<std::mutex> l(g_streamloadpipes_lock); - g_streamloadpipes.erase(_id); - } _get_cond.notify_all(); _put_cond.notify_all(); } diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h index e41f5162ac..848175ce9a 100644 --- a/be/src/io/fs/stream_load_pipe.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -34,13 +34,10 @@ #include "runtime/message_body_sink.h" #include "util/byte_buffer.h" #include "util/slice.h" -#include "util/time.h" -#include "util/uid_util.h" namespace doris { namespace io { class IOContext; -class StreamLoadPipe; static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; @@ -48,7 +45,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { public: StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes, size_t min_chunk_size = 64 * 1024, int64_t total_length = -1, - bool use_proto = false, UniqueId id = UniqueId(0, 0)); + bool use_proto = false); ~StreamLoadPipe() override; Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0); @@ -81,10 +78,6 @@ public: size_t get_queue_size() { return _buf_queue.size(); } - bool is_cancelled() { return _cancelled; } - bool is_finished() { return _finished; } - uint64_t last_active() { return _last_active; } - protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -116,8 +109,6 @@ private: std::condition_variable _get_cond; ByteBufferPtr _write_buf; - UniqueId _id; - uint64_t _last_active = 0; // no use, only for compatibility with the `Path` interface Path _path = ""; diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 9215479f62..8f63d6fe6a 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -21,7 +21,7 @@ namespace doris { -ExecEnv::ExecEnv() : _is_init(false), _check_streamloadpipe_latch(1) {} +ExecEnv::ExecEnv() : _is_init(false) {} ExecEnv::~ExecEnv() {} diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 00a58b956b..46e1157820 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -28,8 +28,6 @@ #include "common/status.h" #include "olap/options.h" -#include "util/countdown_latch.h" -#include "util/thread.h" #include "util/threadpool.h" namespace doris { @@ -193,8 +191,6 @@ private: void _register_metrics(); void _deregister_metrics(); - void _check_streamloadpipe(); - bool _is_init; std::vector<StorePath> _store_paths; // path => store index @@ -260,8 +256,6 @@ private: BlockSpillManager* _block_spill_mgr = nullptr; // To save meta info of external file, such as parquet footer. FileMetaCache* _file_meta_cache = nullptr; - CountDownLatch _check_streamloadpipe_latch; - scoped_refptr<Thread> _check_streamloadpipe_thread; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 40cccf0b77..d524fa13e2 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -27,7 +27,6 @@ #include <limits> #include <map> #include <memory> -#include <mutex> #include <ostream> #include <string> #include <unordered_map> @@ -38,7 +37,6 @@ #include "common/logging.h" #include "common/status.h" #include "io/fs/file_meta_cache.h" -#include "io/fs/stream_load_pipe.h" #include "olap/olap_define.h" #include "olap/options.h" #include "olap/page_cache.h" @@ -79,8 +77,6 @@ #include "util/parse_util.h" #include "util/pretty_printer.h" #include "util/threadpool.h" -#include "util/time.h" -#include "util/uid_util.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" @@ -89,15 +85,7 @@ #include "runtime/memory/tcmalloc_hook.h" #endif -namespace doris::io { -extern std::map<UniqueId, StreamLoadPipe*> g_streamloadpipes; -extern std::mutex g_streamloadpipes_lock; -} // namespace doris::io - namespace doris { - -using namespace io; - class PBackendService_Stub; class PFunctionService_Stub; @@ -107,9 +95,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT); -bvar::Adder<uint64_t> g_byte_buffer_allocate_kb; -bvar::Adder<uint64_t> g_byte_buffer_cnt; - Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) { return env->_init(store_paths); } @@ -195,16 +180,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _init_mem_env(); - RETURN_IF_ERROR(Thread::create( - "ExecEnv", "check_streamloadpipe", - [this]() { - uint32_t interval = 300; - while (!_check_streamloadpipe_latch.wait_for(std::chrono::seconds(interval))) { - _check_streamloadpipe(); - } - }, - &_check_streamloadpipe_thread)); - RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); _heartbeat_flags = new HeartbeatFlags(); _register_metrics(); @@ -232,19 +207,6 @@ Status ExecEnv::init_pipeline_task_scheduler() { return Status::OK(); } -void ExecEnv::_check_streamloadpipe() { - uint64_t now = GetCurrentTimeMicros(); - std::lock_guard<std::mutex> l(g_streamloadpipes_lock); - for (auto& pipe : g_streamloadpipes) { - if (pipe.second == nullptr || pipe.second->is_cancelled()) { - continue; - } - uint64_t diff_s = abs((int64_t)now - (int64_t)pipe.second->last_active()) / 1000000; - LOG(INFO) << "active StreamLoadPipe=" << pipe.second - << " diff_time_from_last_append=" << diff_s; - } -} - Status ExecEnv::_init_mem_env() { bool is_percent = false; std::stringstream ss; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 4581768199..648c9373d1 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -571,7 +571,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { stream_load_ctx->need_rollback = true; auto pipe = std::make_shared<io::StreamLoadPipe>( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, - -1 /* total_length */, true /* use_proto */, stream_load_ctx->id); + -1 /* total_length */, true /* use_proto */); stream_load_ctx->body_sink = pipe; stream_load_ctx->pipe = pipe; stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio; diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index b0002c98dc..f381ba097d 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -28,14 +28,11 @@ #include <new> #include <sstream> -#include "bvar/bvar.h" #include "common/logging.h" namespace doris { using namespace ErrorCode; -bvar::Adder<int64> g_streamloadctx_obj_cnt("streamloadctx", "obj_cnt"); - std::string StreamLoadContext::to_json() const { rapidjson::StringBuffer s; rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s); diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index c35d949780..0e004b12f5 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -31,7 +31,6 @@ #include <utility> #include <vector> -#include "bvar/bvar.h" #include "common/status.h" #include "common/utils.h" #include "runtime/exec_env.h" @@ -87,19 +86,15 @@ public: class MessageBodySink; -extern bvar::Adder<int64> g_streamloadctx_obj_cnt; - class StreamLoadContext { ENABLE_FACTORY_CREATOR(StreamLoadContext); public: StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) { start_millis = UnixMillis(); - g_streamloadctx_obj_cnt << 1; } ~StreamLoadContext() { - g_streamloadctx_obj_cnt << -1; if (need_rollback) { _exec_env->stream_load_executor()->rollback_txn(this); need_rollback = false; diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index 9c296030c5..aab8fd42db 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -22,14 +22,10 @@ #include <cstddef> #include <memory> -#include "bvar/bvar.h" #include "common/logging.h" namespace doris { -extern bvar::Adder<uint64_t> g_byte_buffer_allocate_kb; -extern bvar::Adder<uint64_t> g_byte_buffer_cnt; - struct ByteBuffer; using ByteBufferPtr = std::shared_ptr<ByteBuffer>; @@ -39,11 +35,7 @@ struct ByteBuffer { return ptr; } - ~ByteBuffer() { - delete[] ptr; - g_byte_buffer_allocate_kb << -(capacity / 1024); - g_byte_buffer_cnt << -1; - } + ~ByteBuffer() { delete[] ptr; } void put_bytes(const char* data, size_t size) { memcpy(ptr + pos, data, size); @@ -71,10 +63,7 @@ struct ByteBuffer { private: ByteBuffer(size_t capacity_) - : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) { - g_byte_buffer_allocate_kb << capacity / 1024; - g_byte_buffer_cnt << 1; - } + : ptr(new char[capacity_]), pos(0), limit(capacity_), capacity(capacity_) {} }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org