zclllyybb commented on code in PR #48720:
URL: https://github.com/apache/doris/pull/48720#discussion_r2719451155
##########
be/src/vec/sink/tablet_sink_hash_partitioner.cpp:
##########
@@ -81,45 +86,67 @@ Status TabletSinkHashPartitioner::open(RuntimeState* state)
{
return Status::OK();
}
-Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block*
block, bool eos,
- bool* already_sent) const {
+Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block*
block) const {
_hash_vals.resize(block->rows());
if (block->empty()) {
return Status::OK();
}
- std::fill(_hash_vals.begin(), _hash_vals.end(), -1);
- int64_t filtered_rows = 0;
- int64_t number_input_rows = _local_state->rows_input_counter()->value();
+
+ // tablet_id_hash % invalid_val never get invalid_val, so we use
invalid_val as sentinel value
+ const auto& invalid_val = _partition_count;
+ std::ranges::fill(_hash_vals, invalid_val);
+
+ int64_t dummy_stats = 0; // _local_state->rows_input_counter() updated in
sink and write.
std::shared_ptr<vectorized::Block> convert_block =
std::make_shared<vectorized::Block>();
+
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
- *block, convert_block, filtered_rows, _row_part_tablet_ids,
number_input_rows));
- if (_row_distribution.batching_rows() > 0) {
- SCOPED_TIMER(_local_state->send_new_partition_timer());
- RETURN_IF_ERROR(_send_new_partition_batch(state, block, eos));
- *already_sent = true;
- } else {
- const auto& row_ids = _row_part_tablet_ids[0].row_ids;
- const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
- for (int idx = 0; idx < row_ids.size(); ++idx) {
- const auto& row = row_ids[idx];
- const auto& tablet_id_hash =
- HashUtil::zlib_crc_hash(&tablet_ids[idx],
sizeof(HashValType), 0);
- _hash_vals[row] = tablet_id_hash % _partition_count;
+ *block, convert_block, _row_part_tablet_ids, dummy_stats));
+ _skipped = _row_distribution.get_skipped();
+ const auto& row_ids = _row_part_tablet_ids[0].row_ids;
+ const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
+
+ for (int idx = 0; idx < row_ids.size(); ++idx) {
+ const auto& row = row_ids[idx];
+ const auto& tablet_id_hash =
+ HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(HashValType),
0);
+ _hash_vals[row] = tablet_id_hash % invalid_val;
+ }
+
+ // _hash_val == -1 = (_skipped = 1 or filtered = 1)
+#ifndef NDEBUG
+ for (size_t i = 0; i < _skipped.size(); ++i) {
+ if (_skipped[i]) {
Review Comment:
上面处理的是 row_ids,这里处理的是 _skipped,两者恰好不相交:返回的 row_ids 不包含被 _skipped 标记的行。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]