imay commented on a change in pull request #3143: Non blocking OlapTableSink
URL: https://github.com/apache/incubator-doris/pull/3143#discussion_r395407530
 
 

 ##########
 File path: be/src/exec/tablet_sink.h
 ##########
 @@ -95,72 +193,79 @@ class NodeChannel {
     TupleDescriptor* _tuple_desc = nullptr;
     const NodeInfo* _node_info = nullptr;
 
-    bool _already_failed = false;
-    bool _has_in_flight_packet = false;
     // this should be set in init() using config
     int _rpc_timeout_ms = 60000;
     int64_t _next_packet_seq = 0;
 
-    std::unique_ptr<RowBatch> _batch;
+    std::atomic<bool> _rpc_error{false};
+    std::atomic<bool> _is_cancelled{false};
+
+    std::atomic<bool> _send_finished{false};
+    std::atomic<bool> _add_batches_finished{false};
+
+    bool _eos_is_produced{false}; // only for restricting producer behaviors
+
+    std::unique_ptr<RowDescriptor> _row_desc;
+    int _batch_size = 0;
+    std::unique_ptr<RowBatch> _cur_batch;
+    PTabletWriterAddBatchRequest _cur_add_batch_request;
+
+    std::mutex _pending_batches_lock;
+    using AddBatchReq = std::pair<std::unique_ptr<RowBatch>, 
PTabletWriterAddBatchRequest>;
+    std::queue<AddBatchReq> _pending_batches;
+    std::atomic<int> _pending_batches_num{0};
+
     palo::PInternalService_Stub* _stub = nullptr;
     RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
-    RefCountClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr;
+    ReusableClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr;
 
     std::vector<TTabletWithPartition> _all_tablets;
-    PTabletWriterAddBatchRequest _add_batch_request;
+    std::vector<TTabletCommitInfo> _tablet_commit_infos;
+
+    AddBatchCounter _add_batch_counter;
+    int64_t _queue_push_lock_ns = 0;
+    int64_t _serialize_batch_ns = 0;
+    int64_t _actual_consume_ns = 0;
 };
 
 class IndexChannel {
 public:
     IndexChannel(OlapTableSink* parent, int64_t index_id, int32_t schema_hash)
-            : _parent(parent), _index_id(index_id),
-            _schema_hash(schema_hash) {
-    }
+            : _parent(parent), _index_id(index_id), _schema_hash(schema_hash) 
{}
     ~IndexChannel();
 
-    Status init(RuntimeState* state,
-                const std::vector<TTabletWithPartition>& tablets);
-    Status open();
-    Status add_row(Tuple* tuple, int64_t tablet_id);
+    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& 
tablets);
 
-    Status close(RuntimeState* state);
+    Status add_row(Tuple* tuple, int64_t tablet_id);
 
-    void cancel();
+    void for_each_node_channel(std::function<void(NodeChannel*)> func) {
+        for (auto& it : _node_channels) {
+            func(it.second);
+        }
+    }
 
-private:
-    // return true if this load can't success.
-    bool _handle_failed_node(NodeChannel* channel);
+    void mark_as_failed(NodeChannel* ch) { 
_failed_channels.insert(ch->node_id()); }
+    bool has_intolerable_failure();
 
 private:
     OlapTableSink* _parent;
     int64_t _index_id;
     int32_t _schema_hash;
-    int _num_failed_channels = 0;
 
     // BeId -> channel
     std::unordered_map<int64_t, NodeChannel*> _node_channels;
     // from tablet_id to backend channel
     std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
-};
-
-// The counter of add_batch rpc of a single node
-struct AddBatchCounter {
-    // total execution time of a add_batch rpc
-    int64_t add_batch_execution_time_ns = 0;
-    // lock waiting time in a add_batch rpc
-    int64_t add_batch_wait_lock_time_ns = 0;
-    // number of add_batch call
-    int64_t add_batch_num = 0;
+    // BeId
+    std::set<int64_t> _failed_channels;
 };
 
 // write data to Olap Table.
 // this class distributed data according to
 class OlapTableSink : public DataSink {
 
 Review comment:
  It would be better to add some comments explaining how this class works, so
that others can understand this code more easily.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to