github-actions[bot] commented on code in PR #23135: URL: https://github.com/apache/doris/pull/23135#discussion_r1303084162
########## be/test/runtime/load_stream_test.cpp: ########## @@ -0,0 +1,1145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <brpc/channel.h> +#include <brpc/server.h> +#include <brpc/stream.h> +#include <butil/logging.h> +#include <gen_cpp/Types_types.h> +#include <gen_cpp/internal_service.pb.h> +#include <gflags/gflags.h> +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> +#include <olap/storage_engine.h> +#include <service/internal_service.h> +#include <unistd.h> + +#include <functional> + +#include "common/config.h" +#include "common/status.h" +#include "exec/tablet_info.h" +#include "gen_cpp/BackendService_types.h" +#include "gen_cpp/FrontendService_types.h" +#include "gtest/gtest_pred_impl.h" +#include "olap/olap_define.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/tablet_manager.h" +#include "olap/txn_manager.h" +#include "runtime/descriptor_helper.h" +#include "runtime/exec_env.h" +#include "runtime/load_stream_mgr.h" +#include "util/debug/leakcheck_disabler.h" +#include "util/runtime_profile.h" + +using namespace brpc; + +namespace doris { + +static const uint32_t MAX_PATH_LEN = 1024; +StorageEngine* z_engine = nullptr; +static const std::string zTestDir = 
"./data_test/data/load_stream_mgr_test"; + +const int64_t NORMAL_TABLET_ID = 10000; +const int64_t ABNORMAL_TABLET_ID = 40000; +const int64_t NORMAL_INDEX_ID = 50000; +const int64_t ABNORMAL_INDEX_ID = 60000; +const int64_t NORMAL_PARTITION_ID = 50000; +const int64_t SCHEMA_HASH = 90000; +const uint32_t NORMAL_SENDER_ID = 0; +const uint32_t ABNORMAL_SENDER_ID = 10000; +const int64_t NORMAL_TXN_ID = 600001; +const UniqueId NORMAL_LOAD_ID(1, 1); +const UniqueId ABNORMAL_LOAD_ID(1, 0); +std::string ABNORMAL_STRING("abnormal"); + +void construct_schema(OlapTableSchemaParam* schema) { + // construct schema + TOlapTableSchemaParam tschema; + tschema.db_id = 1; + tschema.table_id = 2; + tschema.version = 0; + + // descriptor + { + TDescriptorTableBuilder dtb; + { + TTupleDescriptorBuilder tuple_builder; + + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("c2") + .column_pos(2) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(10) + .column_name("c3") + .column_pos(3) + .build()); + + tuple_builder.build(&dtb); + } + { + TTupleDescriptorBuilder tuple_builder; + + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("c1") + .column_pos(1) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("c2") + .column_pos(2) + .build()); + tuple_builder.add_slot(TSlotDescriptorBuilder() + .string_type(20) + .column_name("c3") + .column_pos(3) + .build()); + + tuple_builder.build(&dtb); + } + + auto desc_tbl = dtb.desc_tbl(); + tschema.slot_descs = desc_tbl.slotDescriptors; + tschema.tuple_desc = desc_tbl.tupleDescriptors[0]; + } + // index + tschema.indexes.resize(2); + tschema.indexes[0].id = NORMAL_INDEX_ID; + tschema.indexes[0].columns = {"c1", "c2", "c3"}; + + tschema.indexes[1].id = NORMAL_INDEX_ID + 1; + 
tschema.indexes[1].columns = {"c1", "c2", "c3"}; + + schema->init(tschema); +} + +// copied from delta_writer_test.cpp +static void create_tablet_request(int64_t tablet_id, int32_t schema_hash, + TCreateTabletReq* request) { + request->tablet_id = tablet_id; + request->__set_version(1); + request->tablet_schema.schema_hash = schema_hash; + request->tablet_schema.short_key_column_count = 6; + request->tablet_schema.keys_type = TKeysType::AGG_KEYS; + request->tablet_schema.storage_type = TStorageType::COLUMN; + request->__set_storage_format(TStorageFormat::V2); + + TColumn k1; + + k1.__set_is_key(true); + k1.column_type.type = TPrimitiveType::TINYINT; + request->tablet_schema.columns.push_back(k1); + + TColumn k2; + k2.column_name = "k2"; + k2.__set_is_key(true); + k2.column_type.type = TPrimitiveType::SMALLINT; + request->tablet_schema.columns.push_back(k2); + + TColumn k3; + k3.column_name = "k3"; + k3.__set_is_key(true); + k3.column_type.type = TPrimitiveType::INT; + request->tablet_schema.columns.push_back(k3); + + TColumn k4; + k4.column_name = "k4"; + k4.__set_is_key(true); + k4.column_type.type = TPrimitiveType::BIGINT; + request->tablet_schema.columns.push_back(k4); + + TColumn k5; + k5.column_name = "k5"; + k5.__set_is_key(true); + k5.column_type.type = TPrimitiveType::LARGEINT; + request->tablet_schema.columns.push_back(k5); + + TColumn k6; + k6.column_name = "k6"; + k6.__set_is_key(true); + k6.column_type.type = TPrimitiveType::DATE; + request->tablet_schema.columns.push_back(k6); + + TColumn k7; + k7.column_name = "k7"; + k7.__set_is_key(true); + k7.column_type.type = TPrimitiveType::DATETIME; + request->tablet_schema.columns.push_back(k7); + + TColumn k8; + k8.column_name = "k8"; + k8.__set_is_key(true); + k8.column_type.type = TPrimitiveType::CHAR; + k8.column_type.__set_len(4); + request->tablet_schema.columns.push_back(k8); + + TColumn k9; + k9.column_name = "k9"; + k9.__set_is_key(true); + k9.column_type.type = TPrimitiveType::VARCHAR; + 
k9.column_type.__set_len(65); + request->tablet_schema.columns.push_back(k9); + + TColumn k10; + k10.column_name = "k10"; + k10.__set_is_key(true); + k10.column_type.type = TPrimitiveType::DECIMALV2; + k10.column_type.__set_precision(6); + k10.column_type.__set_scale(3); + request->tablet_schema.columns.push_back(k10); + + TColumn k11; + k11.column_name = "k11"; + k11.__set_is_key(true); + k11.column_type.type = TPrimitiveType::DATEV2; + request->tablet_schema.columns.push_back(k11); + + TColumn v1; + v1.column_name = "v1"; + v1.__set_is_key(false); + v1.column_type.type = TPrimitiveType::TINYINT; + v1.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v1); + + TColumn v2; + v2.column_name = "v2"; + v2.__set_is_key(false); + v2.column_type.type = TPrimitiveType::SMALLINT; + v2.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v2); + + TColumn v3; + v3.column_name = "v3"; + v3.__set_is_key(false); + v3.column_type.type = TPrimitiveType::INT; + v3.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v3); + + TColumn v4; + v4.column_name = "v4"; + v4.__set_is_key(false); + v4.column_type.type = TPrimitiveType::BIGINT; + v4.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v4); + + TColumn v5; + v5.column_name = "v5"; + v5.__set_is_key(false); + v5.column_type.type = TPrimitiveType::LARGEINT; + v5.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v5); + + TColumn v6; + v6.column_name = "v6"; + v6.__set_is_key(false); + v6.column_type.type = TPrimitiveType::DATE; + v6.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v6); + + TColumn v7; + v7.column_name = "v7"; + v7.__set_is_key(false); + v7.column_type.type = TPrimitiveType::DATETIME; + v7.__set_aggregation_type(TAggregationType::REPLACE); + 
request->tablet_schema.columns.push_back(v7); + + TColumn v8; + v8.column_name = "v8"; + v8.__set_is_key(false); + v8.column_type.type = TPrimitiveType::CHAR; + v8.column_type.__set_len(4); + v8.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v8); + + TColumn v9; + v9.column_name = "v9"; + v9.__set_is_key(false); + v9.column_type.type = TPrimitiveType::VARCHAR; + v9.column_type.__set_len(65); + v9.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v9); + + TColumn v10; + v10.column_name = "v10"; + v10.__set_is_key(false); + v10.column_type.type = TPrimitiveType::DECIMALV2; + v10.column_type.__set_precision(6); + v10.column_type.__set_scale(3); + v10.__set_aggregation_type(TAggregationType::SUM); + request->tablet_schema.columns.push_back(v10); + + TColumn v11; + v11.column_name = "v11"; + v11.__set_is_key(false); + v11.column_type.type = TPrimitiveType::DATEV2; + v11.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(v11); +} + +struct ResponseStat { + std::atomic<int32_t> num; + std::vector<int64_t> success_tablet_ids; + std::vector<int64_t> failed_tablet_ids; +}; +bthread::Mutex g_stat_lock; +static ResponseStat g_response_stat; + +void reset_response_stat() { + std::lock_guard lock_guard(g_stat_lock); + g_response_stat.num = 0; + g_response_stat.success_tablet_ids.clear(); + g_response_stat.failed_tablet_ids.clear(); +} + +class LoadStreamMgrTest : public testing::Test { +public: + class Handler : public brpc::StreamInputHandler { + public: + int on_received_messages(StreamId id, butil::IOBuf* const messages[], + size_t size) override { + for (size_t i = 0; i < size; i++) { + PWriteStreamSinkResponse response; + butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]); + response.ParseFromZeroCopyStream(&wrapper); + LOG(INFO) << "response " << response.DebugString(); + std::lock_guard lock_guard(g_stat_lock); + for (auto& id : 
response.success_tablet_ids()) { + g_response_stat.success_tablet_ids.push_back(id); + } + for (auto& id : response.failed_tablet_ids()) { + g_response_stat.failed_tablet_ids.push_back(id); + } + g_response_stat.num++; + } + + return 0; + } + void on_idle_timeout(StreamId id) override { std::cerr << "on_idle_timeout" << std::endl; } + void on_closed(StreamId id) override { std::cerr << "on_closed" << std::endl; } + }; + + class StreamService : public PBackendService { + public: + StreamService(LoadStreamMgr* load_stream_mgr) + : _sd(brpc::INVALID_STREAM_ID), _load_stream_mgr(load_stream_mgr) {} + virtual ~StreamService() { brpc::StreamClose(_sd); }; + virtual void open_stream_sink(google::protobuf::RpcController* controller, + const POpenStreamSinkRequest* request, + POpenStreamSinkResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + std::unique_ptr<PStatus> status = std::make_unique<PStatus>(); + brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); + brpc::StreamOptions stream_options; + + for (const auto& req : request->tablets()) { + TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager(); + TabletSharedPtr tablet = tablet_mgr->get_tablet(req.tablet_id()); + if (tablet == nullptr) { + cntl->SetFailed("Tablet not found"); + status->set_status_code(TStatusCode::NOT_FOUND); + response->set_allocated_status(status.get()); + response->release_status(); + return; + } + auto resp = response->add_tablet_schemas(); + resp->set_index_id(req.index_id()); + resp->set_enable_unique_key_merge_on_write( + tablet->enable_unique_key_merge_on_write()); + tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); + } + + LoadStreamSharedPtr load_stream; + auto st = _load_stream_mgr->open_load_stream(request, load_stream); + + stream_options.handler = load_stream.get(); + + StreamId streamid; + if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) { + cntl->SetFailed("Fail to accept 
stream"); + status->set_status_code(TStatusCode::CANCELLED); + response->set_allocated_status(status.get()); + response->release_status(); + return; + } + + load_stream->add_rpc_stream(); + + status->set_status_code(TStatusCode::OK); + response->set_allocated_status(status.get()); + response->release_status(); + } + + private: + Handler _receiver; + brpc::StreamId _sd; + LoadStreamMgr* _load_stream_mgr = nullptr; + }; + + class MockSinkClient { + public: + MockSinkClient() = default; + ~MockSinkClient() { disconnect(); } + + class MockClosure : public google::protobuf::Closure { + public: + MockClosure(std::function<void()> cb) : _cb(cb) {} + void Run() override { + _cb(); + delete this; + } + + private: + std::function<void()> _cb; + }; + + Status connect_stream() { + brpc::Channel channel; + std::cerr << "connect_stream" << std::endl; + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_BAIDU_STD; + options.connection_type = "single"; + options.timeout_ms = 10000 /*milliseconds*/; + options.max_retry = 3; + CHECK_EQ(0, channel.Init("127.0.0.1:18947", nullptr)); + + // Normally, you should not call a Channel directly, but instead construct + // a stub Service wrapping it. stub can be shared by all threads as well. 
+ PBackendService_Stub stub(&channel); + + _stream_options.handler = &_handler; + if (brpc::StreamCreate(&_stream, _cntl, &_stream_options) != 0) { + LOG(ERROR) << "Fail to create stream"; + return Status::InternalError("Fail to create stream"); + } + + POpenStreamSinkRequest request; + POpenStreamSinkResponse response; + PUniqueId id; + id.set_hi(1); + id.set_lo(1); + + OlapTableSchemaParam param; + construct_schema(&param); + *request.mutable_schema() = *param.to_protobuf(); + *request.mutable_load_id() = id; + request.set_txn_id(NORMAL_TXN_ID); + auto ptablet = request.add_tablets(); + ptablet->set_tablet_id(NORMAL_TABLET_ID); + ptablet->set_index_id(NORMAL_INDEX_ID); + stub.open_stream_sink(&_cntl, &request, &response, nullptr); + if (_cntl.Failed()) { + std::cerr << "open_stream_sink failed" << std::endl; + LOG(ERROR) << "Fail to open stream sink " << _cntl.ErrorText(); + return Status::InternalError("Fail to open stream sink"); + } + + return Status::OK(); + } + + void disconnect() const { + std::cerr << "disconnect" << std::endl; + CHECK_EQ(0, brpc::StreamClose(_stream)); + } + + Status send(butil::IOBuf* buf) { + int ret = brpc::StreamWrite(_stream, *buf); + if (ret != 0) { + LOG(ERROR) << "Fail to write stream"; + return Status::InternalError("Fail to write stream"); + } + LOG(INFO) << "sent by stream successfully" << std::endl; + return Status::OK(); + } + + Status close() { return Status::OK(); } + + private: + brpc::StreamId _stream; + brpc::Controller _cntl; + brpc::StreamOptions _stream_options; + Handler _handler; + }; + + LoadStreamMgrTest() Review Comment: warning: use '= default' to define a trivial default constructor [modernize-use-equals-default] be/test/runtime/load_stream_test.cpp:488: ```diff - _light_work_pool(4, 32, "load_stream_test_light") {} + _light_work_pool(4, 32, "load_stream_test_light") = default; ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org