HappenLee commented on code in PR #48720: URL: https://github.com/apache/doris/pull/48720#discussion_r2097134631
########## be/src/vec/sink/tablet_sink_hash_partitioner.cpp: ########## @@ -82,47 +85,59 @@ Status TabletSinkHashPartitioner::open(RuntimeState* state) { return Status::OK(); } -Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block, bool eos, - bool* already_sent) const { +Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block) const { _hash_vals.resize(block->rows()); if (block->empty()) { return Status::OK(); } std::fill(_hash_vals.begin(), _hash_vals.end(), -1); - bool has_filtered_rows = false; - int64_t filtered_rows = 0; - int64_t number_input_rows = _local_state->rows_input_counter()->value(); + int64_t ___ = 0; // _local_state->rows_input_counter() updated in sink and write. std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>(); - RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( - *block, convert_block, filtered_rows, has_filtered_rows, _row_part_tablet_ids, - number_input_rows)); - if (_row_distribution.batching_rows() > 0) { - SCOPED_TIMER(_local_state->send_new_partition_timer()); - RETURN_IF_ERROR(_send_new_partition_batch(state, block, eos)); - *already_sent = true; - } else { - const auto& row_ids = _row_part_tablet_ids[0].row_ids; - const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids; - for (int idx = 0; idx < row_ids.size(); ++idx) { - const auto& row = row_ids[idx]; - const auto& tablet_id_hash = - HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(HashValType), 0); - _hash_vals[row] = tablet_id_hash % _partition_count; - } + // add local_exchange before this node to deal row distribution + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(*block, convert_block, + _row_part_tablet_ids, ___)); + _skipped = _row_distribution.get_skipped(); + const auto& row_ids = _row_part_tablet_ids[0].row_ids; + const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids; + for (int idx = 0; idx < row_ids.size(); ++idx) { + const auto& row = 
row_ids[idx]; + const auto& tablet_id_hash = + HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(HashValType), 0); + _hash_vals[row] = tablet_id_hash % _partition_count; } return Status::OK(); } +Status TabletSinkHashPartitioner::try_cut_in_line(Block& prior_block) const { + // check if we need send batching block first + if (_row_distribution.need_deal_batching()) { + { + SCOPED_TIMER(_local_state->send_new_partition_timer()); + RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); Review Comment: This is a blocking operation. Should `automatic_create_partition()` be called here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org