This is an automated email from the ASF dual-hosted git repository. zhaoc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 94539e7 Non blocking OlapTableSink (#3143) 94539e7 is described below commit 94539e7120120cc0f5d0343f3c32eb3c4a946fd0 Author: HuangWei <huangw...@xiaomi.com> AuthorDate: Thu May 7 10:43:41 2020 +0800 Non blocking OlapTableSink (#3143) Implementation Notes NodeChannel _cur_batch -> _pending_batches: when _cur_batch is filled up, move it to _pending_batches. add_row() just produces batches. try_send_and_fetch_status() tries to consume one pending batch. If there is an in-flight packet, skip sending in this round. So we can add one sender thread to be in charge of all node channels' try_send. IndexChannel init(), open() stay the same. Use for_each_node_channel() to expose the detailed changes of NodeChannel. (It's easier to read & modify.) Sender thread See func OlapTableSink::_send_batch_process() Why use polling? If we use wait/notify, it will notify when a new batch is generated. We can't skip sending this batch, because it won't notify for the same batch again. So wait/notify can't simply avoid blocking. So I choose polling. It's wasteful to continuously try_send(), but it's difficult to set a suitable polling interval. Thus, I add std::this_thread::yield() to give up the time slice, giving priority to other processes/threads (if there are other processes/threads waiting in the queue). --- be/src/common/config.h | 2 + be/src/exec/tablet_sink.cpp | 568 ++++++++++++++++++++++---------------- be/src/exec/tablet_sink.h | 253 ++++++++++++----- be/test/exec/tablet_sink_test.cpp | 146 +++++----- 4 files changed, 581 insertions(+), 388 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index dbd9f53..f85e0ff 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -296,6 +296,8 @@ namespace config { // you may need to increase this timeout if using larger 'streaming_load_max_mb', // or encounter 'tablet writer write failed' error when loading. 
// CONF_Int32(tablet_writer_rpc_timeout_sec, "600"); + // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. + CONF_mInt32(olap_table_sink_send_interval_ms, "10"); // Fragment thread pool CONF_Int32(fragment_pool_thread_num, "64"); diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 59ff016..5e91fec 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -26,18 +26,17 @@ #include "runtime/tuple_row.h" #include "olap/hll.h" +#include "service/brpc.h" #include "util/brpc_stub_cache.h" +#include "util/monotime.h" #include "util/uid_util.h" -#include "service/brpc.h" namespace doris { namespace stream_load { -NodeChannel::NodeChannel(OlapTableSink* parent, int64_t index_id, - int64_t node_id, int32_t schema_hash) - : _parent(parent), _index_id(index_id), - _node_id(node_id), _schema_hash(schema_hash) { -} +NodeChannel::NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t node_id, + int32_t schema_hash) + : _parent(parent), _index_id(index_id), _node_id(node_id), _schema_hash(schema_hash) {} NodeChannel::~NodeChannel() { if (_open_closure != nullptr) { @@ -47,12 +46,11 @@ NodeChannel::~NodeChannel() { _open_closure = nullptr; } if (_add_batch_closure != nullptr) { - if (_add_batch_closure->unref()) { - delete _add_batch_closure; - } + // it's safe to delete, but may take some time to wait until brpc joined + delete _add_batch_closure; _add_batch_closure = nullptr; } - _add_batch_request.release_id(); + _cur_add_batch_request.release_id(); } Status NodeChannel::init(RuntimeState* state) { @@ -63,23 +61,30 @@ Status NodeChannel::init(RuntimeState* state) { ss << "unknown node id, id=" << _node_id; return Status::InternalError(ss.str()); } - RowDescriptor row_desc(_tuple_desc, false); - _batch.reset(new RowBatch(row_desc, state->batch_size(), _parent->_mem_tracker)); - _stub = state->exec_env()->brpc_stub_cache()->get_stub( - _node_info->host, 
_node_info->brpc_port); + _row_desc.reset(new RowDescriptor(_tuple_desc, false)); + _batch_size = state->batch_size(); + _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker)); + + _stub = state->exec_env()->brpc_stub_cache()->get_stub(_node_info->host, _node_info->brpc_port); if (_stub == nullptr) { LOG(WARNING) << "Get rpc stub failed, host=" << _node_info->host - << ", port=" << _node_info->brpc_port; + << ", port=" << _node_info->brpc_port; + _cancelled = true; return Status::InternalError("get rpc stub failed"); } - // Initialize _add_batch_request - _add_batch_request.set_allocated_id(&_parent->_load_id); - _add_batch_request.set_index_id(_index_id); - _add_batch_request.set_sender_id(_parent->_sender_id); + // Initialize _cur_add_batch_request + _cur_add_batch_request.set_allocated_id(&_parent->_load_id); + _cur_add_batch_request.set_index_id(_index_id); + _cur_add_batch_request.set_sender_id(_parent->_sender_id); + _cur_add_batch_request.set_eos(false); _rpc_timeout_ms = state->query_options().query_timeout * 1000; + + _load_info = "load_id=" + print_id(_parent->_load_id) + ", txn_id" + + std::to_string(_parent->_txn_id); + _name = "NodeChannel[" + std::to_string(_index_id) + "-" + std::to_string(_node_id) + "]"; return Status::OK(); } @@ -105,9 +110,7 @@ void NodeChannel::open() { // This ref is for RPC's reference _open_closure->ref(); _open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); - _stub->tablet_writer_open(&_open_closure->cntl, - &request, - &_open_closure->result, + _stub->tablet_writer_open(&_open_closure->cntl, &request, &_open_closure->result, _open_closure); request.release_id(); request.release_schema(); @@ -117,8 +120,9 @@ Status NodeChannel::open_wait() { _open_closure->join(); if (_open_closure->cntl.Failed()) { LOG(WARNING) << "failed to open tablet writer, error=" - << berror(_open_closure->cntl.ErrorCode()) - << ", error_text=" << _open_closure->cntl.ErrorText(); + << 
berror(_open_closure->cntl.ErrorCode()) + << ", error_text=" << _open_closure->cntl.ErrorText(); + _cancelled = true; return Status::InternalError("failed to open tablet writer"); } Status status(_open_closure->result.status()); @@ -128,54 +132,134 @@ Status NodeChannel::open_wait() { _open_closure = nullptr; // add batch closure - _add_batch_closure = new RefCountClosure<PTabletWriterAddBatchResult>(); - _add_batch_closure->ref(); + _add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create(); + _add_batch_closure->addFailedHandler([this]() { + _cancelled = true; + LOG(WARNING) << "NodeChannel add batch req rpc failed, " << print_load_info() + << ", node=" << node_info()->host << ":" << node_info()->brpc_port; + }); + + _add_batch_closure->addSuccessHandler( + [this](const PTabletWriterAddBatchResult& result, bool is_last_rpc) { + Status status(result.status()); + if (status.ok()) { + if (is_last_rpc) { + for (auto& tablet : result.tablet_vec()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet.tablet_id(); + commit_info.backendId = _node_id; + _tablet_commit_infos.emplace_back(std::move(commit_info)); + } + _add_batches_finished = true; + } + } else { + _cancelled = true; + LOG(WARNING) << "NodeChannel add batch req success but status isn't ok, " + << print_load_info() << ", node=" << node_info()->host << ":" + << node_info()->brpc_port << ", errmsg=" << status.get_error_msg(); + } + + if (result.has_execution_time_us()) { + _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); + _add_batch_counter.add_batch_wait_lock_time_us += result.wait_lock_time_us(); + _add_batch_counter.add_batch_num++; + } + }); return status; } Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { - auto row_no = _batch->add_row(); + // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. 
+ auto st = none_of({_cancelled, _eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, can't add_row. cancelled/eos: "); + } + + // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, + // so in the ideal case, mem limit is a matter for _plan node. + // But there is still some unfinished things, we do mem limit here temporarily. + while (_parent->_mem_tracker->any_limit_exceeded()) { + SCOPED_RAW_TIMER(&_mem_exceeded_block_ns); + SleepFor(MonoDelta::FromMilliseconds(10)); + } + + auto row_no = _cur_batch->add_row(); if (row_no == RowBatch::INVALID_ROW_INDEX) { - RETURN_IF_ERROR(_send_cur_batch()); - row_no = _batch->add_row(); + { + SCOPED_RAW_TIMER(&_queue_push_lock_ns); + std::lock_guard<std::mutex> l(_pending_batches_lock); + //To simplify the add_row logic, postpone adding batch into req until the time of sending req + _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); + _pending_batches_num++; + } + + _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker)); + _cur_add_batch_request.clear_tablet_ids(); + + row_no = _cur_batch->add_row(); } DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX); - auto tuple = input_tuple->deep_copy(*_tuple_desc, _batch->tuple_data_pool()); - _batch->get_row(row_no)->set_tuple(0, tuple); - _batch->commit_last_row(); - _add_batch_request.add_tablet_ids(tablet_id); + auto tuple = input_tuple->deep_copy(*_tuple_desc, _cur_batch->tuple_data_pool()); + _cur_batch->get_row(row_no)->set_tuple(0, tuple); + _cur_batch->commit_last_row(); + _cur_add_batch_request.add_tablet_ids(tablet_id); return Status::OK(); } -Status NodeChannel::close(RuntimeState* state) { - auto st = _close(state); - _batch.reset(); - return st; -} +Status NodeChannel::mark_close() { + auto st = none_of({_cancelled, _eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, can't mark as closed. 
cancelled/eos: "); + } -Status NodeChannel::_close(RuntimeState* state) { - return _send_cur_batch(true); + _cur_add_batch_request.set_eos(true); + { + std::lock_guard<std::mutex> l(_pending_batches_lock); + _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); + _pending_batches_num++; + DCHECK(_pending_batches.back().second.eos()); + } + + _eos_is_produced = true; } Status NodeChannel::close_wait(RuntimeState* state) { - RETURN_IF_ERROR(_wait_in_flight_packet()); - Status status(_add_batch_closure->result.status()); - if (status.ok()) { - for (auto& tablet : _add_batch_closure->result.tablet_vec()) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet.tablet_id(); - commit_info.backendId = _node_id; - state->tablet_commit_infos().emplace_back(std::move(commit_info)); - } + auto st = none_of({_cancelled, !_eos_is_produced}); + if (!st.ok()) { + return st.clone_and_prepend("already stopped, skip waiting for close. cancelled/!eos: "); } - // clear batch after sendt - _batch.reset(); - return status; + + // waiting for finished, it may take a long time, so we could't set a timeout + MonotonicStopWatch timer; + timer.start(); + while (!_add_batches_finished && !_cancelled) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + timer.stop(); + VLOG(1) << name() << " close_wait cost: " << timer.elapsed_time() / 1000000 << " ms"; + + { + std::lock_guard<std::mutex> lg(_pending_batches_lock); + DCHECK(_pending_batches.empty()); + DCHECK(_cur_batch == nullptr); + } + + if (_add_batches_finished) { + state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), + std::make_move_iterator(_tablet_commit_infos.begin()), + std::make_move_iterator(_tablet_commit_infos.end())); + return Status::OK(); + } + + return Status::InternalError("close wait failed coz rpc error"); } void NodeChannel::cancel() { - // Do we need to wait last rpc finished??? + // we don't need to wait last rpc finished, cause closure's release/reset will join. 
+ // But do we need brpc::StartCancel(call_id)? + _cancelled = true; + PTabletWriterCancelRequest request; request.set_allocated_id(&_parent->_load_id); request.set_index_id(_index_id); @@ -185,80 +269,89 @@ void NodeChannel::cancel() { closure->ref(); closure->cntl.set_timeout_ms(_rpc_timeout_ms); - _stub->tablet_writer_cancel(&closure->cntl, - &request, - &closure->result, - closure); + _stub->tablet_writer_cancel(&closure->cntl, &request, &closure->result, closure); request.release_id(); - // reset batch - _batch.reset(); + // Beware of the destruct sequence. RowBatches will use mem_trackers(include ancestors). + // Delete RowBatches here is a better choice to reduce the potential of dtor errors. + { + std::lock_guard<std::mutex> lg(_pending_batches_lock); + std::queue<AddBatchReq> empty; + std::swap(_pending_batches, empty); + _cur_batch.reset(); + } } -Status NodeChannel::_wait_in_flight_packet() { - if (!_has_in_flight_packet) { - return Status::OK(); +int NodeChannel::try_send_and_fetch_status() { + auto st = none_of({_cancelled, _send_finished}); + if (!st.ok()) { + return 0; } - SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns()); - _add_batch_closure->join(); - _has_in_flight_packet = false; - if (_add_batch_closure->cntl.Failed()) { - LOG(WARNING) << "failed to send batch, error=" - << berror(_add_batch_closure->cntl.ErrorCode()) - << ", error_text=" << _add_batch_closure->cntl.ErrorText(); - return Status::InternalError("failed to send batch"); - } + if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) { + SCOPED_RAW_TIMER(&_actual_consume_ns); + AddBatchReq send_batch; + { + std::lock_guard<std::mutex> lg(_pending_batches_lock); + DCHECK(!_pending_batches.empty()); + send_batch = std::move(_pending_batches.front()); + _pending_batches.pop(); + _pending_batches_num--; + } - if (_add_batch_closure->result.has_execution_time_us()) { - _parent->update_node_add_batch_counter(_node_id, - 
_add_batch_closure->result.execution_time_us(), - _add_batch_closure->result.wait_lock_time_us()); - } - return {_add_batch_closure->result.status()}; -} + auto row_batch = std::move(send_batch.first); + auto request = std::move(send_batch.second); // doesn't need to be saved in heap -Status NodeChannel::_send_cur_batch(bool eos) { - RETURN_IF_ERROR(_wait_in_flight_packet()); + // tablet_ids has already set when add row + request.set_packet_seq(_next_packet_seq); + if (row_batch->num_rows() > 0) { + SCOPED_RAW_TIMER(&_serialize_batch_ns); + row_batch->serialize(request.mutable_row_batch()); + } - // tablet_ids has already set when add row - _add_batch_request.set_eos(eos); - _add_batch_request.set_packet_seq(_next_packet_seq); - if (_batch->num_rows() > 0) { - SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns()); - _batch->serialize(_add_batch_request.mutable_row_batch()); - } + _add_batch_closure->reset(); + _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); - _add_batch_closure->ref(); - _add_batch_closure->cntl.Reset(); - _add_batch_closure->cntl.set_timeout_ms(_rpc_timeout_ms); + if (request.eos()) { + for (auto pid : _parent->_partition_ids) { + request.add_partition_ids(pid); + } - if (eos) { - for (auto pid : _parent->_partition_ids) { - _add_batch_request.add_partition_ids(pid); + // eos request must be the last request + _add_batch_closure->end_mark(); + _send_finished = true; + DCHECK(_pending_batches_num == 0); } - } - _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, - &_add_batch_request, - &_add_batch_closure->result, - _add_batch_closure); - _add_batch_request.clear_tablet_ids(); - _add_batch_request.clear_row_batch(); - _add_batch_request.clear_partition_ids(); + _add_batch_closure->set_in_flight(); + _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, + &_add_batch_closure->result, _add_batch_closure); - _has_in_flight_packet = true; - _next_packet_seq++; + _next_packet_seq++; + } - _batch->reset(); - return 
Status::OK(); + return _send_finished ? 0 : 1; } -IndexChannel::~IndexChannel() { +Status NodeChannel::none_of(std::initializer_list<bool> vars) { + bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return var; }); + Status st = Status::OK(); + if (!none) { + std::string vars_str; + std::for_each(vars.begin(), vars.end(), + [&vars_str](bool var) -> void { vars_str += (var ? "1/" : "0/"); }); + if (!vars_str.empty()) { + vars_str.pop_back(); // 0/1/0/ -> 0/1/0 + } + st = Status::InternalError(vars_str); + } + + return st; } -Status IndexChannel::init(RuntimeState* state, - const std::vector<TTabletWithPartition>& tablets) { +IndexChannel::~IndexChannel() {} + +Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) { for (auto& tablet : tablets) { auto location = _parent->_location->find_tablet(tablet.tablet_id); if (location == nullptr) { @@ -287,119 +380,37 @@ Status IndexChannel::init(RuntimeState* state, return Status::OK(); } -Status IndexChannel::open() { - for (auto& it : _node_channels) { - it.second->open(); - } - for (auto& it : _node_channels) { - auto channel = it.second; - auto st = channel->open_wait(); - if (!st.ok()) { - LOG(WARNING) << "tablet open failed, load_id=" << _parent->_load_id - << ", node=" << channel->node_info()->host - << ":" << channel->node_info()->brpc_port - << ", errmsg=" << st.get_error_msg(); - if (_handle_failed_node(channel)) { - LOG(WARNING) << "open failed, load_id=" << _parent->_load_id; - return st; - } - } - } - return Status::OK(); -} - Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) { auto it = _channels_by_tablet.find(tablet_id); DCHECK(it != std::end(_channels_by_tablet)) << "unknown tablet, tablet_id=" << tablet_id; for (auto channel : it->second) { - if (channel->already_failed()) { - continue; - } + // if this node channel is already failed, this add_row will be skipped auto st = channel->add_row(tuple, tablet_id); if (!st.ok()) { - LOG(WARNING) 
<< "NodeChannel add row failed, load_id=" << _parent->_load_id - << ", tablet_id=" << tablet_id - << ", node=" << channel->node_info()->host - << ":" << channel->node_info()->brpc_port - << ", errmsg=" << st.get_error_msg(); - if (_handle_failed_node(channel)) { - LOG(WARNING) << "add row failed, load_id=" << _parent->_load_id; - return st; - } + mark_as_failed(channel); } } - return Status::OK(); -} -Status IndexChannel::close(RuntimeState* state) { - std::vector<NodeChannel*> need_wait_channels; - need_wait_channels.reserve(_node_channels.size()); - - Status close_status; - for (auto& it : _node_channels) { - auto channel = it.second; - if (channel->already_failed() || !close_status.ok()) { - channel->cancel(); - continue; - } - auto st = channel->close(state); - if (st.ok()) { - need_wait_channels.push_back(channel); - } else { - LOG(WARNING) << "close node channel failed, load_id=" << _parent->_load_id - << ", node=" << channel->node_info()->host - << ":" << channel->node_info()->brpc_port - << ", errmsg=" << st.get_error_msg(); - if (_handle_failed_node(channel)) { - LOG(WARNING) << "close failed, load_id=" << _parent->_load_id; - close_status = st; - } - } + if (has_intolerable_failure()) { + return Status::InternalError("index channel has intoleralbe failure"); } - if (close_status.ok()) { - for (auto channel : need_wait_channels) { - auto st = channel->close_wait(state); - if (!st.ok()) { - LOG(WARNING) << "close_wait node channel failed, load_id=" << _parent->_load_id - << ", node=" << channel->node_info()->host - << ":" << channel->node_info()->brpc_port - << ", errmsg=" << st.get_error_msg(); - if (_handle_failed_node(channel)) { - LOG(WARNING) << "close_wait failed, load_id=" << _parent->_load_id; - return st; - } - } - } - } - return close_status; -} - -void IndexChannel::cancel() { - for (auto& it : _node_channels) { - it.second->cancel(); - } + return Status::OK(); } -bool IndexChannel::_handle_failed_node(NodeChannel* channel) { - 
DCHECK(!channel->already_failed()); - channel->set_failed(); - _num_failed_channels++; - return _num_failed_channels >= ((_parent->_num_repicas + 1) / 2); +bool IndexChannel::has_intolerable_failure() { + return _failed_channels.size() >= ((_parent->_num_repicas + 1) / 2); } -OlapTableSink::OlapTableSink(ObjectPool* pool, - const RowDescriptor& row_desc, - const std::vector<TExpr>& texprs, - Status* status) +OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector<TExpr>& texprs, Status* status) : _pool(pool), _input_row_desc(row_desc), _filter_bitmap(1024) { if (!texprs.empty()) { *status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs); } } -OlapTableSink::~OlapTableSink() { -} +OlapTableSink::~OlapTableSink() {} Status OlapTableSink::init(const TDataSink& t_sink) { DCHECK(t_sink.__isset.olap_table_sink); @@ -443,8 +454,8 @@ Status OlapTableSink::prepare(RuntimeState* state) { SCOPED_TIMER(_profile->total_time_counter()); // Prepare the exprs to run. 
- RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, - _input_row_desc, _expr_mem_tracker.get())); + RETURN_IF_ERROR( + Expr::prepare(_output_expr_ctxs, state, _input_row_desc, _expr_mem_tracker.get())); // get table's tuple descriptor _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); @@ -455,17 +466,17 @@ Status OlapTableSink::prepare(RuntimeState* state) { if (!_output_expr_ctxs.empty()) { if (_output_expr_ctxs.size() != _output_tuple_desc->slots().size()) { LOG(WARNING) << "number of exprs is not same with slots, num_exprs=" - << _output_expr_ctxs.size() - << ", num_slots=" << _output_tuple_desc->slots().size(); + << _output_expr_ctxs.size() + << ", num_slots=" << _output_tuple_desc->slots().size(); return Status::InternalError("number of exprs is not same with slots"); } for (int i = 0; i < _output_expr_ctxs.size(); ++i) { if (!is_type_compatible(_output_expr_ctxs[i]->root()->type().type, _output_tuple_desc->slots()[i]->type().type)) { LOG(WARNING) << "type of exprs is not match slot's, expr_type=" - << _output_expr_ctxs[i]->root()->type().type - << ", slot_type=" << _output_tuple_desc->slots()[i]->type().type - << ", slot_name=" << _output_tuple_desc->slots()[i]->col_name(); + << _output_expr_ctxs[i]->root()->type().type + << ", slot_type=" << _output_tuple_desc->slots()[i]->type().type + << ", slot_name=" << _output_tuple_desc->slots()[i]->col_name(); return Status::InternalError("expr's type is not same with slot's"); } } @@ -513,8 +524,8 @@ Status OlapTableSink::prepare(RuntimeState* state) { _convert_batch_timer = ADD_TIMER(_profile, "ConvertBatchTime"); _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); - _close_timer = ADD_TIMER(_profile, "CloseTime"); - _wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlightPacketTime"); + _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); + _non_blocking_send_timer = ADD_TIMER(_profile, "NonBlockingSendTime"); 
_serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); _load_mem_limit = state->get_load_mem_limit(); @@ -546,9 +557,29 @@ Status OlapTableSink::open(RuntimeState* state) { // Prepare the exprs to run. RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state)); - for (auto channel : _channels) { - RETURN_IF_ERROR(channel->open()); + for (auto index_channel : _channels) { + index_channel->for_each_node_channel([](NodeChannel* ch) { ch->open(); }); + } + + for (auto index_channel : _channels) { + index_channel->for_each_node_channel([&index_channel](NodeChannel* ch) { + auto st = ch->open_wait(); + if (!st.ok()) { + LOG(WARNING) << "tablet open failed, " << ch->print_load_info() + << ", node=" << ch->node_info()->host << ":" + << ch->node_info()->brpc_port << ", errmsg=" << st.get_error_msg(); + index_channel->mark_as_failed(ch); + } + }); + + if (index_channel->has_intolerable_failure()) { + LOG(WARNING) << "open failed, load_id=" << _load_id; + return Status::InternalError("intolerable failure in opening node channels"); + } } + + _sender_thread = std::thread(&OlapTableSink::_send_batch_process, this); + return Status::OK(); } @@ -584,7 +615,7 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { if (!_partition->find_tablet(tuple, &partition, &dist_hash)) { std::stringstream ss; ss << "no partition for this tuple. tuple=" - << Tuple::to_string(tuple, *_output_tuple_desc); + << Tuple::to_string(tuple, *_output_tuple_desc); #if BE_TEST LOG(INFO) << ss.str(); #else @@ -610,57 +641,86 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { // only if status is ok can we call this _profile->total_time_counter(). 
// if status is not ok, this sink may not be prepared, so that _profile is null SCOPED_TIMER(_profile->total_time_counter()); + // BE id -> add_batch method counter + std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map; + int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0, + actual_consume_ns = 0; { SCOPED_TIMER(_close_timer); - for (auto channel : _channels) { - status = channel->close(state); - if (!status.ok()) { - LOG(WARNING) << "close channel failed, load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id; - } + for (auto index_channel : _channels) { + index_channel->for_each_node_channel( + [](NodeChannel* ch) { WARN_IF_ERROR(ch->mark_close(), ""); }); + } + + for (auto index_channel : _channels) { + index_channel->for_each_node_channel([&status, &state, &node_add_batch_counter_map, + &serialize_batch_ns, &mem_exceeded_block_ns, + &queue_push_lock_ns, + &actual_consume_ns](NodeChannel* ch) { + status = ch->close_wait(state); + if (!status.ok()) { + LOG(WARNING) << "close channel failed, " << ch->print_load_info(); + } + ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, + &mem_exceeded_block_ns, &queue_push_lock_ns, + &actual_consume_ns); + }); } } + // TODO need to be improved + LOG(INFO) << "total mem_exceeded_block_ns=" << mem_exceeded_block_ns + << ", total queue_push_lock_ns=" << queue_push_lock_ns + << ", total actual_consume_ns=" << actual_consume_ns; + COUNTER_SET(_input_rows_counter, _number_input_rows); COUNTER_SET(_output_rows_counter, _number_output_rows); COUNTER_SET(_filtered_rows_counter, _number_filtered_rows); COUNTER_SET(_send_data_timer, _send_data_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); - COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns); - COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); + COUNTER_SET(_non_blocking_send_timer, _non_blocking_send_ns); + 
COUNTER_SET(_serialize_batch_timer, serialize_batch_ns); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node - int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected(); + int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + + state->num_rows_load_unselected(); state->set_num_rows_load_total(num_rows_load_total); state->update_num_rows_load_filtered(_number_filtered_rows); // print log of add batch time of all node, for tracing load performance easily std::stringstream ss; ss << "finished to close olap table sink. load_id=" << print_id(_load_id) - << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: "; - for (auto const& pair : _node_add_batch_counter_map) { - ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_ns / 1000) << ")(" - << (pair.second.add_batch_wait_lock_time_ns / 1000) << ")(" + << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: "; + for (auto const& pair : node_add_batch_counter_map) { + ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000) + << ")(" << (pair.second.add_batch_wait_lock_time_us / 1000) << ")(" << pair.second.add_batch_num << ")} "; } LOG(INFO) << ss.str(); - } else { for (auto channel : _channels) { - channel->cancel(); + channel->for_each_node_channel([](NodeChannel* ch) { ch->cancel(); }); } } + + // Sender join() must put after node channels mark_close/cancel. + // But there is no specific sequence required between sender join() & close_wait(). 
+ if (_sender_thread.joinable()) { + _sender_thread.join(); + } + Expr::close(_output_expr_ctxs, state); _output_batch.reset(); return status; } -void OlapTableSink::_convert_batch(RuntimeState* state, RowBatch* input_batch, RowBatch* output_batch) { +void OlapTableSink::_convert_batch(RuntimeState* state, RowBatch* input_batch, + RowBatch* output_batch) { DCHECK_GE(output_batch->capacity(), input_batch->num_rows()); int commit_rows = 0; for (int i = 0; i < input_batch->num_rows(); ++i) { auto src_row = input_batch->get_row(i); - Tuple* dst_tuple = (Tuple*)output_batch->tuple_data_pool()->allocate( - _output_tuple_desc->byte_size()); + Tuple* dst_tuple = + (Tuple*)output_batch->tuple_data_pool()->allocate(_output_tuple_desc->byte_size()); bool ignore_this_row = false; for (int j = 0; j < _output_expr_ctxs.size(); ++j) { auto src_val = _output_expr_ctxs[j]->get_value(src_row); @@ -727,10 +787,10 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (str_val->len > desc->type().len) { std::stringstream ss; ss << "the length of input is too long than schema. 
" - << "column_name: " << desc->col_name() << "; " - << "input_str: [" << std::string(str_val->ptr, str_val->len) << "] " - << "schema length: " << desc->type().len << "; " - << "actual length: " << str_val->len << "; "; + << "column_name: " << desc->col_name() << "; " + << "input_str: [" << std::string(str_val->ptr, str_val->len) << "] " + << "schema length: " << desc->type().len << "; " + << "actual length: " << str_val->len << "; "; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -743,9 +803,8 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* continue; } // padding 0 to CHAR field - if (desc->type().type == TYPE_CHAR - && str_val->len < desc->type().len) { - auto new_ptr = (char*)batch->tuple_data_pool()->allocate(desc->type().len); + if (desc->type().type == TYPE_CHAR && str_val->len < desc->type().len) { + auto new_ptr = (char*)batch->tuple_data_pool()->allocate(desc->type().len); memcpy(new_ptr, str_val->ptr, str_val->len); memset(new_ptr + str_val->len, 0, desc->type().len - str_val->len); @@ -776,9 +835,9 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (*dec_val > _max_decimal_val[i] || *dec_val < _min_decimal_val[i]) { std::stringstream ss; ss << "decimal value is not valid for defination, column=" << desc->col_name() - << ", value=" << dec_val->to_string() - << ", precision=" << desc->type().precision - << ", scale=" << desc->type().scale; + << ", value=" << dec_val->to_string() + << ", precision=" << desc->type().precision + << ", scale=" << desc->type().scale; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -814,9 +873,9 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (dec_val > _max_decimalv2_val[i] || dec_val < _min_decimalv2_val[i]) { std::stringstream ss; ss << "decimal value is not valid for defination, column=" << desc->col_name() - << ", value=" << dec_val.to_string() - << ", precision=" << desc->type().precision - << ", scale=" << 
desc->type().scale; + << ", value=" << dec_val.to_string() + << ", precision=" << desc->type().precision + << ", scale=" << desc->type().scale; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -834,7 +893,7 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* if (!HyperLogLog::is_valid(*hll_val)) { std::stringstream ss; ss << "Content of HLL type column is invalid" - << "column_name: " << desc->col_name() << "; "; + << "column_name: " << desc->col_name() << "; "; #if BE_TEST LOG(INFO) << ss.str(); #else @@ -855,5 +914,24 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap* return filtered_rows; } +void OlapTableSink::_send_batch_process() { + SCOPED_RAW_TIMER(&_non_blocking_send_ns); + while (true) { + int running_channels_num = 0; + for (auto index_channel : _channels) { + index_channel->for_each_node_channel([&running_channels_num](NodeChannel* ch) { + running_channels_num += ch->try_send_and_fetch_status(); + }); + } + + if (running_channels_num == 0) { + LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), " + "consumer thread exit."; + return; + } + SleepFor(MonoDelta::FromMilliseconds(config::olap_table_sink_send_interval_ms)); + } } -} + +} // namespace stream_load +} // namespace doris diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 77fb89f..9ff18cf 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -18,22 +18,23 @@ #pragma once #include <memory> +#include <queue> #include <set> #include <string> #include <unordered_map> #include <utility> #include <vector> -#include "common/status.h" #include "common/object_pool.h" +#include "common/status.h" #include "exec/data_sink.h" #include "exec/tablet_info.h" #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" #include "gen_cpp/palo_internal_service.pb.h" #include "util/bitmap.h" -#include "util/thrift_util.h" #include "util/ref_count_closure.h" +#include 
"util/thrift_util.h" namespace doris { @@ -47,18 +48,103 @@ class ExprContext; class TExpr; namespace stream_load { - + class OlapTableSink; +// The counter of add_batch rpc of a single node +struct AddBatchCounter { + // total execution time of a add_batch rpc + int64_t add_batch_execution_time_us = 0; + // lock waiting time in a add_batch rpc + int64_t add_batch_wait_lock_time_us = 0; + // number of add_batch call + int64_t add_batch_num = 0; + AddBatchCounter& operator+=(const AddBatchCounter& rhs) { + add_batch_execution_time_us += rhs.add_batch_execution_time_us; + add_batch_wait_lock_time_us += rhs.add_batch_wait_lock_time_us; + add_batch_num += rhs.add_batch_num; + return *this; + } + friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) { + AddBatchCounter sum = lhs; + sum += rhs; + return sum; + } +}; + +// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. +// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. +// Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. +template <typename T> +class ReusableClosure : public google::protobuf::Closure { +public: + ReusableClosure() : cid(INVALID_BTHREAD_ID) {} + ~ReusableClosure() { + // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. 
+ join(); + } + + static ReusableClosure<T>* create() { return new ReusableClosure<T>(); } + + void addFailedHandler(std::function<void()> fn) { failed_handler = fn; } + void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; } + + void join() { + if (cid != INVALID_BTHREAD_ID) { + brpc::Join(cid); + } + } + + // plz follow this order: reset() -> set_in_flight() -> send brpc batch + void reset() { + join(); + DCHECK(_packet_in_flight == false); + cntl.Reset(); + cid = cntl.call_id(); + } + + void set_in_flight() { + DCHECK(_packet_in_flight == false); + _packet_in_flight = true; + } + + bool is_packet_in_flight() { return _packet_in_flight; } + + void end_mark() { + DCHECK(_is_last_rpc == false); + _is_last_rpc = true; + } + + void Run() override { + DCHECK(_packet_in_flight); + if (cntl.Failed()) { + LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl.ErrorCode()) + << ", error_text=" << cntl.ErrorText(); + failed_handler(); + } else { + success_handler(result, _is_last_rpc); + } + _packet_in_flight = false; + } + + brpc::Controller cntl; + T result; + +private: + brpc::CallId cid; + std::atomic<bool> _packet_in_flight{false}; + std::atomic<bool> _is_last_rpc{false}; + std::function<void()> failed_handler; + std::function<void(const T&, bool)> success_handler; +}; + class NodeChannel { public: NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t node_id, int32_t schema_hash); ~NodeChannel() noexcept; // called before open, used to add tablet loacted in this backend - void add_tablet(const TTabletWithPartition& tablet) { - _all_tablets.emplace_back(tablet); - } + void add_tablet(const TTabletWithPartition& tablet) { _all_tablets.emplace_back(tablet); } Status init(RuntimeState* state); @@ -68,99 +154,128 @@ public: Status add_row(Tuple* tuple, int64_t tablet_id); - Status close(RuntimeState* state); + // two ways to stop channel: + // 1. mark_close()->close_wait() PS. 
close_wait() will block waiting for the last AddBatch rpc response. + // 2. just cancel() + Status mark_close(); Status close_wait(RuntimeState* state); void cancel(); - int64_t node_id() const { return _node_id; } + // return: + // 0: stopped, send finished(eos request has been sent), or any internal error; + // 1: running, haven't reach eos. + // only allow 1 rpc in flight + int try_send_and_fetch_status(); + + void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map, + int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns, + int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) { + (*add_batch_counter_map)[_node_id] += _add_batch_counter; + *serialize_batch_ns += _serialize_batch_ns; + *mem_exceeded_block_ns += _mem_exceeded_block_ns; + *queue_push_lock_ns += _queue_push_lock_ns; + *actual_consume_ns += _actual_consume_ns; + } - void set_failed() { _already_failed = true; } - bool already_failed() const { return _already_failed; } + int64_t node_id() const { return _node_id; } const NodeInfo* node_info() const { return _node_info; } + std::string print_load_info() const { return _load_info; } + std::string name() const { return _name; } -private: - Status _send_cur_batch(bool eos = false); - // wait inflight packet finish, return error if inflight packet return failed - Status _wait_in_flight_packet(); - - Status _close(RuntimeState* state); + Status none_of(std::initializer_list<bool> vars); private: OlapTableSink* _parent = nullptr; int64_t _index_id = -1; int64_t _node_id = -1; int32_t _schema_hash = 0; + std::string _load_info; + std::string _name; TupleDescriptor* _tuple_desc = nullptr; const NodeInfo* _node_info = nullptr; - bool _already_failed = false; - bool _has_in_flight_packet = false; // this should be set in init() using config int _rpc_timeout_ms = 60000; int64_t _next_packet_seq = 0; - std::unique_ptr<RowBatch> _batch; + // user cancel or get some errors + std::atomic<bool> _cancelled{false}; + + // send 
finished means the consumer thread which send the rpc can exit + std::atomic<bool> _send_finished{false}; + + // add batches finished means the last rpc has be responsed, used to check whether this channel can be closed + std::atomic<bool> _add_batches_finished{false}; + + bool _eos_is_produced{false}; // only for restricting producer behaviors + + std::unique_ptr<RowDescriptor> _row_desc; + int _batch_size = 0; + std::unique_ptr<RowBatch> _cur_batch; + PTabletWriterAddBatchRequest _cur_add_batch_request; + + std::mutex _pending_batches_lock; + using AddBatchReq = std::pair<std::unique_ptr<RowBatch>, PTabletWriterAddBatchRequest>; + std::queue<AddBatchReq> _pending_batches; + std::atomic<int> _pending_batches_num{0}; + palo::PInternalService_Stub* _stub = nullptr; RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr; - RefCountClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr; + ReusableClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr; std::vector<TTabletWithPartition> _all_tablets; - PTabletWriterAddBatchRequest _add_batch_request; + std::vector<TTabletCommitInfo> _tablet_commit_infos; + + AddBatchCounter _add_batch_counter; + int64_t _serialize_batch_ns = 0; + + int64_t _mem_exceeded_block_ns = 0; + int64_t _queue_push_lock_ns = 0; + int64_t _actual_consume_ns = 0; }; class IndexChannel { public: IndexChannel(OlapTableSink* parent, int64_t index_id, int32_t schema_hash) - : _parent(parent), _index_id(index_id), - _schema_hash(schema_hash) { - } + : _parent(parent), _index_id(index_id), _schema_hash(schema_hash) {} ~IndexChannel(); - Status init(RuntimeState* state, - const std::vector<TTabletWithPartition>& tablets); - Status open(); - Status add_row(Tuple* tuple, int64_t tablet_id); + Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets); - Status close(RuntimeState* state); + Status add_row(Tuple* tuple, int64_t tablet_id); - void cancel(); + void for_each_node_channel(const 
std::function<void(NodeChannel*)>& func) { + for (auto& it : _node_channels) { + func(it.second); + } + } -private: - // return true if this load can't success. - bool _handle_failed_node(NodeChannel* channel); + void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); } + bool has_intolerable_failure(); private: OlapTableSink* _parent; int64_t _index_id; int32_t _schema_hash; - int _num_failed_channels = 0; // BeId -> channel std::unordered_map<int64_t, NodeChannel*> _node_channels; // from tablet_id to backend channel std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet; + // BeId + std::set<int64_t> _failed_channels; }; -// The counter of add_batch rpc of a single node -struct AddBatchCounter { - // total execution time of a add_batch rpc - int64_t add_batch_execution_time_ns = 0; - // lock waiting time in a add_batch rpc - int64_t add_batch_wait_lock_time_ns = 0; - // number of add_batch call - int64_t add_batch_num = 0; -}; - -// write data to Olap Table. -// this class distributed data according to +// Write data to Olap Table. +// When OlapTableSink::open() called, there will be a consumer thread running in the background. +// When you call OlapTableSink::send(), you will be the productor who products pending batches. +// Join the consumer thread in close(). class OlapTableSink : public DataSink { public: // Construct from thrift struct which is generated by FE. - OlapTableSink(ObjectPool* pool, - const RowDescriptor& row_desc, - const std::vector<TExpr>& texprs, + OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs, Status* status); ~OlapTableSink() override; @@ -172,29 +287,11 @@ public: Status send(RuntimeState* state, RowBatch* batch) override; + // close() will send RPCs too. If RPCs failed, return error. Status close(RuntimeState* state, Status close_status) override; // Returns the runtime profile for the sink. 
- RuntimeProfile* profile() override { - return _profile; - } - - // these 2 counters does not thread-safe. make sure only one thread - // at a time can modify them. - int64_t* mutable_wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; } - int64_t* mutable_serialize_batch_ns() { return &_serialize_batch_ns; } - void update_node_add_batch_counter(int64_t be_id, int64_t add_batch_time_ns, int64_t wait_lock_time_ns) { - auto search = _node_add_batch_counter_map.find(be_id); - if (search == _node_add_batch_counter_map.end()) { - AddBatchCounter new_counter; - _node_add_batch_counter_map.emplace(be_id, std::move(new_counter)); - } - - AddBatchCounter& counter = _node_add_batch_counter_map[be_id]; - counter.add_batch_execution_time_ns += add_batch_time_ns; - counter.add_batch_wait_lock_time_ns += wait_lock_time_ns; - counter.add_batch_num += 1; - } + RuntimeProfile* profile() override { return _profile; } private: // convert input batch to output batch which will be loaded into OLAP table. @@ -206,6 +303,11 @@ private: // invalid row number is set in Bitmap int _validate_data(RuntimeState* state, RowBatch* batch, Bitmap* filter_bitmap); + // the consumer func of sending pending batches in every NodeChannel. + // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending. 
+ // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the productor + void _send_batch_process(); + private: friend class NodeChannel; friend class IndexChannel; @@ -254,6 +356,8 @@ private: // index_channel std::vector<IndexChannel*> _channels; + std::thread _sender_thread; + std::vector<DecimalValue> _max_decimal_val; std::vector<DecimalValue> _min_decimal_val; @@ -264,7 +368,7 @@ private: int64_t _convert_batch_ns = 0; int64_t _validate_data_ns = 0; int64_t _send_data_ns = 0; - int64_t _wait_in_flight_packet_ns = 0; + int64_t _non_blocking_send_ns = 0; int64_t _serialize_batch_ns = 0; int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; @@ -278,12 +382,9 @@ private: RuntimeProfile::Counter* _validate_data_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; - RuntimeProfile::Counter* _wait_in_flight_packet_timer = nullptr; + RuntimeProfile::Counter* _non_blocking_send_timer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; - // BE id -> add_batch method counter - std::unordered_map<int64_t, AddBatchCounter> _node_add_batch_counter_map; - // load mem limit is for remote load channel int64_t _load_mem_limit = -1; @@ -291,5 +392,5 @@ private: int64_t _load_channel_timeout_s = 0; }; -} -} +} // namespace stream_load +} // namespace doris diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp index bef0794..0f563b3 100644 --- a/be/test/exec/tablet_sink_test.cpp +++ b/be/test/exec/tablet_sink_test.cpp @@ -19,25 +19,23 @@ #include <gtest/gtest.h> +#include "common/config.h" #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/internal_service.pb.h" -#include "common/config.h" +#include "runtime/bufferpool/reservation_tracker.h" #include "runtime/decimal_value.h" +#include "runtime/descriptor_helper.h" #include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" #include 
"runtime/row_batch.h" #include "runtime/runtime_state.h" +#include "runtime/stream_load/load_stream_mgr.h" #include "runtime/thread_resource_mgr.h" #include "runtime/tuple_row.h" -#include "runtime/stream_load/load_stream_mgr.h" #include "service/brpc.h" #include "util/brpc_stub_cache.h" #include "util/cpu_info.h" #include "util/debug/leakcheck_disabler.h" -#include "runtime/descriptor_helper.h" -#include "runtime/bufferpool/reservation_tracker.h" -#include "runtime/exec_env.h" -#include "runtime/result_queue_mgr.h" -#include "runtime/thread_resource_mgr.h" namespace doris { namespace stream_load { @@ -46,8 +44,8 @@ Status k_add_batch_status; class OlapTableSinkTest : public testing::Test { public: - OlapTableSinkTest() { } - virtual ~OlapTableSinkTest() { } + OlapTableSinkTest() {} + virtual ~OlapTableSinkTest() {} void SetUp() override { k_add_batch_status = Status::OK(); _env = ExecEnv::GetInstance(); @@ -66,9 +64,16 @@ public: SAFE_DELETE(_env->_master_info); SAFE_DELETE(_env->_thread_mgr); SAFE_DELETE(_env->_buffer_reservation); + if (_server) { + _server->Stop(100); + _server->Join(); + SAFE_DELETE(_server); + } } + private: - ExecEnv* _env; + ExecEnv* _env = nullptr; + brpc::Server* _server = nullptr; }; TDataSink get_data_sink(TDescriptorTable* desc_tbl) { @@ -106,24 +111,42 @@ TDataSink get_data_sink(TDescriptorTable* desc_tbl) { { TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("c2").column_pos(2).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().string_type(10).column_name("c3").column_pos(3).build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("c2") + .column_pos(2) + .build()); + 
tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(10) + .column_name("c3") + .column_pos(3) + .build()); tuple_builder.build(&dtb); } { TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_BIGINT).column_name("c2").column_pos(2).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().string_type(20).column_name("c3").column_pos(3).build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("c2") + .column_pos(2) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(20) + .column_name("c3") + .column_pos(3) + .build()); tuple_builder.build(&dtb); } @@ -212,10 +235,16 @@ TDataSink get_decimal_sink(TDescriptorTable* desc_tbl) { { TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_INT).column_name("c1").column_pos(1).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().decimal_type(5, 2).column_name("c2").column_pos(2).build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .decimal_type(5, 2) + .column_name("c2") + .column_pos(2) + .build()); tuple_builder.build(&dtb); } @@ -271,8 +300,8 @@ TDataSink get_decimal_sink(TDescriptorTable* desc_tbl) { class TestInternalService : public palo::PInternalService { public: - TestInternalService() { } - virtual ~TestInternalService() { } + TestInternalService() {} + virtual ~TestInternalService() {} void transmit_data(::google::protobuf::RpcController* controller, const ::doris::PTransmitDataParams* request, @@ -281,7 +310,6 @@ public: done->Run(); } - void 
tablet_writer_open(google::protobuf::RpcController* controller, const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, @@ -306,7 +334,7 @@ public: if (request->has_row_batch() && _row_desc != nullptr) { MemTracker tracker; RowBatch batch(*_row_desc, request->row_batch(), &tracker); - for (int i = 0; i < batch.num_rows(); ++i){ + for (int i = 0; i < batch.num_rows(); ++i) { LOG(INFO) << batch.get_row(i)->to_string(*_row_desc); _output_set->emplace(batch.get_row(i)->to_string(*_row_desc)); } @@ -330,13 +358,13 @@ public: TEST_F(OlapTableSinkTest, normal) { // start brpc service first - auto server = new brpc::Server(); + _server = new brpc::Server(); auto service = new TestInternalService(); - server->AddService(service, brpc::SERVER_OWNS_SERVICE); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - server->Start(4356, &options); + _server->Start(4356, &options); } TUniqueId fragment_id; @@ -361,7 +389,7 @@ TEST_F(OlapTableSinkTest, normal) { LOG(INFO) << "tuple_desc=" << tuple_desc->debug_string(); RowDescriptor row_desc(*desc_tbl, {0}, {false}); - + OlapTableSink sink(&obj_pool, row_desc, {}, &st); ASSERT_TRUE(st.ok()); @@ -433,21 +461,17 @@ TEST_F(OlapTableSinkTest, normal) { // 2node * 2 ASSERT_EQ(1, state.num_rows_load_filtered()); - - server->Stop(100); - server->Join(); - delete server; } TEST_F(OlapTableSinkTest, convert) { // start brpc service first - auto server = new brpc::Server(); + _server = new brpc::Server(); auto service = new TestInternalService(); - server->AddService(service, brpc::SERVER_OWNS_SERVICE); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - server->Start(4356, &options); + _server->Start(4356, &options); } TUniqueId fragment_id; @@ -470,7 +494,7 @@ TEST_F(OlapTableSinkTest, convert) { RowDescriptor row_desc(*desc_tbl, 
{0}, {false}); - // expr + // expr std::vector<TExpr> exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -570,10 +594,6 @@ TEST_F(OlapTableSinkTest, convert) { // 2node * 2 ASSERT_EQ(0, state.num_rows_load_filtered()); - - server->Stop(100); - server->Join(); - delete server; } TEST_F(OlapTableSinkTest, init_fail1) { @@ -595,7 +615,7 @@ TEST_F(OlapTableSinkTest, init_fail1) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector<TExpr> exprs; exprs.resize(1); exprs[0].nodes.resize(1); @@ -653,7 +673,7 @@ TEST_F(OlapTableSinkTest, init_fail3) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector<TExpr> exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -712,7 +732,7 @@ TEST_F(OlapTableSinkTest, init_fail4) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector<TExpr> exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -755,13 +775,13 @@ TEST_F(OlapTableSinkTest, init_fail4) { TEST_F(OlapTableSinkTest, add_batch_failed) { // start brpc service first - auto server = new brpc::Server(); + _server = new brpc::Server(); auto service = new TestInternalService(); - server->AddService(service, brpc::SERVER_OWNS_SERVICE); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - server->Start(4356, &options); + _server->Start(4356, &options); } TUniqueId fragment_id; @@ -782,7 +802,7 @@ TEST_F(OlapTableSinkTest, add_batch_failed) { RowDescriptor row_desc(*desc_tbl, {0}, {false}); - // expr + // expr std::vector<TExpr> exprs; exprs.resize(3); exprs[0].nodes.resize(1); @@ -845,21 +865,17 @@ TEST_F(OlapTableSinkTest, add_batch_failed) { // close st = sink.close(&state, Status::OK()); ASSERT_FALSE(st.ok()); - - server->Stop(100); - server->Join(); - delete server; } TEST_F(OlapTableSinkTest, decimal) { // start brpc service first - auto server = new brpc::Server(); + _server = new brpc::Server(); 
auto service = new TestInternalService(); - server->AddService(service, brpc::SERVER_OWNS_SERVICE); + ASSERT_EQ(_server->AddService(service, brpc::SERVER_OWNS_SERVICE), 0); brpc::ServerOptions options; { debug::ScopedLeakCheckDisabler disable_lsan; - server->Start(4356, &options); + _server->Start(4356, &options); } TUniqueId fragment_id; @@ -885,7 +901,7 @@ TEST_F(OlapTableSinkTest, decimal) { service->_row_desc = &row_desc; std::set<std::string> output_set; service->_output_set = &output_set; - + OlapTableSink sink(&obj_pool, row_desc, {}, &st); ASSERT_TRUE(st.ok()); @@ -946,14 +962,10 @@ TEST_F(OlapTableSinkTest, decimal) { ASSERT_TRUE(output_set.count("[(12 12.3)]") > 0); ASSERT_TRUE(output_set.count("[(13 123.12)]") > 0); // ASSERT_TRUE(output_set.count("[(14 999.99)]") > 0); - - server->Stop(100); - server->Join(); - delete server; } -} -} +} // namespace stream_load +} // namespace doris int main(int argc, char* argv[]) { doris::CpuInfo::init(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org