github-actions[bot] commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3375024673
##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java:
##########
@@ -1629,13 +1664,87 @@ public OlapTableStreamUpdate getStreamUpdate() {
Map<Long, Long> prev = Maps.newHashMap();
Map<Long, Long> next = Maps.newHashMap();
for (Long partitionId : getSelectedPartitionIds()) {
- Pair<Long, Long> streamUpdate = ((OlapTableStreamWrapper)
olapTable).getStreamUpdate(partitionId);
+ Pair<Long, Long> streamUpdate = getStreamUpdate(partitionId);
if (streamUpdate.first != null) {
- // prev could be null, ignore
+ // prev could be null, in case of historical scan
prev.put(partitionId, streamUpdate.first);
}
- next.put(partitionId, streamUpdate.second);
+ if (streamUpdate.second != null) {
Review Comment:
Because the DUP_KEYS fallback is part of the first branch, any unknown
`incrementType` on a duplicate-key table is silently accepted and coerced to
`APPEND_ONLY`. For example, `SELECT * FROM
dup_tbl@incr("incrementType"="TYPO")` reaches `parseBinlogScanType()`,
`olapTable.getKeysType() == DUP_KEYS` makes this branch true, and the
`Unsupported increment type` error is skipped. Please validate the string value
first, then apply the duplicate-table default/fallback only for an
omitted/default increment type.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java:
##########
@@ -21,11 +21,13 @@
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RowBinlogTableWrapper;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.stream.AbstractTableStreamUpdate;
import org.apache.doris.catalog.stream.OlapTableStreamUpdate;
Review Comment:
This collects stream scans so `TableStreamUpdateInfo` can be attached to the
transaction, but that update is only consumed by the non-cloud
`GlobalTransactionMgr`/`DatabaseTransactionMgr` path.
`CloudGlobalTransactionMgr.commitAndPublishTransaction()` commits through
meta-service directly and never checks or applies
`transactionState.getStreamUpdateInfos()`, so in cloud mode `INSERT INTO target
SELECT ... FROM stream` can commit the target rows without advancing the stream
offset; the next consumption will read the same changes again. Please either
update the cloud commit path to lock/check/apply the stream offsets atomically,
or reject table-stream consumption in cloud mode until it is supported.
##########
be/src/storage/iterator/block_reader.cpp:
##########
@@ -74,6 +77,302 @@ Status BlockReader::next_block_with_aggregation(Block*
block, bool* eof) {
return res;
}
+Status BlockReader::_ensure_binlog_column_pos(const Block& src_block) {
+ if (_binlog_column_pos_inited) {
+ if (_binlog_op_pos >= 0 && _binlog_op_pos < src_block.columns() &&
+ src_block.get_by_position(_binlog_op_pos).name ==
kRowBinlogOpColName) {
+ return Status::OK();
+ }
+ _binlog_op_pos = -1;
+ _binlog_lsn_pos = -1;
+ _binlog_timestamp_pos = -1;
+ _binlog_column_pos_inited = false;
+ }
+
+ const uint32_t col_num = src_block.columns();
+ _before_column_idx.resize(col_num);
+ for (uint32_t i = 0; i < col_num; ++i) {
+ const auto& name = src_block.get_by_position(i).name;
+ if (name == kRowBinlogOpColName) {
+ _binlog_op_pos = static_cast<int>(i);
+ } else if (name == kRowBinlogLsnColName) {
+ _binlog_lsn_pos = static_cast<int>(i);
+ } else if (name == kRowBinlogTimestampColName) {
+ _binlog_timestamp_pos = static_cast<int>(i);
+ } else {
+ std::string before_name = build_before_column_name(name);
+ int tmp_idx = src_block.get_position_by_name(before_name);
+ _before_column_idx[i] = tmp_idx < 0 ? i : tmp_idx;
+ }
+ }
+ _binlog_column_pos_inited = true;
+ return Status::OK();
+}
+
+int64_t BlockReader::_read_binlog_op(const IColumn& col, size_t row) const {
+ const IColumn* cur = &col;
+ if (const auto* nullable = check_and_get_column<ColumnNullable>(*cur)) {
+ if (nullable->is_null_at(row)) {
+ return ROW_BINLOG_UNKNOWN;
+ }
+ cur = &nullable->get_nested_column();
+ }
+
+ if (const auto* int64_col = check_and_get_column<ColumnInt64>(*cur)) {
+ return int64_col->get_element(row);
+ }
+
+ return ROW_BINLOG_UNKNOWN;
+}
+
+Status BlockReader::_write_binlog_op(IColumn& col, int64_t op) const {
+ IColumn* cur = &col;
+ ColumnNullable* nullable = nullptr;
+ if (auto* n = typeid_cast<ColumnNullable*>(cur)) {
+ nullable = n;
+ cur = &nullable->get_nested_column();
+ }
+
+ if (auto* int64_col = typeid_cast<ColumnInt64*>(cur)) {
+ int64_col->insert_value(op);
+ } else {
+ return Status::InternalError("invalid column type");
+ }
+
+ if (nullable != nullptr) {
+ nullable->get_null_map_data().push_back(0);
+ }
+ return Status::OK();
+}
+
+bool BlockReader::_is_binlog_meta_column(int idx) const {
+ return idx == _binlog_op_pos || idx == _binlog_lsn_pos || idx ==
_binlog_timestamp_pos;
+}
+
+int BlockReader::_resolve_source_column_index(int idx, bool use_before) const {
+ if (!use_before || _is_binlog_meta_column(idx)) {
+ return idx;
+ }
+
+ return _before_column_idx[idx];
+}
+
+void BlockReader::_init_pending_row_columns(const Block& block) {
+ if (!_pending_row_columns.empty()) {
+ return;
+ }
+ _pending_row_columns = block.clone_empty_columns();
+}
+
+bool BlockReader::_emit_pending_row(MutableColumns& target_columns, size_t&
output_row_count) {
+ if (!_has_pending_row) {
+ return false;
+ }
+ for (size_t i = 0; i < _pending_row_columns.size(); ++i) {
+ target_columns[i]->insert_from(*_pending_row_columns[i], 0);
+ _pending_row_columns[i]->clear();
+ }
+ _has_pending_row = false;
+ output_row_count++;
+ return true;
+}
+
+Status BlockReader::_append_change_row(MutableColumns& target_columns, const
Block& src_block,
+ size_t row_pos, int64_t output_op, bool
use_before) {
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (target_col_idx < 0) {
+ continue;
+ }
+ if (idx == _binlog_op_pos) {
+ RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
output_op));
+ continue;
+ }
+ int source_idx = _resolve_source_column_index(idx, use_before);
+
target_columns[target_col_idx]->insert_from(*src_block.get_by_position(source_idx).column,
+ row_pos);
+ }
+ return Status::OK();
+}
+
+Status BlockReader::_min_delta_next_block(Block* block, bool* eof) {
+ if (UNLIKELY(_eof && !_has_pending_row)) {
+ *eof = true;
+ return Status::OK();
+ }
+
+ if (_stored_data_columns.empty()) {
+ _stored_data_columns = _next_row.block->clone_empty_columns();
+ }
+
+ auto target_columns_guard = block->mutate_columns_scoped();
+ auto& target_columns = target_columns_guard.mutable_columns();
+ size_t output_row_count = 0;
+ _init_pending_row_columns(*block);
+ RETURN_IF_ERROR(_ensure_binlog_column_pos(*_next_row.block));
+ while (output_row_count < batch_max_rows()) {
+ if (_emit_pending_row(target_columns, output_row_count)) {
+ continue;
+ }
+ if (_eof) {
+ break;
+ }
+ bool need_pop = _stored_data_columns[0]->size() > 1;
+ for (size_t i = 0; i < _stored_data_columns.size(); ++i) {
+ if (need_pop) {
+ _stored_data_columns[i]->pop_back(1);
+ }
+
_stored_data_columns[i]->insert_from(*_next_row.block->get_by_position(i).column,
+ _next_row.row_pos);
+ }
+ auto res = _vcollect_iter.next(&_next_row);
+ if (UNLIKELY(res.is<END_OF_FILE>())) {
+ _eof = true;
+ } else if (UNLIKELY(!res.ok())) {
+ return res;
+ }
+
+ if (!_eof && _next_row.is_same) {
+ continue;
+ }
+ size_t group_size = _stored_data_columns[0]->size();
+ auto first_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos],
0);
+ auto last_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos],
group_size - 1);
+ auto result = AggregateFunctionMinDelta::calculate_result(first_op,
last_op);
+ switch (result) {
+ case AggregateFunctionMinDelta::ResultType::SKIP:
+ break;
+ case AggregateFunctionMinDelta::ResultType::INSERT:
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+ STREAM_CHANGE_INSERT));
+ } else {
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+ group_size -
1);
+ }
+ }
+ output_row_count++;
+ break;
+ case AggregateFunctionMinDelta::ResultType::DELETE:
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+ STREAM_CHANGE_DELETE));
+ } else {
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+ group_size -
1);
+ }
+ }
+ output_row_count++;
+ break;
+ case AggregateFunctionMinDelta::ResultType::UPDATE_BEFORE_AFTER:
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+
STREAM_CHANGE_UPDATE_BEFORE));
+ } else if (idx == _binlog_lsn_pos) {
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+ group_size -
1);
+ } else {
+ int source_idx = _resolve_source_column_index(idx, true);
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[source_idx],
+ 0);
+ }
+ }
+ output_row_count++;
+ if (output_row_count >= batch_max_rows()) {
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*_pending_row_columns[target_col_idx],
+
STREAM_CHANGE_UPDATE_AFTER));
+ } else {
+ _pending_row_columns[target_col_idx]->insert_from(
+ *_stored_data_columns[idx], group_size - 1);
+ }
+ }
+ _has_pending_row = true;
+ } else {
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+
STREAM_CHANGE_UPDATE_AFTER));
+ } else {
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+ group_size
- 1);
+ }
+ }
+ output_row_count++;
+ }
+ break;
+ }
+
+ for (auto& col : _stored_data_columns) {
+ col->clear();
Review Comment:
`_init_collect_iter()` sets `_eof = true` when `current_row(&_next_row)`
returns `END_OF_FILE`, but a DETAIL scan still installs
`_detail_change_next_block()`. On the first `next_block_with_aggregation()` for
an empty row-binlog range, or for a start/end TSO range filtered down to no
rows, this line dereferences `_next_row.block` before checking `_eof` or
`_next_row.block == nullptr`, so the query can crash instead of returning EOF.
Please mirror the `_min_delta_next_block()` guard and return `*eof = true`
before touching `_next_row.block` when `_eof && !_has_pending_row`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]