imay commented on a change in pull request #2956: Add multi-thread ver olaptablesink for LOAD
URL: https://github.com/apache/incubator-doris/pull/2956#discussion_r385502512
 
 

 ##########
 File path: be/src/exec/tablet_sink.h
 ##########
 @@ -141,26 +191,78 @@ class IndexChannel {
     std::unordered_map<int64_t, NodeChannel*> _node_channels;
     // from tablet_id to backend channel
     std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
+
+    int64_t _serialize_batch_ns = 0;
+    int64_t _wait_in_flight_packet_ns = 0;
+
+    // BeId -> AddBatchCounter
+    std::unordered_map<int64_t, AddBatchCounter> _add_batch_counter_map;
 };
 
-// The counter of add_batch rpc of a single node
-struct AddBatchCounter {
-    // total execution time of a add_batch rpc
-    int64_t add_batch_execution_time_ns = 0;
-    // lock waiting time in a add_batch rpc
-    int64_t add_batch_wait_lock_time_ns = 0;
-    // number of add_batch call
-    int64_t add_batch_num = 0;
+// RowBuffer is used by the multi-thread version of OlapTableSink; it is a single-producer/single-consumer buffer.
+// In the multi-thread version, OlapTableSink creates multiple RowBuffers and the same number of threads to exec RowBuffer::consume_process.
+// Only one thread (OlapTableSink::send) executes the push op; modular hashing (node_id % buffer_num) specifies the buffer into which the row should be pushed.
+class RowBuffer {
+public:
+    RowBuffer(TupleDescriptor* tuple_desc, int64_t byte_limit, int64_t size_limit)
+            : _off(false),
+              _consume_err(false),
+              _tuple_desc(tuple_desc),
+              _queue_runtime_size(size_limit),
+              _queue(size_limit),
+              _mem_tracker(new MemTracker(byte_limit)),
+              _buffer_pool(new MemPool(_mem_tracker.get())) {}
+
+    // The push method won't generate an error; it returns an error only if the buffer is not workable.
+    // It must only be called from the producer thread.
+    Status push(IndexChannel* index_ch, NodeChannel* node_ch, int64_t tablet_id, Tuple* tuple);
+
+    // the thread function of the consumer thread
+    bool consume_process(int buffer_id);
+
+    // disable pushing items to the buffer; items already in the buffer will continue to be consumed
+    void turn_off() { _off = true; }
+
+    // there's no need for the producer to differentiate between off and error
+    bool workable() { return !_off && !_consume_err; }
+
+    void report_time(int buffer_id) {
+        LOG(INFO) << "buffer " << buffer_id << " time report: {consumed rows: " << _consume_count
+                  << ", mem_handle: " << _mem_handle_ns / 1e9
+                  << "s, deep_copy: " << _deep_copy_ns / 1e9
+                  << "s, spsc push block if full: " << _spsc_push_ns / 1e9
+                  << "s, consume: " << _consume_ns / 1e9
+                  << "s, actual consume: " << _actual_consume_ns / 1e9 << "s}";
+    }
+
+private:
+    std::atomic<bool> _off;
+    std::atomic<bool> _consume_err;
+
+    TupleDescriptor* _tuple_desc = nullptr;
+
+    std::size_t _queue_runtime_size;
+    // https://www.boost.org/doc/libs/1_64_0/doc/html/lockfree/examples.html#lockfree.examples.waitfree_single_producer_single_consumer_queue
+    boost::lockfree::spsc_queue<std::tuple<IndexChannel*, NodeChannel*, int64_t, Tuple*>> _queue;
+
+    boost::scoped_ptr<MemTracker> _mem_tracker;
+    boost::scoped_ptr<MemPool> _buffer_pool;
 
 Review comment:
   better to use std::unique_ptr
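   For reference, a minimal sketch of what the suggested swap could look like (member names come from the diff above; MemTracker/MemPool are stubbed out only so the snippet compiles on its own):

```cpp
#include <cstdint>
#include <memory>

// Stand-ins for the real Doris classes, just to keep the example self-contained.
struct MemTracker {
    explicit MemTracker(int64_t byte_limit) : limit(byte_limit) {}
    int64_t limit;
};
struct MemPool {
    explicit MemPool(MemTracker* tracker) : tracker(tracker) {}
    MemTracker* tracker;
};

class RowBuffer {
public:
    explicit RowBuffer(int64_t byte_limit)
            // std::unique_ptr is constructible from a raw pointer, so the
            // existing initializer list keeps working after the swap.
            : _mem_tracker(new MemTracker(byte_limit)),
              _buffer_pool(new MemPool(_mem_tracker.get())) {}

private:
    std::unique_ptr<MemTracker> _mem_tracker;
    std::unique_ptr<MemPool> _buffer_pool;
};

int main() {
    RowBuffer buffer(64 * 1024 * 1024);  // arbitrary 64 MB limit for the example
    return 0;
}
```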
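   Separately, since the comment at the top of RowBuffer describes a single-producer/single-consumer flow with a turn_off flag, here is a bare-bones illustration of that pattern over the same boost::lockfree::spsc_queue (row payloads replaced by plain int64_t ids; all names and counts are made up for the example, this is not the actual OlapTableSink code):

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>

#include <boost/lockfree/spsc_queue.hpp>

int main() {
    boost::lockfree::spsc_queue<int64_t> queue(1024);  // runtime-sized, like _queue(size_limit)
    std::atomic<bool> off(false);                      // analogous to RowBuffer::_off

    // Consumer thread: drains the queue until the producer turns the buffer off
    // and everything already queued has been consumed.
    std::thread consumer([&]() {
        int64_t row = 0;
        int64_t consumed = 0;
        while (true) {
            if (queue.pop(row)) {
                ++consumed;  // "consume" the row
            } else if (off.load()) {
                while (queue.pop(row)) {  // drain anything that raced in before exiting
                    ++consumed;
                }
                break;
            }
        }
        std::cout << "consumed " << consumed << " rows\n";
    });

    // Producer (here: main thread): spins when the queue is full, then turns the buffer off.
    for (int64_t i = 0; i < 10000; ++i) {
        while (!queue.push(i)) {
            // queue full: wait for the consumer to make room
        }
    }
    off.store(true);  // analogous to RowBuffer::turn_off()

    consumer.join();
    return 0;
}
```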

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
