This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b3a9c247af [refactor](move-memtable) add load stream stub (#23642) b3a9c247af is described below commit b3a9c247af89fa9b5eaa7e8463cf589e4b02d9fd Author: Kaijie Chen <c...@apache.org> AuthorDate: Thu Aug 31 19:39:34 2023 +0800 [refactor](move-memtable) add load stream stub (#23642) --- be/src/io/fs/stream_sink_file_writer.cpp | 60 ++----- be/src/io/fs/stream_sink_file_writer.h | 36 +--- be/src/olap/delta_writer_context.h | 6 - be/src/olap/delta_writer_v2.cpp | 24 ++- be/src/olap/delta_writer_v2.h | 10 +- be/src/olap/rowset/beta_rowset_writer_v2.cpp | 22 +-- be/src/olap/rowset/beta_rowset_writer_v2.h | 6 +- be/src/olap/rowset/rowset_writer_context.h | 3 - be/src/vec/sink/load_stream_stub.cpp | 232 +++++++++++++++++++++++++ be/src/vec/sink/load_stream_stub.h | 192 ++++++++++++++++++++ be/src/vec/sink/vtablet_sink_v2.cpp | 228 +++++------------------- be/src/vec/sink/vtablet_sink_v2.h | 20 +-- be/test/io/fs/stream_sink_file_writer_test.cpp | 161 ++++++----------- 13 files changed, 578 insertions(+), 422 deletions(-) diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp index ff2667b9fa..6726933513 100644 --- a/be/src/io/fs/stream_sink_file_writer.cpp +++ b/be/src/io/fs/stream_sink_file_writer.cpp @@ -21,6 +21,7 @@ #include "olap/olap_common.h" #include "olap/rowset/beta_rowset_writer.h" +#include "vec/sink/load_stream_stub.h" namespace doris { namespace io { @@ -35,38 +36,24 @@ void StreamSinkFileWriter::init(PUniqueId load_id, int64_t partition_id, int64_t _index_id = index_id; _tablet_id = tablet_id; _segment_id = segment_id; - - _header.set_src_id(_sender_id); - *_header.mutable_load_id() = _load_id; - _header.set_partition_id(_partition_id); - _header.set_index_id(_index_id); - _header.set_tablet_id(_tablet_id); - _header.set_segment_id(_segment_id); - _header.set_opcode(doris::PStreamHeader::APPEND_DATA); - _append_header(); } Status 
StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { size_t bytes_req = 0; for (int i = 0; i < data_cnt; i++) { bytes_req += data[i].get_size(); - _buf.append(data[i].get_data(), data[i].get_size()); } - _pending_bytes += bytes_req; _bytes_appended += bytes_req; VLOG_DEBUG << "writer appendv, load_id: " << UniqueId(_load_id).to_string() << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id - << ", segment_id: " << _segment_id << ", data_length: " << bytes_req - << ", current batched bytes: " << _pending_bytes; + << ", segment_id: " << _segment_id << ", data_length: " << bytes_req; - if (_pending_bytes >= _max_pending_bytes) { - RETURN_IF_ERROR(_stream_sender(_buf)); - _buf.clear(); - _append_header(); - _pending_bytes = 0; + std::span<const Slice> slices {data, data_cnt}; + for (auto& stream : _streams) { + RETURN_IF_ERROR( + stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, slices)); } - return Status::OK(); } @@ -75,38 +62,11 @@ Status StreamSinkFileWriter::finalize() { << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id; // TODO(zhengyu): update get_inverted_index_file_size into stat - Status status = _stream_sender(_buf); - // send eos - _buf.clear(); - _header.set_segment_eos(true); - _append_header(); - status = _stream_sender(_buf); - return status; -} - -void StreamSinkFileWriter::_append_header() { - size_t header_len = _header.ByteSizeLong(); - _buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len)); - _buf.append(_header.SerializeAsString()); -} - -Status StreamSinkFileWriter::send_with_retry(brpc::StreamId stream, butil::IOBuf buf) { - while (true) { - int ret = brpc::StreamWrite(stream, buf); - if (ret == EAGAIN) { - const timespec time = butil::seconds_from_now(60); - int wait_result = brpc::StreamWait(stream, &time); - if (wait_result == 0) { - continue; - } else { - return Status::InternalError("fail to send data when wait stream"); - } - } 
else if (ret == EINVAL) { - return Status::InternalError("fail to send data when stream write"); - } else { - return Status::OK(); - } + for (auto& stream : _streams) { + RETURN_IF_ERROR( + stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, {}, true)); } + return Status::OK(); } Status StreamSinkFileWriter::close() { diff --git a/be/src/io/fs/stream_sink_file_writer.h b/be/src/io/fs/stream_sink_file_writer.h index 074a2f0f5a..6c40543b93 100644 --- a/be/src/io/fs/stream_sink_file_writer.h +++ b/be/src/io/fs/stream_sink_file_writer.h @@ -27,18 +27,16 @@ namespace doris { +class LoadStreamStub; + struct RowsetId; struct SegmentStatistics; namespace io { class StreamSinkFileWriter : public FileWriter { public: - StreamSinkFileWriter(int sender_id, std::vector<brpc::StreamId> stream_id) - : _sender_id(sender_id), _streams(stream_id) {} - - static void deleter(void* data) { ::free(data); } - - static Status send_with_retry(brpc::StreamId stream, butil::IOBuf buf); + StreamSinkFileWriter(std::vector<std::shared_ptr<LoadStreamStub>>& streams) + : _streams(streams) {} void init(PUniqueId load_id, int64_t partition_id, int64_t index_id, int64_t tablet_id, int32_t segment_id); @@ -58,30 +56,10 @@ public: } private: - Status _stream_sender(butil::IOBuf buf) const { - for (auto stream : _streams) { - LOG(INFO) << "writer flushing, load_id: " << UniqueId(_load_id).to_string() - << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id - << ", segment_id: " << _segment_id << ", stream id: " << stream - << ", buf size: " << buf.size(); - - RETURN_IF_ERROR(send_with_retry(stream, buf)); - } - return Status::OK(); - } - - void _append_header(); - -private: - PStreamHeader _header; - butil::IOBuf _buf; - - std::queue<Slice> _pending_slices; - size_t _max_pending_bytes = config::brpc_streaming_client_batch_bytes; - size_t _pending_bytes; + template <bool eos> + Status _flush(); - int _sender_id; - std::vector<brpc::StreamId> _streams; + 
std::vector<std::shared_ptr<LoadStreamStub>> _streams; PUniqueId _load_id; int64_t _partition_id; diff --git a/be/src/olap/delta_writer_context.h b/be/src/olap/delta_writer_context.h index 680f2d0b6f..da506b10b5 100644 --- a/be/src/olap/delta_writer_context.h +++ b/be/src/olap/delta_writer_context.h @@ -28,7 +28,6 @@ namespace doris { class TupleDescriptor; class SlotDescriptor; class OlapTableSchemaParam; -class TabletSchema; struct WriteRequest { int64_t tablet_id; @@ -42,11 +41,6 @@ struct WriteRequest { bool is_high_priority = false; OlapTableSchemaParam* table_schema_param; int64_t index_id = 0; - // for DeltaWriterV2 - std::shared_ptr<TabletSchema> tablet_schema; - bool enable_unique_key_merge_on_write = false; - int sender_id = 0; - std::vector<brpc::StreamId> streams; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 6f42ae068e..ff1a341f5e 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -58,23 +58,27 @@ #include "util/stopwatch.hpp" #include "util/time.h" #include "vec/core/block.h" +#include "vec/sink/load_stream_stub.h" namespace doris { using namespace ErrorCode; -Status DeltaWriterV2::open(WriteRequest* req, DeltaWriterV2** writer, RuntimeProfile* profile) { - *writer = new DeltaWriterV2(req, StorageEngine::instance(), profile); +Status DeltaWriterV2::open(WriteRequest* req, + const std::vector<std::shared_ptr<LoadStreamStub>>& streams, + DeltaWriterV2** writer, RuntimeProfile* profile) { + *writer = new DeltaWriterV2(req, streams, StorageEngine::instance(), profile); return Status::OK(); } -DeltaWriterV2::DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine, - RuntimeProfile* profile) +DeltaWriterV2::DeltaWriterV2(WriteRequest* req, + const std::vector<std::shared_ptr<LoadStreamStub>>& streams, + StorageEngine* storage_engine, RuntimeProfile* profile) : _req(*req), _tablet_schema(new TabletSchema), 
_profile(profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true, true)), _memtable_writer(new MemTableWriter(*req)), - _streams(req->streams) { + _streams(streams) { _init_profile(profile); } @@ -97,7 +101,8 @@ Status DeltaWriterV2::init() { return Status::OK(); } // build tablet schema in request level - _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_req.tablet_schema.get()); + _build_current_tablet_schema(_req.index_id, _req.table_schema_param, + *_streams[0]->tablet_schema(_req.index_id)); RowsetWriterContext context; context.txn_id = _req.txn_id; context.load_id = _req.load_id; @@ -112,17 +117,18 @@ Status DeltaWriterV2::init() { context.tablet_id = _req.tablet_id; context.partition_id = _req.partition_id; context.tablet_schema_hash = _req.schema_hash; - context.enable_unique_key_merge_on_write = _req.enable_unique_key_merge_on_write; + context.enable_unique_key_merge_on_write = _streams[0]->enable_unique_mow(_req.index_id); context.rowset_type = RowsetTypePB::BETA_ROWSET; context.rowset_id = StorageEngine::instance()->next_rowset_id(); context.data_dir = nullptr; - context.sender_id = _req.sender_id; _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams); _rowset_writer->init(context); - _memtable_writer->init(_rowset_writer, _tablet_schema, _req.enable_unique_key_merge_on_write); + _memtable_writer->init(_rowset_writer, _tablet_schema, + _streams[0]->enable_unique_mow(_req.index_id)); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; + _streams.clear(); return Status::OK(); } diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 0d78162035..741d939fa8 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -52,6 +52,7 @@ class TupleDescriptor; class SlotDescriptor; class OlapTableSchemaParam; class BetaRowsetWriterV2; +class LoadStreamStub; namespace vectorized { class Block; @@ -61,7 +62,9 @@ class 
Block; // This class is NOT thread-safe, external synchronization is required. class DeltaWriterV2 { public: - static Status open(WriteRequest* req, DeltaWriterV2** writer, RuntimeProfile* profile); + static Status open(WriteRequest* req, + const std::vector<std::shared_ptr<LoadStreamStub>>& streams, + DeltaWriterV2** writer, RuntimeProfile* profile); ~DeltaWriterV2(); @@ -95,7 +98,8 @@ public: int64_t total_received_rows() const { return _total_received_rows; } private: - DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile); + DeltaWriterV2(WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>& streams, + StorageEngine* storage_engine, RuntimeProfile* profile); void _build_current_tablet_schema(int64_t index_id, const OlapTableSchemaParam* table_schema_param, @@ -122,7 +126,7 @@ private: std::shared_ptr<MemTableWriter> _memtable_writer; MonotonicStopWatch _lock_watch; - std::vector<brpc::StreamId> _streams; + std::vector<std::shared_ptr<LoadStreamStub>> _streams; }; } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index d7a641b6e0..a4bc32e32b 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -54,11 +54,12 @@ #include "util/time.h" #include "vec/common/schema_util.h" // LocalSchemaChangeRecorder #include "vec/core/block.h" +#include "vec/sink/load_stream_stub.h" namespace doris { using namespace ErrorCode; -BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector<brpc::StreamId>& streams) +BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector<std::shared_ptr<LoadStreamStub>>& streams) : _next_segment_id(0), _num_segment(0), _num_rows_written(0), @@ -78,33 +79,20 @@ Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer) { auto partition_id = 
_context.partition_id; - auto sender_id = _context.sender_id; auto index_id = _context.index_id; auto tablet_id = _context.tablet_id; auto load_id = _context.load_id; - auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(sender_id, _streams); + auto stream_writer = std::make_unique<io::StreamSinkFileWriter>(_streams); stream_writer->init(load_id, partition_id, index_id, tablet_id, segment_id); file_writer = std::move(stream_writer); return Status::OK(); } Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, SegmentStatistics& segstat) { - butil::IOBuf buf; - PStreamHeader header; - header.set_src_id(_context.sender_id); - *header.mutable_load_id() = _context.load_id; - header.set_partition_id(_context.partition_id); - header.set_index_id(_context.index_id); - header.set_tablet_id(_context.tablet_id); - header.set_segment_id(segment_id); - header.set_opcode(doris::PStreamHeader::ADD_SEGMENT); - segstat.to_pb(header.mutable_segment_statistics()); - size_t header_len = header.ByteSizeLong(); - buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len)); - buf.append(header.SerializeAsString()); for (const auto& stream : _streams) { - io::StreamSinkFileWriter::send_with_retry(stream, buf); + RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id, + _context.tablet_id, segment_id, segstat)); } return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index 919c128607..a982272217 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -60,9 +60,11 @@ namespace vectorized::schema_util { class LocalSchemaChangeRecorder; } +class LoadStreamStub; + class BetaRowsetWriterV2 : public RowsetWriter { public: - BetaRowsetWriterV2(const std::vector<brpc::StreamId>& streams); + BetaRowsetWriterV2(const std::vector<std::shared_ptr<LoadStreamStub>>& streams); ~BetaRowsetWriterV2() override; @@ -157,7 +159,7 @@ 
private: fmt::memory_buffer vlog_buffer; - std::vector<brpc::StreamId> _streams; + std::vector<std::shared_ptr<LoadStreamStub>> _streams; int64_t _delete_bitmap_ns = 0; int64_t _segment_writer_ns = 0; diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index b8bfd1225c..cb78a1233a 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -45,7 +45,6 @@ struct RowsetWriterContext { rowset_type(BETA_ROWSET), rowset_state(PREPARED), version(Version(0, 0)), - sender_id(0), txn_id(0), tablet_uid(0, 0), segments_overlap(OVERLAP_UNKNOWN) { @@ -68,8 +67,6 @@ struct RowsetWriterContext { // properties for non-pending rowset Version version; - int sender_id; - // properties for pending rowset int64_t txn_id; PUniqueId load_id; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp new file mode 100644 index 0000000000..1fbc8228b2 --- /dev/null +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/sink/load_stream_stub.h" + +#include "olap/rowset/rowset_writer.h" +#include "util/brpc_client_cache.h" +#include "util/network_util.h" +#include "util/thrift_util.h" + +namespace doris { + +int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, + butil::IOBuf* const messages[], + size_t size) { + for (size_t i = 0; i < size; i++) { + butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]); + PWriteStreamSinkResponse response; + response.ParseFromZeroCopyStream(&wrapper); + + Status st = Status::create(response.status()); + + std::stringstream ss; + ss << "received response from backend " << _stub->_dst_id; + if (response.success_tablet_ids_size() > 0) { + ss << ", success tablet ids:"; + for (auto tablet_id : response.success_tablet_ids()) { + ss << " " << tablet_id; + } + std::lock_guard<bthread::Mutex> lock(_stub->_success_tablets_mutex); + for (auto tablet_id : response.success_tablet_ids()) { + _stub->_success_tablets.push_back(tablet_id); + } + } + if (response.failed_tablet_ids_size() > 0) { + ss << ", failed tablet ids:"; + for (auto tablet_id : response.failed_tablet_ids()) { + ss << " " << tablet_id; + } + std::lock_guard<bthread::Mutex> lock(_stub->_failed_tablets_mutex); + for (auto tablet_id : response.failed_tablet_ids()) { + _stub->_failed_tablets.push_back(tablet_id); + } + } + ss << ", status: " << st; + LOG(INFO) << ss.str(); + + if (response.has_load_stream_profile()) { + TRuntimeProfileTree tprofile; + const uint8_t* buf = + reinterpret_cast<const uint8_t*>(response.load_stream_profile().data()); + uint32_t len = response.load_stream_profile().size(); + auto status = deserialize_thrift_msg(buf, &len, false, &tprofile); + if (status.ok()) { + // TODO + //_sink->_state->load_channel_profile()->update(tprofile); + } else { + LOG(WARNING) << "load stream TRuntimeProfileTree deserialize failed, errmsg=" + << status; + } + } + } + return 0; +} + +void 
LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) { + std::lock_guard<bthread::Mutex> lock(_stub->_mutex); + _stub->_is_closed = true; + _stub->_close_cv.notify_all(); +} + +LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id) + : _load_id(load_id), + _src_id(src_id), + _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()), + _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {}; + +LoadStreamStub::LoadStreamStub(LoadStreamStub& stub) + : _load_id(stub._load_id), + _src_id(stub._src_id), + _tablet_schema_for_index(stub._tablet_schema_for_index), + _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {}; + +LoadStreamStub::~LoadStreamStub() { + std::unique_lock<bthread::Mutex> lock(_mutex); + if (_is_init && !_is_closed) { + brpc::StreamClose(_stream_id); + } +} + +// open_stream_sink +// tablets means +Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, + const NodeInfo& node_info, int64_t txn_id, + const OlapTableSchemaParam& schema, + const std::vector<PTabletID>& tablets_for_schema, bool enable_profile) { + std::unique_lock<bthread::Mutex> lock(_mutex); + if (_is_init) { + return Status::OK(); + } + _dst_id = node_info.id; + std::string host_port = get_host_port(node_info.host, node_info.brpc_port); + brpc::StreamOptions opt; + opt.max_buf_size = 20 << 20; // 20MB + opt.idle_timeout_ms = 30000; + opt.messages_in_batch = 128; + opt.handler = &_handler; + brpc::Controller cntl; + if (int ret = StreamCreate(&_stream_id, cntl, &opt)) { + return Status::Error<true>(ret, "Failed to create stream"); + } + cntl.set_timeout_ms(config::open_stream_sink_timeout_ms); + POpenStreamSinkRequest request; + *request.mutable_load_id() = _load_id; + request.set_src_id(_src_id); + request.set_txn_id(txn_id); + request.set_enable_profile(enable_profile); + schema.to_protobuf(request.mutable_schema()); + for (auto& tablet : tablets_for_schema) { + *request.add_tablets() = tablet; + } + 
POpenStreamSinkResponse response; + // use "pooled" connection to avoid conflicts between streaming rpc and regular rpc, + // see: https://github.com/apache/brpc/issues/392 + const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "pooled"); + stub->open_stream_sink(&cntl, &request, &response, nullptr); + for (const auto& resp : response.tablet_schemas()) { + auto tablet_schema = std::make_unique<TabletSchema>(); + tablet_schema->init_from_pb(resp.tablet_schema()); + _tablet_schema_for_index->emplace(resp.index_id(), std::move(tablet_schema)); + _enable_unique_mow_for_index->emplace(resp.index_id(), + resp.enable_unique_key_merge_on_write()); + } + if (cntl.Failed()) { + return Status::InternalError("Failed to connect to backend {}: {}", _dst_id, + cntl.ErrorText()); + } + LOG(INFO) << "Opened stream " << _stream_id << " for backend " << _dst_id << " (" << host_port + << ")"; + _is_init = true; + return Status::OK(); +} + +// APPEND_DATA +Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, + int64_t segment_id, std::span<const Slice> data, + bool segment_eos) { + PStreamHeader header; + header.set_src_id(_src_id); + *header.mutable_load_id() = _load_id; + header.set_partition_id(partition_id); + header.set_index_id(index_id); + header.set_tablet_id(tablet_id); + header.set_segment_id(segment_id); + header.set_segment_eos(segment_eos); + header.set_opcode(doris::PStreamHeader::APPEND_DATA); + return _encode_and_send(header, data); +} + +// ADD_SEGMENT +Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, + int64_t segment_id, SegmentStatistics& segment_stat) { + PStreamHeader header; + header.set_src_id(_src_id); + *header.mutable_load_id() = _load_id; + header.set_partition_id(partition_id); + header.set_index_id(index_id); + header.set_tablet_id(tablet_id); + header.set_segment_id(segment_id); + header.set_opcode(doris::PStreamHeader::ADD_SEGMENT); + 
segment_stat.to_pb(header.mutable_segment_statistics()); + return _encode_and_send(header); +} + +// CLOSE_LOAD +Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) { + PStreamHeader header; + *header.mutable_load_id() = _load_id; + header.set_src_id(_src_id); + header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); + for (const auto& tablet : tablets_to_commit) { + *header.add_tablets_to_commit() = tablet; + } + return _encode_and_send(header); +} + +Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) { + butil::IOBuf buf; + size_t header_len = header.ByteSizeLong(); + buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len)); + buf.append(header.SerializeAsString()); + for (const auto& slice : data) { + buf.append(slice.get_data(), slice.get_size()); + } + return _send_with_retry(buf); +} + +Status LoadStreamStub::_send_with_retry(butil::IOBuf buf) { + for (;;) { + int ret = brpc::StreamWrite(_stream_id, buf); + switch (ret) { + case 0: + return Status::OK(); + case EAGAIN: { + const timespec time = butil::seconds_from_now(60); + int wait_ret = brpc::StreamWait(_stream_id, &time); + if (wait_ret != 0) { + return Status::InternalError("StreamWait failed, err = ", wait_ret); + } + break; + } + default: + return Status::InternalError("StreamWrite failed, err = {}", ret); + } + } +} + +} // namespace doris diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h new file mode 100644 index 0000000000..268de6ca83 --- /dev/null +++ b/be/src/vec/sink/load_stream_stub.h @@ -0,0 +1,192 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include <brpc/controller.h> +#include <bthread/types.h> +#include <butil/errno.h> +#include <fmt/format.h> +#include <gen_cpp/PaloInternalService_types.h> +#include <gen_cpp/Types_types.h> +#include <gen_cpp/internal_service.pb.h> +#include <gen_cpp/types.pb.h> +#include <glog/logging.h> +#include <google/protobuf/stubs/callback.h> +#include <stddef.h> +#include <stdint.h> + +#include <atomic> +// IWYU pragma: no_include <bits/chrono.h> +#include <chrono> // IWYU pragma: keep +#include <functional> +#include <initializer_list> +#include <map> +#include <memory> +#include <mutex> +#include <ostream> +#include <queue> +#include <set> +#include <span> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <utility> +#include <vector> + +#include "common/config.h" +#include "common/status.h" +#include "exec/data_sink.h" +#include "exec/tablet_info.h" +#include "gutil/ref_counted.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker.h" +#include "runtime/thread_context.h" +#include "runtime/types.h" +#include "util/countdown_latch.h" +#include "util/runtime_profile.h" +#include "util/stopwatch.hpp" +#include "vec/columns/column.h" +#include "vec/common/allocator.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_fwd.h" + +namespace doris { +class TabletSchema; +class LoadStreamStub; 
+ +struct SegmentStatistics; + +using IndexToTabletSchema = std::unordered_map<int64_t, std::shared_ptr<TabletSchema>>; +using IndexToEnableMoW = std::unordered_map<int64_t, bool>; + +class LoadStreamStub { +private: + class LoadStreamReplyHandler : public brpc::StreamInputHandler { + public: + LoadStreamReplyHandler(LoadStreamStub* stub) : _stub(stub) {} + + int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], + size_t size) override; + + void on_idle_timeout(brpc::StreamId id) override {} + + void on_closed(brpc::StreamId id) override; + + private: + LoadStreamStub* _stub; + }; + +public: + // construct new stub + LoadStreamStub(PUniqueId load_id, int64_t src_id); + + // copy constructor, shared_ptr members are shared + LoadStreamStub(LoadStreamStub& stub); + +// for mock this class in UT +#ifdef BE_TEST + virtual +#endif + ~LoadStreamStub(); + + // open_stream_sink + Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info, + int64_t txn_id, const OlapTableSchemaParam& schema, + const std::vector<PTabletID>& tablets_for_schema, bool enable_profile); + +// for mock this class in UT +#ifdef BE_TEST + virtual +#endif + // APPEND_DATA + Status + append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, + int64_t segment_id, std::span<const Slice> data, bool segment_eos = false); + + // ADD_SEGMENT + Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, + int64_t segment_id, SegmentStatistics& segment_stat); + + // CLOSE_LOAD + Status close_load(const std::vector<PTabletID>& tablets_to_commit); + + // wait remote to close stream, + // remote will close stream when it receives CLOSE_LOAD + Status close_wait(int64_t timeout_ms = 0) { + std::unique_lock<bthread::Mutex> lock(_mutex); + if (!_is_init || _is_closed) { + return Status::OK(); + } + if (timeout_ms > 0) { + int ret = _close_cv.wait_for(lock, timeout_ms * 1000); + return ret == 0 ? 
Status::OK() : Status::Error<true>(ret, "stream close_wait timeout"); + } + _close_cv.wait(lock); + return Status::OK(); + } + + std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const { + return _tablet_schema_for_index->at(index_id); + } + + bool enable_unique_mow(int64_t index_id) const { + return _enable_unique_mow_for_index->at(index_id); + } + + std::vector<int64_t> success_tablets() { + std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex); + return _success_tablets; + } + + std::vector<int64_t> failed_tablets() { + std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex); + return _failed_tablets; + } + + brpc::StreamId stream_id() const { return _stream_id; } + + int64_t src_id() const { return _src_id; } + + int64_t dst_id() const { return _dst_id; } + +private: + Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {}); + Status _send_with_retry(butil::IOBuf buf); + +protected: + bool _is_init = false; + bool _is_closed = false; + bthread::Mutex _mutex; + bthread::ConditionVariable _close_cv; + + PUniqueId _load_id; + brpc::StreamId _stream_id; + int64_t _src_id = -1; // source backend_id + int64_t _dst_id = -1; // destination backend_id + LoadStreamReplyHandler _handler {this}; + + bthread::Mutex _success_tablets_mutex; + bthread::Mutex _failed_tablets_mutex; + std::vector<int64_t> _success_tablets; + std::vector<int64_t> _failed_tablets; + + std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index; + std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index; +}; + +} // namespace doris diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 243507db71..eb66eac664 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -39,7 +39,6 @@ #include "common/object_pool.h" #include "common/status.h" #include "exec/tablet_info.h" -#include "io/fs/stream_sink_file_writer.h" #include "olap/delta_writer_v2.h" #include 
"runtime/descriptors.h" #include "runtime/exec_env.h" @@ -55,6 +54,7 @@ #include "util/uid_util.h" #include "vec/core/block.h" #include "vec/exprs/vexpr.h" +#include "vec/sink/load_stream_stub.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" @@ -63,76 +63,6 @@ class TExpr; namespace stream_load { -int StreamSinkHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], - size_t size) { - int64_t backend_id = _sink->_node_id_for_stream->at(id); - - for (size_t i = 0; i < size; i++) { - butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]); - PWriteStreamSinkResponse response; - response.ParseFromZeroCopyStream(&wrapper); - - Status st = Status::create(response.status()); - - std::stringstream ss; - ss << "received response from backend " << backend_id << ", status: " << st - << ", success tablet ids:"; - for (auto tablet_id : response.success_tablet_ids()) { - ss << " " << tablet_id; - } - ss << ", failed tablet ids:"; - for (auto tablet_id : response.failed_tablet_ids()) { - ss << " " << tablet_id; - } - LOG(INFO) << ss.str(); - - int replica = _sink->_num_replicas; - - { - std::lock_guard<bthread::Mutex> l(_sink->_tablet_success_map_mutex); - for (auto tablet_id : response.success_tablet_ids()) { - if (_sink->_tablet_success_map.count(tablet_id) == 0) { - _sink->_tablet_success_map.insert({tablet_id, {}}); - } - _sink->_tablet_success_map[tablet_id].push_back(backend_id); - } - } - { - std::lock_guard<bthread::Mutex> l(_sink->_tablet_failure_map_mutex); - for (auto tablet_id : response.failed_tablet_ids()) { - if (_sink->_tablet_failure_map.count(tablet_id) == 0) { - _sink->_tablet_failure_map.insert({tablet_id, {}}); - } - _sink->_tablet_failure_map[tablet_id].push_back(backend_id); - if (_sink->_tablet_failure_map[tablet_id].size() * 2 >= replica) { - _sink->_cancel(Status::Cancelled( - "Failed to meet num replicas requirements for tablet {}", tablet_id)); - break; - } - } - } - - if 
(response.has_load_stream_profile()) { - TRuntimeProfileTree tprofile; - const uint8_t* buf = - reinterpret_cast<const uint8_t*>(response.load_stream_profile().data()); - uint32_t len = response.load_stream_profile().size(); - auto status = deserialize_thrift_msg(buf, &len, false, &tprofile); - if (status.ok()) { - _sink->_state->load_channel_profile()->update(tprofile); - } else { - LOG(WARNING) << "load channel TRuntimeProfileTree deserialize failed, errmsg=" - << status; - } - } - } - return 0; -} - -void StreamSinkHandler::on_closed(brpc::StreamId id) { - _sink->_pending_streams.fetch_add(-1); -} - VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs, Status* status) : DataSink(row_desc), _pool(pool) { @@ -210,7 +140,6 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) { _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", "CloseWaitTime"); _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", "CloseWaitTime"); - _close_stream_timer = ADD_CHILD_TIMER(_profile, "CloseStreamTime", "CloseWaitTime"); // Prepare the exprs to run. 
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); @@ -224,8 +153,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - _stream_pool_for_node = std::make_shared<StreamPoolForNode>(); - _node_id_for_stream = std::make_shared<NodeIdForStream>(); + _stream_pool_for_node = std::make_shared<NodeToStreams>(); _delta_writer_for_tablet = std::make_shared<DeltaWriterForTablet>(); _build_tablet_node_mapping(); RETURN_IF_ERROR(_init_stream_pools()); @@ -234,71 +162,32 @@ Status VOlapTableSinkV2::open(RuntimeState* state) { } Status VOlapTableSinkV2::_init_stream_pools() { + // stub template is for sharing internal schema map among all stubs + LoadStreamStub stub_template {_load_id, _sender_id}; for (auto& [node_id, _] : _tablets_for_node) { auto node_info = _nodes_info->find_node(node_id); if (node_info == nullptr) { return Status::InternalError("Unknown node {} in tablet location", node_id); } - _stream_pool_for_node->insert({node_id, StreamPool {}}); - StreamPool& stream_pool = _stream_pool_for_node->at(node_id); - RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool)); - for (auto stream : stream_pool) { - _node_id_for_stream->insert({stream, node_id}); - } + Streams& stream_pool = (*_stream_pool_for_node)[node_id]; + RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool, stub_template)); } return Status::OK(); } -Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, StreamPool& stream_pool) { - DCHECK_GT(config::num_streams_per_sink, 0); +Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, Streams& stream_pool, + LoadStreamStub& stub_template) { stream_pool.reserve(config::num_streams_per_sink); for (int i = 0; i < config::num_streams_per_sink; ++i) { - brpc::StreamOptions opt; - opt.max_buf_size = 20 << 20; // 20MB - opt.idle_timeout_ms = 30000; - opt.messages_in_batch = 128; - opt.handler = new 
StreamSinkHandler(this); - brpc::StreamId stream; - brpc::Controller cntl; - if (int ret = StreamCreate(&stream, cntl, &opt)) { - return Status::RpcError("Failed to create stream, code = {}", ret); - } - LOG(INFO) << "Created stream " << stream << " for backend " << node_info.id << " (" - << node_info.host << ":" << node_info.brpc_port << ")"; - std::string host_port = get_host_port(node_info.host, node_info.brpc_port); - // use "pooled" connection to avoid conflicts between streaming rpc and regular rpc, - // see: https://github.com/apache/brpc/issues/392 - const auto& stub = - _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache( - host_port, "baidu_std", "pooled"); - POpenStreamSinkRequest request; - *request.mutable_load_id() = _load_id; - request.set_src_id(_sender_id); - request.set_txn_id(_txn_id); - request.set_enable_profile(_state->enable_profile()); - _schema->to_protobuf(request.mutable_schema()); - if (i == 0) { - // get tablet schema from each backend only in the 1st stream - for (auto& tablet : _indexes_from_node[node_info.id]) { - auto req = request.add_tablets(); - *req = tablet; - } - } - POpenStreamSinkResponse response; - cntl.set_timeout_ms(config::open_stream_sink_timeout_ms); - stub->open_stream_sink(&cntl, &request, &response, nullptr); - for (const auto& resp : response.tablet_schemas()) { - auto tablet_schema = std::make_shared<TabletSchema>(); - tablet_schema->init_from_pb(resp.tablet_schema()); - _tablet_schema_for_index[resp.index_id()] = tablet_schema; - _enable_unique_mow_for_index[resp.index_id()] = resp.enable_unique_key_merge_on_write(); - } - if (cntl.Failed()) { - return Status::InternalError("Failed to connect to backend {}: {}", node_info.id, - cntl.ErrorText()); - } - stream_pool.push_back(stream); - _pending_streams.fetch_add(1); + // internal tablet schema map will be shared among all stubs + auto stream = std::make_unique<LoadStreamStub>(stub_template); + // get tablet schema from each backend only in 
the 1st stream + const std::vector<PTabletID>& tablets_for_schema = + i == 0 ? _indexes_from_node[node_info.id] : std::vector<PTabletID> {}; + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), node_info, + _txn_id, *_schema, tablets_for_schema, + _state->enable_profile())); + stream_pool.emplace_back(std::move(stream)); } return Status::OK(); } @@ -343,13 +232,13 @@ void VOlapTableSinkV2::_generate_rows_for_tablet(RowsForTablet& rows_for_tablet, } } -Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, std::vector<brpc::StreamId>& streams) { +Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) { auto location = _location->find_tablet(tablet_id); if (location == nullptr) { return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); } for (auto& node_id : location->node_ids) { - streams.push_back(_stream_pool_for_node->at(node_id)[_stream_index]); + streams.emplace_back(_stream_pool_for_node->at(node_id)[_stream_index]); } _stream_index = (_stream_index + 1) % config::num_streams_per_sink; return Status::OK(); @@ -359,8 +248,6 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); - LOG(INFO) << "upstream id = " << state->backend_id(); - auto input_rows = input_block->rows(); auto input_bytes = input_block->bytes(); if (UNLIKELY(input_rows == 0)) { @@ -408,7 +295,7 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc // For each tablet, send its input_rows from block to delta writer for (const auto& [tablet_id, rows] : rows_for_tablet) { - std::vector<brpc::StreamId> streams; + Streams streams; RETURN_IF_ERROR(_select_streams(tablet_id, streams)); RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams)); } @@ -418,7 +305,7 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc Status 
VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id, const Rows& rows, - const std::vector<brpc::StreamId>& streams) { + const Streams& streams) { DeltaWriterV2* delta_writer = nullptr; { auto it = _delta_writer_for_tablet->find(tablet_id); @@ -434,10 +321,6 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc req.tuple_desc = _output_tuple_desc; req.is_high_priority = _is_high_priority; req.table_schema_param = _schema.get(); - req.tablet_schema = _tablet_schema_for_index[rows.index_id]; - req.enable_unique_key_merge_on_write = _enable_unique_mow_for_index[rows.index_id]; - req.sender_id = _sender_id; - req.streams = streams; for (auto& index : _schema->indexes()) { if (index->index_id == rows.index_id) { req.slots = &index->slots; @@ -445,9 +328,8 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc break; } } - DeltaWriterV2::open(&req, &delta_writer, _profile); - _delta_writer_for_tablet->insert( - {tablet_id, std::unique_ptr<DeltaWriterV2>(delta_writer)}); + DeltaWriterV2::open(&req, streams, &delta_writer, _profile); + _delta_writer_for_tablet->emplace(tablet_id, delta_writer); } else { VLOG_DEBUG << "Reusing DeltaWriterV2 for Tablet(tablet id: " << tablet_id << ", index id: " << rows.index_id << ")"; @@ -472,11 +354,6 @@ Status VOlapTableSinkV2::_cancel(Status status) { [&status](auto&& entry) { entry.second->cancel_with_status(status); }); } _delta_writer_for_tablet.reset(); - if (_stream_pool_for_node.use_count() == 1) { - std::for_each(std::begin(*_node_id_for_stream), std::end(*_node_id_for_stream), - [](auto&& entry) { brpc::StreamClose(entry.first); }); - } - _stream_pool_for_node.reset(); return Status::OK(); } @@ -515,43 +392,35 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { { // send CLOSE_LOAD to all streams, return ERROR if any - RETURN_IF_ERROR(std::transform_reduce( - std::begin(*_node_id_for_stream), 
std::end(*_node_id_for_stream), Status::OK(), - [](Status& left, Status&& right) { return left.ok() ? right : left; }, - [this](auto&& entry) { return _close_load(entry.first); })); - } - - { - SCOPED_TIMER(_close_load_timer); - while (_pending_streams.load() > 0) { - // TODO: use a better wait - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - LOG(INFO) << "sinkv2 close_wait, pending streams: " << _pending_streams.load(); + for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) { + RETURN_IF_ERROR(_close_load(stream_pool)); } } { - SCOPED_TIMER(_close_stream_timer); - // close streams - if (_stream_pool_for_node.use_count() == 1) { - std::for_each(std::begin(*_node_id_for_stream), std::end(*_node_id_for_stream), - [](auto&& entry) { brpc::StreamClose(entry.first); }); + SCOPED_TIMER(_close_load_timer); + for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) { + for (const auto& stream : stream_pool) { + stream->close_wait(); + } } - _stream_pool_for_node.reset(); } std::vector<TTabletCommitInfo> tablet_commit_infos; - for (auto& [tablet_id, backends] : _tablet_success_map) { - for (int64_t be_id : backends) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet_id; - commit_info.backendId = be_id; - tablet_commit_infos.emplace_back(std::move(commit_info)); + for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) { + for (const auto& stream : stream_pool) { + for (auto tablet_id : stream->success_tablets()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_id; + commit_info.backendId = node_id; + tablet_commit_infos.emplace_back(std::move(commit_info)); + } } } state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), std::make_move_iterator(tablet_commit_infos.begin()), std::make_move_iterator(tablet_commit_infos.end())); + _stream_pool_for_node.reset(); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t 
num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + @@ -573,22 +442,17 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { return status; } -Status VOlapTableSinkV2::_close_load(brpc::StreamId stream) { - butil::IOBuf buf; - PStreamHeader header; - *header.mutable_load_id() = _load_id; - header.set_src_id(_sender_id); - header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); - auto node_id = _node_id_for_stream.get()->at(stream); +Status VOlapTableSinkV2::_close_load(const Streams& streams) { + auto node_id = streams[0]->dst_id(); + std::vector<PTabletID> tablets_to_commit; for (auto tablet : _tablets_for_node[node_id]) { if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { - *header.add_tablets_to_commit() = tablet; + tablets_to_commit.push_back(tablet); } } - size_t header_len = header.ByteSizeLong(); - buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len)); - buf.append(header.SerializeAsString()); - io::StreamSinkFileWriter::send_with_retry(stream, buf); + for (const auto& stream : streams) { + RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); + } return Status::OK(); } diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index c2c24a26fb..5f50463268 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -66,6 +66,7 @@ namespace doris { class DeltaWriterV2; +class LoadStreamStub; class ObjectPool; class RowDescriptor; class RuntimeState; @@ -81,8 +82,8 @@ class OlapTabletFinder; class VOlapTableSinkV2; using DeltaWriterForTablet = std::unordered_map<int64_t, std::unique_ptr<DeltaWriterV2>>; -using StreamPool = std::vector<brpc::StreamId>; -using StreamPoolForNode = std::unordered_map<int64_t, StreamPool>; +using Streams = std::vector<std::shared_ptr<LoadStreamStub>>; +using NodeToStreams = std::unordered_map<int64_t, Streams>; using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>; using 
NodePartitionTabletMapping = std::unordered_map<int64_t, std::unordered_map<int64_t, std::unordered_set<int64_t>>>; @@ -135,7 +136,8 @@ public: RuntimeProfile* profile() override { return _profile; } private: - Status _init_stream_pool(const NodeInfo& node_info, StreamPool& stream_pool); + Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool, + LoadStreamStub& stub_template); Status _init_stream_pools(); @@ -146,11 +148,11 @@ private: int row_idx); Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id, - const Rows& rows, const std::vector<brpc::StreamId>& streams); + const Rows& rows, const Streams& streams); - Status _select_streams(int64_t tablet_id, std::vector<brpc::StreamId>& streams); + Status _select_streams(int64_t tablet_id, Streams& streams); - Status _close_load(brpc::StreamId stream); + Status _close_load(const Streams& streams); Status _cancel(Status status); @@ -176,8 +178,6 @@ private: // TODO(zc): think about cache this data std::shared_ptr<OlapTableSchemaParam> _schema; - std::unordered_map<int64_t, std::shared_ptr<TabletSchema>> _tablet_schema_for_index; - std::unordered_map<int64_t, bool> _enable_unique_mow_for_index; OlapTableLocationParam* _location = nullptr; DorisNodesInfo* _nodes_info = nullptr; @@ -206,7 +206,6 @@ private: RuntimeProfile::Counter* _close_timer = nullptr; RuntimeProfile::Counter* _close_writer_timer = nullptr; RuntimeProfile::Counter* _close_load_timer = nullptr; - RuntimeProfile::Counter* _close_stream_timer = nullptr; // Save the status of close() method Status _close_status; @@ -221,8 +220,7 @@ private: std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node; std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node; - std::shared_ptr<StreamPoolForNode> _stream_pool_for_node; - std::shared_ptr<NodeIdForStream> _node_id_for_stream; + std::shared_ptr<NodeToStreams> _stream_pool_for_node; size_t _stream_index = 0; 
std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet; diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp index e894cbaf91..c6ac4a3f50 100644 --- a/be/test/io/fs/stream_sink_file_writer_test.cpp +++ b/be/test/io/fs/stream_sink_file_writer_test.cpp @@ -24,6 +24,7 @@ #include "olap/olap_common.h" #include "util/debug/leakcheck_disabler.h" #include "util/faststring.h" +#include "vec/sink/load_stream_stub.h" namespace doris { @@ -35,134 +36,74 @@ namespace doris { } while (false) #endif -DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); -DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); -DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); -DEFINE_int32(idle_timeout_s, -1, - "Connection will be closed if there is no " - "read/write operations during the last `idle_timeout_s'"); +constexpr int64_t LOAD_ID_LO = 1; +constexpr int64_t LOAD_ID_HI = 2; +constexpr int64_t NUM_STREAM = 3; +constexpr int64_t PARTITION_ID = 1234; +constexpr int64_t INDEX_ID = 2345; +constexpr int64_t TABLET_ID = 3456; +constexpr int32_t SEGMENT_ID = 4567; +const std::string DATA0 = "segment data"; +const std::string DATA1 = "hello world"; + +static std::atomic<int64_t> g_num_request; class StreamSinkFileWriterTest : public testing::Test { - class MockStreamSinkFileRecevier : public brpc::StreamInputHandler { + class MockStreamStub : public LoadStreamStub { public: - virtual int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[], - size_t size) { - std::stringstream str; - for (size_t i = 0; i < size; ++i) { - str << "msg[" << i << "]=" << *messages[i]; + MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id) {}; + + virtual ~MockStreamStub() = default; + + // APPEND_DATA + virtual Status append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, + int64_t segment_id, std::span<const Slice> 
data, + bool segment_eos = false) override { + EXPECT_EQ(PARTITION_ID, partition_id); + EXPECT_EQ(INDEX_ID, index_id); + EXPECT_EQ(TABLET_ID, tablet_id); + EXPECT_EQ(SEGMENT_ID, segment_id); + if (segment_eos) { + EXPECT_EQ(0, data.size()); + } else { + EXPECT_EQ(2, data.size()); + EXPECT_EQ(DATA0, data[0].to_string()); + EXPECT_EQ(DATA1, data[1].to_string()); } - LOG(INFO) << "Received from Stream=" << id << ": " << str.str(); - return 0; - } - virtual void on_idle_timeout(brpc::StreamId id) { - LOG(INFO) << "Stream=" << id << " has no data transmission for a while"; - } - virtual void on_closed(brpc::StreamId id) { LOG(INFO) << "Stream=" << id << " is closed"; } - }; - - class StreamingSinkFileService : public PBackendService { - public: - StreamingSinkFileService() : _sd(brpc::INVALID_STREAM_ID) {} - virtual ~StreamingSinkFileService() { brpc::StreamClose(_sd); }; - virtual void open_stream_sink(google::protobuf::RpcController* controller, - const POpenStreamSinkRequest*, - POpenStreamSinkResponse* response, - google::protobuf::Closure* done) { - brpc::ClosureGuard done_guard(done); - - brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - brpc::StreamOptions stream_options; - stream_options.handler = &_receiver; - CHECK_EQ(0, brpc::StreamAccept(&_sd, *cntl, &stream_options)); - Status::OK().to_protobuf(response->mutable_status()); + g_num_request++; + return Status::OK(); } - - private: - MockStreamSinkFileRecevier _receiver; - brpc::StreamId _sd; }; public: - StreamSinkFileWriterTest() { srand(time(nullptr)); } - ~StreamSinkFileWriterTest() {} + StreamSinkFileWriterTest() = default; + ~StreamSinkFileWriterTest() = default; protected: virtual void SetUp() { - // init channel - brpc::Channel channel; - brpc::ChannelOptions options; - options.protocol = brpc::PROTOCOL_BAIDU_STD; - options.connection_type = FLAGS_connection_type; - options.timeout_ms = FLAGS_timeout_ms; - options.max_retry = FLAGS_max_retry; - std::stringstream port; - CHECK_EQ(0, 
channel.Init("127.0.0.1:18946", nullptr)); - - // init server - _stream_service = new StreamingSinkFileService(); - CHECK_EQ(0, _server.AddService(_stream_service, brpc::SERVER_DOESNT_OWN_SERVICE)); - brpc::ServerOptions server_options; - server_options.idle_timeout_sec = FLAGS_idle_timeout_s; - { - debug::ScopedLeakCheckDisabler disable_lsan; - CHECK_EQ(0, _server.Start("127.0.0.1:18946", &server_options)); + _load_id.set_hi(LOAD_ID_HI); + _load_id.set_lo(LOAD_ID_LO); + for (int src_id = 0; src_id < NUM_STREAM; src_id++) { + _streams.emplace_back(new MockStreamStub(_load_id, src_id)); } - - // init stream connect - PBackendService_Stub stub(&channel); - brpc::Controller cntl; - brpc::StreamId stream; - CHECK_EQ(0, brpc::StreamCreate(&stream, cntl, NULL)); - - POpenStreamSinkRequest request; - POpenStreamSinkResponse response; - request.mutable_load_id()->set_hi(1); - request.mutable_load_id()->set_lo(1); - stub.open_stream_sink(&cntl, &request, &response, NULL); - - brpc::Join(cntl.call_id()); - _stream = stream; } - virtual void TearDown() { - CHECK_EQ(0, brpc::StreamClose(_stream)); - CHECK_EQ(0, _server.Stop(1000)); - CHECK_EQ(0, _server.Join()); - delete _stream_service; - } + virtual void TearDown() {} - StreamingSinkFileService* _stream_service; - brpc::StreamId _stream; - brpc::Server _server; + PUniqueId _load_id; + std::vector<std::shared_ptr<LoadStreamStub>> _streams; }; -TEST_F(StreamSinkFileWriterTest, TestInit) { - std::vector<brpc::StreamId> stream_ids {_stream}; - io::StreamSinkFileWriter writer(0, stream_ids); - PUniqueId load_id; - load_id.set_hi(1); - load_id.set_lo(2); - writer.init(load_id, 3, 4, 5, 6); -} +TEST_F(StreamSinkFileWriterTest, Test) { + g_num_request = 0; + io::StreamSinkFileWriter writer(_streams); + writer.init(_load_id, PARTITION_ID, INDEX_ID, TABLET_ID, SEGMENT_ID); + std::vector<Slice> slices {DATA0, DATA1}; -TEST_F(StreamSinkFileWriterTest, TestAppend) { - std::vector<brpc::StreamId> stream_ids {_stream}; - 
io::StreamSinkFileWriter writer(0, stream_ids); - PUniqueId load_id; - load_id.set_hi(1); - load_id.set_lo(2); - writer.init(load_id, 3, 4, 5, 6); - std::vector<Slice> slices {"hello"}; - CHECK_STATUS_OK(writer.appendv(&slices[0], slices.size())); -} - -TEST_F(StreamSinkFileWriterTest, TestFinalize) { - std::vector<brpc::StreamId> stream_ids {_stream}; - io::StreamSinkFileWriter writer(0, stream_ids); - PUniqueId load_id; - load_id.set_hi(1); - load_id.set_lo(2); - writer.init(load_id, 3, 4, 5, 6); + CHECK_STATUS_OK(writer.appendv(&(*slices.begin()), slices.size())); + EXPECT_EQ(NUM_STREAM, g_num_request); CHECK_STATUS_OK(writer.finalize()); + EXPECT_EQ(NUM_STREAM * 2, g_num_request); } + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org