This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c775f8e7bd [feature](move-memtable)[2/7] add protos for memtable on sink node (#23348)
c775f8e7bd is described below

commit c775f8e7bd5405f4a9c83538b516f53c9630debb
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Aug 24 11:11:46 2023 +0800

    [feature](move-memtable)[2/7] add protos for memtable on sink node (#23348)
    
    
    
    Co-authored-by: zhengyu <freeman.zhang1...@gmail.com>
    Co-authored-by: laihui <1353307...@qq.com>
---
 be/src/olap/rowset/rowset_writer.h  | 24 +++++++++++++++++
 gensrc/proto/internal_service.proto | 52 +++++++++++++++++++++++++++++++++++++
 gensrc/proto/olap_file.proto        |  7 +++++
 3 files changed, 83 insertions(+)

diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 61cb20498a..21637a2379 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -24,6 +24,7 @@
 #include <optional>
 
 #include "common/factory_creator.h"
+#include "gen_cpp/olap_file.pb.h"
 #include "gutil/macros.h"
 #include "olap/column_mapping.h"
 #include "olap/rowset/rowset.h"
@@ -38,7 +39,30 @@ struct SegmentStatistics {
     int64_t data_size;
     int64_t index_size;
     KeyBoundsPB key_bounds;
+
+    // Default construction. data_size and index_size carry no in-class initializers,
+    // so a default-initialized object holds indeterminate values until they are assigned.
+    SegmentStatistics() = default;
+
+    // Builds the statistics from their protobuf form by copying each field out of `pb`.
+    // The message is taken by const reference so the caller's message (including its
+    // KeyBoundsPB) is not copied just to be read.
+    // Deliberately left non-explicit so existing implicit conversions keep compiling.
+    SegmentStatistics(const SegmentStatisticsPB& pb)
+            : row_num(pb.row_num()),
+              data_size(pb.data_size()),
+              index_size(pb.index_size()),
+              key_bounds(pb.key_bounds()) {}
+
+    // Writes row_num, data_size, index_size and a copy of key_bounds into *segstat_pb.
+    // segstat_pb is dereferenced unconditionally, so it must not be null.
+    // const-qualified: it only reads members, so it is callable on const objects.
+    void to_pb(SegmentStatisticsPB* segstat_pb) const {
+        segstat_pb->set_row_num(row_num);
+        segstat_pb->set_data_size(data_size);
+        segstat_pb->set_index_size(index_size);
+        segstat_pb->mutable_key_bounds()->CopyFrom(key_bounds);
+    }
+
+    // Returns "row_num: <n>, data_size: <n>, index_size: <n>, key_bounds: <text>", where
+    // <text> is key_bounds.ShortDebugString().
+    // const-qualified: it only reads members, so it is callable on const objects.
+    std::string to_string() const {
+        std::stringstream ss;
+        ss << "row_num: " << row_num << ", data_size: " << data_size
+           << ", index_size: " << index_size << ", key_bounds: " << key_bounds.ShortDebugString();
+        return ss.str();
+    }
 };
+using SegmentStatisticsSharedPtr = std::shared_ptr<SegmentStatistics>; // Shared-ownership handle to a SegmentStatistics.
 
 class RowsetWriter {
 public:
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 830ed3c41a..bdfc0d823a 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -61,6 +61,12 @@ message PTabletWithPartition {
     required int64 tablet_id = 2;
 }
 
+message PTabletID { // Tablet reference carrying partition, index and tablet ids; every field is optional.
+    optional int64 partition_id = 1;
+    optional int64 index_id = 2;
+    optional int64 tablet_id = 3;
+}
+
 message PTabletInfo {
     required int64 tablet_id = 1;
     required int32 schema_hash = 2;
@@ -692,6 +698,51 @@ message PGlobResponse {
     repeated PFileInfo files = 2;
 }
 
+message POpenStreamSinkRequest { // Request message of the PBackendService.open_stream_sink rpc.
+    optional PUniqueId load_id = 1;
+    optional int64 txn_id = 2;
+    optional int64 src_id = 3; // NOTE(review): presumably identifies the sending side -- confirm against the caller.
+    optional POlapTableSchemaParam schema = 4;
+    repeated PTabletID tablets = 5;
+    optional bool enable_profile = 6 [default = false]; // Reads as false when the field is unset.
+}
+
+message PTabletSchemaWithIndex { // Element type of POpenStreamSinkResponse.tablet_schemas.
+    optional int64 index_id = 1;
+    optional TabletSchemaPB tablet_schema = 2;
+    optional bool enable_unique_key_merge_on_write = 3; // No explicit default, so an unset field reads as false.
+}
+
+message POpenStreamSinkResponse { // Response message of the PBackendService.open_stream_sink rpc.
+    optional PStatus status = 1;
+    repeated PTabletSchemaWithIndex tablet_schemas = 2; // Each entry pairs an index_id with a TabletSchemaPB.
+}
+
+message PWriteStreamSinkResponse { // NOTE(review): no rpc visible here uses this message -- confirm where it is sent.
+    optional PStatus status = 1;
+    repeated int64 success_tablet_ids = 2;
+    repeated int64 failed_tablet_ids = 3;
+    optional bytes load_stream_profile = 4; // Raw bytes; the encoding is not defined here -- TODO confirm.
+}
+
+message PStreamHeader { // NOTE(review): presumably the header attached to each stream message -- confirm.
+    enum Opcode { // NOTE(review): presumably selects the operation a header describes -- confirm.
+        APPEND_DATA = 1;
+        CLOSE_LOAD = 2;
+        ADD_SEGMENT = 3;
+    }
+    optional PUniqueId load_id = 1;
+    optional int64 partition_id = 2;
+    optional int64 index_id = 3;
+    optional int64 tablet_id = 4;
+    optional int32 segment_id = 5;
+    optional Opcode opcode = 6; // No explicit default; proto2 then yields the first enum value (APPEND_DATA) when unset.
+    optional bool segment_eos = 7;
+    optional int64 src_id = 8;
+    optional SegmentStatisticsPB segment_statistics = 9; // Message type declared in olap_file.proto.
+    repeated PTabletID tablets_to_commit = 10;
+}
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -703,6 +754,7 @@ service PBackendService {
     rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns 
(PCancelPlanFragmentResult);
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns 
(PTabletWriterOpenResult);
+    rpc open_stream_sink(POpenStreamSinkRequest) returns 
(POpenStreamSinkResponse);
     rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns 
(PTabletWriterAddBlockResult);
     rpc tablet_writer_add_block_by_http(PEmptyRequest) returns 
(PTabletWriterAddBlockResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns 
(PTabletWriterCancelResult);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 9a1a2686aa..fe3ac7915c 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -115,6 +115,13 @@ message RowsetMetaPB {
     optional SegmentsOverlapPB segments_overlap_pb = 51 [default = 
OVERLAP_UNKNOWN];
 }
 
+message SegmentStatisticsPB { // Protobuf form of the C++ SegmentStatistics struct (filled by SegmentStatistics::to_pb).
+    optional int64 row_num = 1;
+    optional int64 data_size = 2;
+    optional int64 index_size = 3;
+    optional KeyBoundsPB key_bounds = 4;
+}
+
 // kv value for reclaiming remote rowset
 message RemoteRowsetGcPB {
     required string resource_id = 1;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to