HappenLee commented on code in PR #48720:
URL: https://github.com/apache/doris/pull/48720#discussion_r2097134631


##########
be/src/vec/sink/tablet_sink_hash_partitioner.cpp:
##########
@@ -82,47 +85,59 @@ Status TabletSinkHashPartitioner::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block, bool eos,
-                                                  bool* already_sent) const {
+Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block) const {
     _hash_vals.resize(block->rows());
     if (block->empty()) {
         return Status::OK();
     }
     std::fill(_hash_vals.begin(), _hash_vals.end(), -1);
-    bool has_filtered_rows = false;
-    int64_t filtered_rows = 0;
-    int64_t number_input_rows = _local_state->rows_input_counter()->value();
+    int64_t ___ = 0; // _local_state->rows_input_counter() updated in sink and write.
     std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
-    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
-            *block, convert_block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
-            number_input_rows));
-    if (_row_distribution.batching_rows() > 0) {
-        SCOPED_TIMER(_local_state->send_new_partition_timer());
-        RETURN_IF_ERROR(_send_new_partition_batch(state, block, eos));
-        *already_sent = true;
-    } else {
-        const auto& row_ids = _row_part_tablet_ids[0].row_ids;
-        const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
-        for (int idx = 0; idx < row_ids.size(); ++idx) {
-            const auto& row = row_ids[idx];
-            const auto& tablet_id_hash =
-                    HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(HashValType), 0);
-            _hash_vals[row] = tablet_id_hash % _partition_count;
-        }
+    // add local_exchange before this node to deal with row distribution
+    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(*block, convert_block,
+                                                                 _row_part_tablet_ids, ___));
+    _skipped = _row_distribution.get_skipped();
+    const auto& row_ids = _row_part_tablet_ids[0].row_ids;
+    const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
+    for (int idx = 0; idx < row_ids.size(); ++idx) {
+        const auto& row = row_ids[idx];
+        const auto& tablet_id_hash =
+                HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(HashValType), 0);
+        _hash_vals[row] = tablet_id_hash % _partition_count;
     }
 
     return Status::OK();
 }
 
+Status TabletSinkHashPartitioner::try_cut_in_line(Block& prior_block) const {
+    // check if we need to send the batching block first
+    if (_row_distribution.need_deal_batching()) {
+        {
+            SCOPED_TIMER(_local_state->send_new_partition_timer());
+            RETURN_IF_ERROR(_row_distribution.automatic_create_partition());

Review Comment:
   it's a blocking operation. should it be here?
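
For context, the mapping that stays in `do_partitioning` is a plain CRC-hash-modulo scheme: each row's tablet id is hashed and reduced modulo the number of downstream channels, and rows that never receive a mapping keep the -1 placeholder set by the `std::fill`. Below is a minimal standalone sketch of that idea, not the Doris implementation: zlib's `crc32` stands in for `HashUtil::zlib_crc_hash`, and `partition_count` / `tablet_ids` are made-up illustrative values.

```cpp
// Standalone illustration of the tablet-id -> channel mapping in the hunk above:
// hash each row's tablet id with a zlib CRC and take it modulo the channel count.
// zlib's crc32() stands in for HashUtil::zlib_crc_hash; names/values are invented.
#include <zlib.h>

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    const uint32_t partition_count = 4;  // pretend there are 4 downstream channels
    const std::vector<int64_t> tablet_ids = {10031, 10032, 10033, 10034, 10031};

    // -1 marks rows that never get a mapping, mirroring the std::fill in the diff.
    std::vector<int64_t> hash_vals(tablet_ids.size(), -1);

    for (size_t row = 0; row < tablet_ids.size(); ++row) {
        // CRC over the raw bytes of the tablet id, seeded with 0.
        const uint32_t h = static_cast<uint32_t>(
                crc32(0L, reinterpret_cast<const Bytef*>(&tablet_ids[row]),
                      sizeof(tablet_ids[row])));
        hash_vals[row] = h % partition_count;
    }

    for (size_t row = 0; row < tablet_ids.size(); ++row) {
        std::cout << "tablet " << tablet_ids[row] << " -> channel " << hash_vals[row] << "\n";
    }
    return 0;
}
```

Compile with `-lz`. Rows carrying the same tablet id always land on the same channel, which is the property the partitioner relies on; the blocking batch/partition-creation handling raised above is a separate step from this per-row mapping.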



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

