HappenLee commented on code in PR #57888:
URL: https://github.com/apache/doris/pull/57888#discussion_r2520972503


##########
be/src/vec/runtime/vdata_stream_recvr.cpp:
##########
@@ -209,22 +190,85 @@ Status 
VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
 
     _block_queue.emplace_back(std::move(pblock), block_byte_size);
     COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
-    _record_debug_info();
     set_source_ready(l);
 
     // if done is nullptr, this function can't delay this response
     if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
-        MonotonicStopWatch monotonicStopWatch;
-        monotonicStopWatch.start();
-        DCHECK(*done != nullptr);
-        _pending_closures.emplace_back(*done, monotonicStopWatch);
+        _block_queue.back().set_done(*done);
         *done = nullptr;
     }
     _recvr->_memory_used_counter->update(block_byte_size);
     add_blocks_memory_usage(block_byte_size);
     return Status::OK();
 }
 
+Status VDataStreamRecvr::SenderQueue::add_blocks(const PTransmitDataParams* 
request,
+                                                 ::google::protobuf::Closure** 
done,
+                                                 const int64_t wait_for_worker,
+                                                 const uint64_t 
time_to_find_recvr) {
+    {
+        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
+        if (_is_cancelled) {
+            return Status::OK();
+        }
+        const int be_number = request->be_number();
+        const int64_t packet_seq = request->packet_seq();
+        auto iter = _packet_seq_map.find(be_number);
+        if (iter != _packet_seq_map.end()) {
+            if (iter->second >= packet_seq) {
+                return Status::InternalError(
+                        "packet already exist [cur_packet_id= {} 
receive_packet_id={}]",
+                        iter->second, packet_seq);
+            }
+            iter->second = packet_seq;
+        } else {
+            _packet_seq_map.emplace(be_number, packet_seq);

Review Comment:
   The `packet_seq` handling seems to differ from the original logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to