github-actions[bot] commented on code in PR #25796:
URL: https://github.com/apache/doris/pull/25796#discussion_r1389469095
########## be/src/vec/sink/load_stream_stub.cpp: ########## @@ -190,19 +203,89 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 // CLOSE_LOAD Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) { - if (--_num_open > 0) { + { + std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex); + _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), + tablets_to_commit.end()); + } + const int num_use = --_use_cnt; + LOG(INFO) << "stream " << _stream_id << " close, load_id " << _load_id << " use cnt " << num_use; + if (num_use > 0) { return Status::OK(); } + if (num_use < 0) { + LOG(WARNING) << "stream " << _stream_id << "already closed, load_id " << _load_id; + return Status::InternalError("stream {} already closed", _stream_id); + } + + DCHECK(num_use == 0); + PStreamHeader header; + *header.mutable_load_id() = _load_id; + header.set_src_id(_src_id); + header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); + LOG(INFO) << "send close load on stream " << _stream_id << " load_id " << _load_id; + { + std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex); + for (const auto& tablet : _tablets_to_commit) { + *header.add_tablets() = tablet; + } + } + return _encode_and_send(header); +} + +// GET_SCHEMA +Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) { PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); - for (const auto& tablet : tablets_to_commit) { - *header.add_tablets_to_commit() = tablet; + std::ostringstream oss; + oss << "fetching tablet schema from stream " << _stream_id << ", load id: " << _load_id + << ", tablet id:"; + for (const auto& tablet : tablets) { + *header.add_tablets() = tablet; + oss << " " << tablet.tablet_id(); } + LOG(INFO) << oss.str(); return _encode_and_send(header); } +void LoadStreamStub::add_schema(const std::vector<PTabletSchemaWithIndex>& 
schemas) { + std::lock_guard<bthread::Mutex> lock(_mutex); + for (const auto& schema : schemas) { + auto tablet_schema = std::make_unique<TabletSchema>(); + tablet_schema->init_from_pb(schema.tablet_schema()); + _tablet_schema_for_index->emplace(schema.index_id(), std::move(tablet_schema)); + _enable_unique_mow_for_index->emplace(schema.index_id(), + schema.enable_unique_key_merge_on_write()); + } + _schema_cv.notify_all(); +} + +Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, + int64_t timeout_ms) { + if (_tablet_schema_for_index->contains(index_id)) { + return Status::OK(); + } + PTabletID tablet; + tablet.set_partition_id(partition_id); + tablet.set_index_id(index_id); + tablet.set_tablet_id(tablet_id); + RETURN_IF_ERROR(get_schema({tablet})); + + MonotonicStopWatch watch; + watch.start(); + while (!_tablet_schema_for_index->contains(index_id) && + watch.elapsed_time() / 1000 / 1000 < timeout_ms) { + static_cast<void>(wait_for_new_schema(100)); Review Comment: warning: 100 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp static_cast<void>(wait_for_new_schema(100)); ^ ``` ########## be/src/runtime/load_stream.cpp: ########## @@ -360,6 +360,32 @@ void LoadStream::_report_result(StreamId stream, const Status& st, } } +void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) { Review Comment: warning: method '_report_schema' can be made static [readability-convert-member-functions-to-static] ```suggestion static void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) { ``` ########## be/src/vec/sink/load_stream_stub.cpp: ########## @@ -215,21 +298,22 @@ buf.append(slice.get_data(), slice.get_size()); } bool eos = header.opcode() == doris::PStreamHeader::CLOSE_LOAD; - return _send_with_buffer(buf, eos); + bool get_schema = header.opcode() == doris::PStreamHeader::GET_SCHEMA; + return _send_with_buffer(buf, eos || get_schema); } 
-Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool eos) { +Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) { Review Comment: warning: method '_send_with_buffer' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) { ``` ########## be/src/vec/sink/load_stream_stub.h: ########## @@ -176,6 +185,22 @@ class LoadStreamStub { return _handler.close_wait(timeout_ms); } + Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, + int64_t timeout_ms = 60000); Review Comment: warning: 60000 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp int64_t timeout_ms = 60000); ^ ``` ########## be/src/vec/sink/load_stream_stub.h: ########## @@ -176,6 +185,22 @@ return _handler.close_wait(timeout_ms); } + Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, + int64_t timeout_ms = 60000); + + Status wait_for_new_schema(int64_t timeout_ms) { Review Comment: warning: method 'wait_for_new_schema' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status wait_for_new_schema(int64_t timeout_ms) { ``` ########## be/src/vec/sink/vtablet_sink_v2.cpp: ########## @@ -91,7 +92,45 @@ static Status on_partitions_created(void* writer, TCreatePartitionResult* result return static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result); } -void VOlapTableSinkV2::_init_row_distribution() { +Status VOlapTableSinkV2::_incremental_open_streams( Review Comment: warning: method '_incremental_open_streams' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status VOlapTableSinkV2::_incremental_open_streams( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org